# Contract for anything that can wait on Awaitable objects. Both methods are
# stubs, so a class composing this role has to provide its own versions.
my role Awaiter {
    # Wait on one defined Awaitable.
    method await(Awaitable:D $a) {
        ...
    }

    # Wait on every element of a defined Iterable.
    method await-all(Iterable:D $i) {
        ...
    }
}
  # Awaiter that blocks the calling thread until the awaited work has reported
  # an outcome through its await handle.
  my class Awaiter::Blocking does Awaiter {
      # Waits for a single Awaitable. Evaluates to the handle's result on
      # success; otherwise the exception reported by the handle is rethrown.
      method await(Awaitable:D $a) {
          my $handle := $a.get-await-handle;

          # Outcome already known: answer straight away, no blocking needed.
          if $handle.already {
              return $handle.success
                  ?? $handle.result
                  !! $handle.cause.rethrow;
          }

          # Not finished yet. Subscribe a callback that records the outcome
          # and releases a semaphore created with zero permits, then block on
          # that semaphore until the callback has run.
          my $gate = Semaphore.new(0);
          my $ok;
          my $outcome;
          $handle.subscribe-awaiter(-> \succeeded, \payload {
              $ok      := succeeded;
              $outcome := payload;
              $gate.release;
          });
          $gate.acquire;

          # When the callback reported failure, its payload is what gets
          # rethrown.
          return $ok
              ?? $outcome
              !! $outcome.rethrow;
      }

      # Waits for every Awaitable produced by the given Iterable and evaluates
      # to a List holding the results in iteration order. Dies if an element
      # is not an Awaitable or is an undefined one; rethrows the first
      # exception reported by any of the awaited handles.
      method await-all(Iterable:D \i) {
          # First pass: store results that are available immediately, and
          # remember each unfinished handle together with the position its
          # result has to be stored at later.
          my \collected  = nqp::list();
          my \unresolved = nqp::list();
          my \slots      = nqp::list_i();
          my int $slot   = 0;
          for i -> $awaitable {
              die "Can only specify Awaitable objects to await (got a $awaitable.^name())"
                  unless nqp::istype($awaitable, Awaitable);
              die "Must specify a defined Awaitable to await (got an undefined $awaitable.^name())"
                  unless nqp::isconcrete($awaitable);

              my $handle := $awaitable.get-await-handle;
              if $handle.already {
                  $handle.success
                      ?? nqp::bindpos(collected, $slot, $handle.result)
                      !! $handle.cause.rethrow
              }
              else {
                  nqp::push(unresolved, $handle);
                  nqp::push_i(slots, $slot);
              }
              $slot++;
          }

          # Second pass, only needed when something was still unfinished.
          # A lock guards the result list and the bookkeeping below; its
          # condition variable is used to wake this thread up.
          my int $unresolved-count = nqp::elems(unresolved);
          if $unresolved-count {
              my $failure         = Mu;
              my $guard           = Lock.new;
              my $all-settled     = $guard.condition();
              my int $outstanding = $unresolved-count;

              loop (my int $k = 0; $k < $unresolved-count; $k++) {
                  my $handle     := nqp::atpos(unresolved, $k);
                  my int $target  = nqp::atpos_i(slots, $k);
                  $handle.subscribe-awaiter(-> \succeeded, \payload {
                      $guard.protect: {
                          if succeeded && $outstanding {
                              # Store the result; signal once nothing is left.
                              nqp::bindpos(collected, $target, payload);
                              $all-settled.signal if --$outstanding == 0;
                          }
                          elsif !nqp::isconcrete($failure) {
                              # Only the first failure is kept. Zeroing the
                              # counter ends the wait below early.
                              $failure     := payload;
                              $outstanding  = 0;
                              $all-settled.signal;
                          }
                      }
                  });
              }

              # Block until nothing is outstanding. The condition is checked
              # again after every wake-up to cope with spurious wakeups.
              $guard.protect: {
                  $all-settled.wait until $outstanding == 0;
              }

              # If we got an exception, throw it.
              $failure.rethrow if nqp::isconcrete($failure);
          }

          # Wrap the low-level list as the reified storage of a new List.
          nqp::p6bindattrinvres(nqp::create(List), List, '$!reified', collected);
      }
  }
  # Bind the blocking awaiter under the '$AWAITER' key of the PROCESS package.
  # NOTE(review): presumably this is what dynamic `$*AWAITER` lookups fall
  # back to when nothing closer is defined -- confirm against the callers.
  PROCESS::{'$AWAITER'} := Awaiter::Blocking;