1. # A promise is a synchronization mechanism for a piece of work that will
  2. # produce a single result (keeping the promise) or fail (breaking the
  3. # promise).
  4. my enum PromiseStatus (:Planned(0), :Kept(1), :Broken(2));
  5. my class X::Promise::Combinator is Exception {
  6. has $.combinator;
  7. method message() { "Can only use $!combinator to combine defined Promise objects" }
  8. }
  9. my class X::Promise::CauseOnlyValidOnBroken is Exception {
  10. has $.promise;
  11. has $.status;
  12. method message() { "Can only call cause on a broken promise (status: $.status)" }
  13. }
  14. my class X::Promise::Vowed is Exception {
  15. has $.promise;
  16. method message() { "Access denied to keep/break this Promise; already vowed" }
  17. }
  18. my role X::Promise::Broken {
  19. has $.result-backtrace;
  20. multi method gist(::?CLASS:D:) {
  21. "Tried to get the result of a broken Promise\n" ~
  22. ((try $!result-backtrace ~ "\n") // '') ~
  23. "Original exception:\n" ~
  24. callsame().indent(4)
  25. }
  26. }
  27. my class Promise does Awaitable {
  28. has $.scheduler;
  29. has $.status;
  30. has $!result is default(Nil);
  31. has int $!vow_taken;
  32. has $!lock;
  33. has $!cond;
  34. has @!thens;
  35. has Mu $!dynamic_context;
  36. submethod BUILD(:$!scheduler = $*SCHEDULER --> Nil) {
  37. $!lock := nqp::create(Lock);
  38. $!cond := $!lock.condition();
  39. $!status = Planned;
  40. }
  41. # A Vow is used to enable the right to keep/break a promise
  42. # to be restricted to a given "owner". Taking the Vow for a Promise
  43. # prevents anybody else from getting hold of it.
  44. my class Vow { ... }
  45. trusts Vow;
  46. my class Vow {
  47. has $.promise;
  48. method keep(Mu \result) {
  49. $!promise!Promise::keep(result)
  50. }
  51. method break(\exception) {
  52. $!promise!Promise::break(exception)
  53. }
  54. }
  55. method vow() {
  56. nqp::lock($!lock);
  57. if $!vow_taken {
  58. nqp::unlock($!lock);
  59. X::Promise::Vowed.new(promise => self).throw
  60. }
  61. my $vow := nqp::create(Vow);
  62. nqp::bindattr($vow, Vow, '$!promise', self);
  63. $!vow_taken = 1;
  64. nqp::unlock($!lock);
  65. $vow
  66. }
  67. proto method keep(|) { * }
  68. multi method keep(Promise:D:) {
  69. self.vow.keep(True)
  70. }
  71. multi method keep(Promise:D: Mu \result) {
  72. self.vow.keep(result)
  73. }
  74. method !keep(Mu \result --> Nil) {
  75. $!lock.protect({
  76. $!result := result;
  77. $!status = Kept;
  78. self!schedule_thens();
  79. $!cond.signal_all;
  80. });
  81. }
  82. proto method break(|) { * }
  83. multi method break(Promise:D:) {
  84. self.vow.break(False)
  85. }
  86. multi method break(Promise:D: \result) {
  87. self.vow.break(result)
  88. }
  89. method !break(\result --> Nil) {
  90. $!lock.protect({
  91. $!result = nqp::istype(result, Exception)
  92. ?? result
  93. !! X::AdHoc.new(payload => result);
  94. $!status = Broken;
  95. self!schedule_thens();
  96. $!cond.signal_all;
  97. });
  98. }
  99. method !schedule_thens(--> Nil) {
  100. while @!thens {
  101. $!scheduler.cue(@!thens.shift, :catch(@!thens.shift))
  102. }
  103. }
  104. method result(Promise:D:) {
  105. # One important missing optimization here is that if the promise is
  106. # not yet started, then the work can be done immediately by the
  107. # thing that is blocking on it. (Note the while loop is there to cope
  108. # with spurious wake-ups).
  109. while $!status == Planned {
  110. $!lock.protect({
  111. # Re-check planned to avoid data race.
  112. $!cond.wait() if $!status == Planned;
  113. });
  114. }
  115. if $!status == Kept {
  116. $!result
  117. }
  118. elsif $!status == Broken {
  119. ($!result but X::Promise::Broken(Backtrace.new)).rethrow
  120. }
  121. }
  122. multi method Bool(Promise:D:) {
  123. so $!status == Broken || $!status == Kept
  124. }
  125. method cause(Promise:D:) {
  126. my $status = $!status;
  127. if $status == Broken {
  128. $!result
  129. } else {
  130. X::Promise::CauseOnlyValidOnBroken.new(
  131. promise => self,
  132. status => $status,
  133. ).throw
  134. }
  135. }
  136. method then(Promise:D: &code) {
  137. nqp::lock($!lock);
  138. if $!status == Broken || $!status == Kept {
  139. # Already have the result, start immediately.
  140. nqp::unlock($!lock);
  141. Promise.start( { code(self) }, :$!scheduler);
  142. }
  143. else {
  144. # Create a Promise, and push 2 entries to @!thens: something that
  145. # starts the then code, and something that handles its exceptions.
  146. # They will be sent to the scheduler when this promise is kept or
  147. # broken.
  148. my $then_promise = Promise.new(:$!scheduler);
  149. my $vow = $then_promise.vow;
  150. @!thens.push({ $vow.keep(code(self)) });
  151. @!thens.push(-> $ex { $vow.break($ex) });
  152. nqp::unlock($!lock);
  153. $then_promise
  154. }
  155. }
  156. my class PromiseAwaitableHandle does Awaitable::Handle {
  157. has &!add-subscriber;
  158. method not-ready(&add-subscriber) {
  159. nqp::create(self)!not-ready(&add-subscriber)
  160. }
  161. method !not-ready(&add-subscriber) {
  162. $!already = False;
  163. &!add-subscriber := &add-subscriber;
  164. self
  165. }
  166. method subscribe-awaiter(&subscriber --> Nil) {
  167. &!add-subscriber(&subscriber);
  168. }
  169. }
  170. method get-await-handle(--> Awaitable::Handle:D) {
  171. if $!status == Broken {
  172. PromiseAwaitableHandle.already-failure($!result)
  173. }
  174. elsif $!status == Kept {
  175. PromiseAwaitableHandle.already-success($!result)
  176. }
  177. else {
  178. PromiseAwaitableHandle.not-ready: -> &on-ready {
  179. nqp::lock($!lock);
  180. if $!status == Broken || $!status == Kept {
  181. # Already have the result, call on-ready immediately.
  182. nqp::unlock($!lock);
  183. on-ready($!status == Kept, $!result)
  184. }
  185. else {
  186. # Push 2 entries to @!thens (only need the first one in
  187. # this case; second we push 'cus .then uses it).
  188. @!thens.push({ on-ready($!status == Kept, $!result) });
  189. @!thens.push(Callable);
  190. nqp::unlock($!lock);
  191. }
  192. }
  193. }
  194. }
  195. method start(Promise:U: &code, :&catch, :$scheduler = $*SCHEDULER, |c) {
  196. my $p := Promise.new(:$scheduler);
  197. nqp::bindattr($p, Promise, '$!dynamic_context', nqp::ctx());
  198. my $vow = $p.vow;
  199. $scheduler.cue(
  200. { my $*PROMISE := $p; $vow.keep(code(|c)) },
  201. :catch(-> $ex { catch($ex) if &catch; $vow.break($ex); }) );
  202. $p
  203. }
  204. method in(Promise:U: $seconds, :$scheduler = $*SCHEDULER) {
  205. my $p = Promise.new(:$scheduler);
  206. my $vow = $p.vow;
  207. $scheduler.cue({ $vow.keep(True) }, :in($seconds));
  208. $p
  209. }
  210. method at(Promise:U: $at, :$scheduler = $*SCHEDULER) {
  211. self.in( $at - now, :$scheduler )
  212. }
  213. method anyof(Promise:U: *@p) { self!until_n_kept(@p, 1, 'anyof') }
  214. method allof(Promise:U: *@p) { self!until_n_kept(@p, +@p, 'allof') }
  215. method !until_n_kept(@promises, Int $N, Str $combinator) {
  216. my $p = Promise.new;
  217. unless @promises {
  218. $p.keep;
  219. return $p
  220. }
  221. X::Promise::Combinator.new(:$combinator).throw
  222. unless Rakudo::Internals.ALL_DEFINED_TYPE(@promises, Promise);
  223. my int $n = $N;
  224. my int $c = $n;
  225. my $lock := nqp::create(Lock);
  226. my $vow = $p.vow;
  227. for @promises -> $cand {
  228. $cand.then({
  229. if $lock.protect({ $c = $c - 1 }) == 0 {
  230. $vow.keep(True)
  231. }
  232. })
  233. }
  234. $p
  235. }
  236. method Supply(Promise:D:) {
  237. Supply.on-demand: -> $s {
  238. self.then({
  239. if self.status == Kept {
  240. $s.emit(self.result);
  241. $s.done();
  242. }
  243. else {
  244. $s.quit(self.cause);
  245. }
  246. });
  247. }
  248. }
  249. }
  250. multi sub infix:<eqv>(Promise:D \a, Promise:D \b) {
  251. nqp::p6bool(
  252. nqp::eqaddr(a,b) || a.result eqv b.result
  253. )
  254. }