1. # A channel provides a thread-safe way to send a series of values from some
  2. # producer(s) to some consumer(s).
  3. my class X::Channel::SendOnClosed is Exception {
  4. has $.channel;
  5. method message() { "Cannot send a message on a closed channel" }
  6. }
  7. my class X::Channel::ReceiveOnClosed is Exception {
  8. has $.channel;
  9. method message() { "Cannot receive a message on a closed channel" }
  10. }
  11. my class Channel does Awaitable {
  12. # The queue of events moving through the channel.
  13. my class Queue is repr('ConcBlockingQueue') { }
  14. has $!queue;
  15. # Promise that is triggered when all values are received, or an error is
  16. # received and the channel is thus closed.
  17. has $!closed_promise;
  18. # Closed promise's vow.
  19. has $!closed_promise_vow;
  20. # Flag for if the channel is closed to senders.
  21. has $!closed;
  22. # We use a Supplier to send async notifications that there may be a new
  23. # message to read from the channel (there may be many things competing
  24. # over them).
  25. has $!async-notify;
  26. # Magical objects for various ways a channel can end.
  27. my class CHANNEL_CLOSE { }
  28. my class CHANNEL_FAIL { has $.error }
  29. submethod BUILD(--> Nil) {
  30. $!queue := nqp::create(Queue);
  31. $!closed_promise = Promise.new;
  32. $!closed_promise_vow = $!closed_promise.vow;
  33. $!async-notify = Supplier.new;
  34. }
  35. method send(Channel:D: \item) {
  36. X::Channel::SendOnClosed.new(channel => self).throw if $!closed;
  37. nqp::push($!queue, nqp::decont(item));
  38. $!async-notify.emit(True);
  39. Nil
  40. }
  41. method !receive(Channel:D: $fail-on-close) {
  42. my \msg := nqp::shift($!queue);
  43. if nqp::istype(msg, CHANNEL_CLOSE) {
  44. nqp::push($!queue, msg); # make sure other readers see it
  45. $!closed_promise_vow.keep(Nil);
  46. X::Channel::ReceiveOnClosed.new(channel => self).throw
  47. if $fail-on-close;
  48. Nil
  49. }
  50. elsif nqp::istype(msg, CHANNEL_FAIL) {
  51. nqp::push($!queue, msg); # make sure other readers see it
  52. $!closed_promise_vow.break(msg.error);
  53. die msg.error;
  54. }
  55. else {
  56. msg
  57. }
  58. }
  59. method receive(Channel:D:) { self!receive(1) }
  60. method receive-nil-on-close(Channel:D:) { self!receive(0) }
  61. method poll(Channel:D:) {
  62. my \msg := nqp::queuepoll($!queue);
  63. if nqp::isnull(msg) {
  64. Nil
  65. } else {
  66. if nqp::istype(msg, CHANNEL_CLOSE) {
  67. $!closed_promise_vow.keep(Nil);
  68. Nil
  69. }
  70. elsif nqp::istype(msg, CHANNEL_FAIL) {
  71. $!closed_promise_vow.break(msg.error);
  72. Nil
  73. }
  74. else {
  75. msg
  76. }
  77. }
  78. }
  79. method !peek(Channel:D:) {
  80. my \msg := nqp::atpos($!queue, 0);
  81. if nqp::isnull(msg) {
  82. Nil
  83. } else {
  84. if nqp::istype(msg, CHANNEL_CLOSE) {
  85. $!closed_promise_vow.keep(Nil);
  86. Nil
  87. }
  88. elsif nqp::istype(msg, CHANNEL_FAIL) {
  89. $!closed_promise_vow.break(msg.error);
  90. Nil
  91. }
  92. else {
  93. msg
  94. }
  95. }
  96. }
  97. method Supply(Channel:D:) {
  98. supply {
  99. # Tap the async notification for new values supply.
  100. whenever $!async-notify.unsanitized-supply.schedule-on($*SCHEDULER) {
  101. my Mu \got = self.poll;
  102. if nqp::eqaddr(got, Nil) {
  103. if $!closed_promise {
  104. $!closed_promise.status == Kept
  105. ?? done()
  106. !! die $!closed_promise.cause
  107. }
  108. }
  109. else {
  110. emit got;
  111. }
  112. }
  113. # Grab anything that's in the channel and emit it. Note that
  114. # it's important to do this after tapping the supply, or a
  115. # value sent between us draining it and doing the tap would
  116. # not result in a notification, and so we'd not emit it on
  117. # the supply. This lost event can then cause a deadlock.
  118. loop {
  119. my Mu \got = self.poll;
  120. last if nqp::eqaddr(got, Nil);
  121. emit got;
  122. }
  123. self!peek();
  124. if $!closed_promise {
  125. $!closed_promise.status == Kept
  126. ?? done()
  127. !! die $!closed_promise.cause
  128. }
  129. }
  130. }
  131. method iterator(Channel:D:) {
  132. class :: does Iterator {
  133. has $!channel;
  134. method !SET-SELF($!channel) { self }
  135. method new(\c) { nqp::create(self)!SET-SELF(c) }
  136. method pull-one() {
  137. my Mu \got = $!channel.receive-nil-on-close;
  138. nqp::eqaddr(got, Nil) ?? IterationEnd !! got
  139. }
  140. }.new(self)
  141. }
  142. method Seq(Channel:D:) { Seq.new(self.iterator) }
  143. method list(Channel:D:) { self.Seq.list }
  144. my class ChannelAwaitableHandle does Awaitable::Handle {
  145. has $!channel;
  146. has $!closed_promise;
  147. has $!async-notify;
  148. method not-ready(Channel:D $channel, Promise:D $closed_promise, Supplier:D $async-notify) {
  149. nqp::create(self)!not-ready($channel, $closed_promise, $async-notify)
  150. }
  151. method !not-ready($channel, $closed_promise, $async-notify) {
  152. $!already = False;
  153. $!channel := $channel;
  154. $!closed_promise := $closed_promise;
  155. $!async-notify := $async-notify;
  156. self
  157. }
  158. method subscribe-awaiter(&subscriber --> Nil) {
  159. # Need some care here to avoid a race. We must tap the notification
  160. # supply first, and then do an immediate poll after it, just to be
  161. # sure we won't miss notifications between the two. Also, we need
  162. # to take some care that we never call subscriber twice; a lock is
  163. # a tad heavy-weight for it, in the future we can just CAS an int.
  164. my $notified := False;
  165. my $l := Lock.new;
  166. my $t := $!async-notify.unsanitized-supply.tap: &poll-now;
  167. poll-now();
  168. sub poll-now($discard?) {
  169. $l.protect: {
  170. unless $notified {
  171. my \maybe = $!channel.poll;
  172. if maybe === Nil {
  173. if $!closed_promise.status == Kept {
  174. $notified := True;
  175. subscriber(False, X::Channel::ReceiveOnClosed.new(:$!channel))
  176. }
  177. elsif $!closed_promise.status == Broken {
  178. $notified := True;
  179. subscriber(False, $!closed_promise.cause)
  180. }
  181. }
  182. else {
  183. $notified := True;
  184. subscriber(True, maybe);
  185. }
  186. $t.close if $notified;
  187. }
  188. }
  189. }
  190. }
  191. }
  192. method get-await-handle(--> Awaitable::Handle:D) {
  193. my \maybe = self.poll;
  194. if maybe === Nil {
  195. if $!closed_promise {
  196. ChannelAwaitableHandle.already-failure(
  197. $!closed_promise.status == Kept
  198. ?? X::Channel::ReceiveOnClosed.new(channel => self)
  199. !! $!closed_promise.cause
  200. )
  201. }
  202. else {
  203. ChannelAwaitableHandle.not-ready(self, $!closed_promise, $!async-notify)
  204. }
  205. }
  206. else {
  207. ChannelAwaitableHandle.already-success(maybe)
  208. }
  209. }
  210. method close() {
  211. $!closed = 1;
  212. nqp::push($!queue, CHANNEL_CLOSE);
  213. # if $!queue is otherwise empty, make sure that $!closed_promise
  214. # learns about the new value
  215. self!peek();
  216. $!async-notify.emit(True);
  217. Nil
  218. }
  219. method elems() {
  220. Failure.new("Cannot determine number of elements on a {self.^name}")
  221. }
  222. method fail($error is copy) {
  223. $!closed = 1;
  224. $error = X::AdHoc.new(payload => $error) unless nqp::istype($error, Exception);
  225. nqp::push($!queue, CHANNEL_FAIL.new(:$error));
  226. $!async-notify.emit(True);
  227. Nil
  228. }
  229. method closed() {
  230. self!peek();
  231. $!closed_promise
  232. }
  233. }