# Waits until a Promise has been kept, a value can be received from a
# Channel, or a Supply has been waited on and, once that is possible, unwraps
# or returns the result. This should be made more efficient by using
# continuations to suspend any task running in the thread pool that blocks;
# for now, this cheat gets the basic idea in place.
  # Entry point for every `await` call; the candidates below are selected by
  # the number and type of the arguments.
  proto sub await(|) { * }

  # Called with no arguments at all: there is nothing to wait for.
  # The message names the same three awaitable kinds as the other error
  # candidates, since a Supply is accepted as well.
  multi sub await() {
      die "Must specify a Promise, Channel, or Supply to await on (got an empty list)";
  }

  # A single undefined value (a type object): report which type was passed.
  multi sub await(Any:U $x) {
      die "Must specify a defined Promise, Channel, or Supply to await on (got an undefined $x.^name())";
  }

  # A single defined value that no more specific candidate accepts.
  multi sub await(Any:D $x) {
      die "Must specify a Promise, Channel, or Supply to await on (got a $x.^name())";
  }

  # An Iterable of awaitables: await each element in turn.
  # NOTE(review): the source is made eager but `.map` itself is lazy, so the
  # nested awaits presumably run as the returned Seq is consumed -- confirm
  # that callers expect this.
  multi sub await(Iterable:D $i) { $i.eager.map({ await $_ }) }

  # A Promise: return its result.
  multi sub await(Promise:D $p) { $p.result }

  # A Channel: receive one value from it.
  multi sub await(Channel:D $c) { $c.receive }

  # A Supply: delegate to its `wait` method.
  multi sub await(Supply:D $s)  { $s.wait }

  # Several arguments: await each of them, mirroring the Iterable candidate.
  multi sub await(*@awaitables) { @awaitables.eager.map({ await $_ }) }
  # Wraps the given promises in a Seq whose values are the promises' results,
  # produced as the promises leave the Planned state rather than strictly in
  # argument order.
  sub awaiterator(@promises) {
      my \iterator-type = class :: does Iterator {
          has @!todo;   # promises whose result has not been collected yet
          has @!done;   # results collected but not yet handed out

          method !SET-SELF(\todo) { @!todo = todo; self }
          method new(\todo) { nqp::create(self)!SET-SELF(todo) }

          # Blocks until at least one outstanding promise is no longer
          # Planned, then moves the result of every such promise to @!done
          # and keeps the remaining ones in @!todo.
          method !harvest() {
              Promise.anyof(@!todo).result;
              my @still-planned;
              for @!todo -> $promise {
                  if $promise.status == Planned {
                      @still-planned.push($promise)
                  }
                  else {
                      @!done.push($promise.result)
                  }
              }
              @!todo := @still-planned;
          }

          # Hands out buffered results first; refills the buffer when it is
          # empty and promises remain; otherwise signals the end.
          method pull-one() is raw {
              if @!done {
                  @!done.shift
              }
              elsif @!todo {
                  self!harvest;
                  @!done.shift
              }
              else {
                  IterationEnd
              }
          }

          # When the values are not needed, wait for all of the promises
          # originally passed to awaiterator.
          method sink-all(--> IterationEnd) { Promise.allof(@promises).result }
      }
      Seq.new(iterator-type.new(@promises))
  }
  # Naive implementation of cas: runs the code on the current value and
  # stores whatever it returns back into the same container.
  sub cas (\val, &code) {
      my \replacement = code(val);
      val = replacement
  }