1. # A HyperSeq wraps up a HyperIterator. When asked for the hyper-iterator, it
  2. # simply returns it, then complains if you ask a second time - much like Seq
  3. # does for its iterator. If you ask for its iterator, then you are ending the
  4. # declaration of a chain of parallelizable operations. That is, in fact, the
  5. # thing that will actually kick off the parallel work.
  6. my class Promise { ... }
  7. my class HyperSeq does Iterable does HyperIterable does PositionalBindFailover {
  8. has HyperIterator $!hyper-iter;
  9. # The only valid way to create a HyperSeq directly is by giving it the
  10. # hyper-iterator it will expose and maybe memoize.
  11. method new(HyperIterator:D $hyper-iter) {
  12. nqp::p6bindattrinvres(
  13. nqp::create(self),HyperSeq,'$!hyper-iter',nqp::decont($hyper-iter)
  14. )
  15. }
  16. # Obtains the hyper-iterator (meaning we're being consumed as part of a
  17. # parallel processing pipeline).
  18. method hyper-iterator(HyperSeq:D:) {
  19. my \hyper-iter = $!hyper-iter;
  20. X::Seq::Consumed.new.throw unless hyper-iter.DEFINITE;
  21. $!hyper-iter := HyperIterator;
  22. hyper-iter
  23. }
  24. # Obtain the iterator, the consumption of which will kick off parallel
  25. # processing.
  26. method iterator(HyperSeq:D:) {
  27. class :: does Iterator {
  28. my constant NOT_STARTED = 0;
  29. my constant STARTED = 1;
  30. my constant ALL_ADDED = 2;
  31. # For concurrency control
  32. has $!lock;
  33. has $!cond-have-work;
  34. has $!cond-have-result;
  35. # State that must be protected by the above lock, used by all
  36. # threads involved.
  37. has $!work-available;
  38. has $!work-completed;
  39. has int $!in-progress;
  40. # State only touched by the thread controlling the iteration.
  41. has $!configuration;
  42. has $!hyper-iterator;
  43. has $!active-result-buffer;
  44. has $!status;
  45. has int $!sequence-number;
  46. has int $!next-result-sequence-number;
  47. method new(\hyper-iterator) {
  48. my \iter = nqp::create(self);
  49. my \lock = Lock.new;
  50. nqp::bindattr(iter, self, '$!hyper-iterator', hyper-iterator);
  51. nqp::bindattr(iter, self, '$!configuration', hyper-iterator.configuration);
  52. nqp::bindattr(iter, self, '$!work-available', nqp::create(IterationBuffer));
  53. nqp::bindattr(iter, self, '$!work-completed', nqp::create(IterationBuffer));
  54. nqp::bindattr(iter, self, '$!lock', lock);
  55. nqp::bindattr(iter, self, '$!cond-have-work', lock.condition);
  56. nqp::bindattr(iter, self, '$!cond-have-result', lock.condition);
  57. nqp::bindattr(iter, self, '$!status', NOT_STARTED);
  58. iter
  59. }
  60. method pull-one() {
  61. self!start() if $!status == NOT_STARTED;
  62. self!block-for-result() unless $!active-result-buffer.DEFINITE;
  63. if $!active-result-buffer.DEFINITE {
  64. my \result = nqp::shift($!active-result-buffer);
  65. $!active-result-buffer := Mu
  66. unless nqp::elems($!active-result-buffer);
  67. result
  68. }
  69. else {
  70. IterationEnd
  71. }
  72. }
  73. method !start(--> Nil) {
  74. # Mark that we've started the work (done here because this
  75. # may get upgraded to ALL_ADDED if there's not much work).
  76. $!status := STARTED;
  77. # Add batches and start workers. Provided there is enough
  78. # work to do, this should feed them all nicely.
  79. for ^$!configuration.degree {
  80. my \done = self!add-batch();
  81. self!start-worker();
  82. last if done =:= IterationEnd;
  83. }
  84. }
  85. method !add-batch() {
  86. my \work = HyperWorkBuffer.new;
  87. work.sequence-number = $!sequence-number++;
  88. # XXX error handling around below
  89. my \done = $!hyper-iterator.fill-buffer(work, $!configuration.batch);
  90. $!lock.protect({
  91. nqp::push($!work-available, work);
  92. if done =:= IterationEnd {
  93. $!status := ALL_ADDED;
  94. $!cond-have-work.signal_all();
  95. } else {
  96. $!cond-have-work.signal();
  97. }
  98. });
  99. done
  100. }
  101. method !start-worker() {
  102. start {
  103. loop {
  104. # Acquire work.
  105. my $my-work;
  106. $!lock.protect({
  107. until $my-work.DEFINITE {
  108. if nqp::elems($!work-available) {
  109. $my-work := nqp::shift($!work-available);
  110. $!in-progress++;
  111. }
  112. elsif $!status == ALL_ADDED {
  113. last;
  114. }
  115. else {
  116. $!cond-have-work.wait();
  117. }
  118. }
  119. });
  120. unless $my-work.DEFINITE {
  121. $!cond-have-result.signal();
  122. last;
  123. }
  124. # Do work.
  125. try {
  126. $!hyper-iterator.process-buffer($my-work);
  127. CATCH {
  128. default {
  129. # GLR XXX error handling
  130. nqp::say(.gist);
  131. }
  132. }
  133. }
  134. # Place in results and signal anyone waiting for it.
  135. $!lock.protect({
  136. nqp::push($!work-completed, $my-work);
  137. $!in-progress--;
  138. $!cond-have-result.signal();
  139. });
  140. }
  141. }
  142. }
  143. method !block-for-result(--> Nil) {
  144. my int $we-got-an-empty-buffer;
  145. my int $last-amount-of-completed = 0;
  146. repeat while $we-got-an-empty-buffer {
  147. my int $work-deficit = 0;
  148. $we-got-an-empty-buffer = 0;
  149. $!lock.protect({
  150. until nqp::elems($!work-completed) > $last-amount-of-completed || self!finished() {
  151. $!cond-have-result.wait();
  152. }
  153. my Mu $backlog := Mu;
  154. while nqp::elems($!work-completed) && !$we-got-an-empty-buffer {
  155. my $first-result := nqp::shift($!work-completed);
  156. if $!configuration.race || $first-result.sequence-number == $!next-result-sequence-number {
  157. $!active-result-buffer := $first-result.output;
  158. $!next-result-sequence-number++;
  159. } else {
  160. if $backlog =:= Mu {
  161. $backlog := nqp::list();
  162. }
  163. nqp::push($backlog, $first-result);
  164. }
  165. $work-deficit = $!configuration.degree - nqp::elems($!work-available);
  166. if $!active-result-buffer =:= Mu || $!active-result-buffer.elems == 0 {
  167. $!active-result-buffer := Mu;
  168. $we-got-an-empty-buffer = 1;
  169. } else {
  170. last;
  171. }
  172. }
  173. unless $backlog =:= Mu {
  174. while nqp::elems($backlog) {
  175. nqp::push($!work-completed, nqp::shift($backlog));
  176. }
  177. }
  178. $last-amount-of-completed = nqp::elems($!work-completed);
  179. });
  180. while $!status != ALL_ADDED && $work-deficit > 0 {
  181. last if self!add-batch() =:= IterationEnd;
  182. $work-deficit--;
  183. }
  184. }
  185. }
  186. method !finished() {
  187. $!status == ALL_ADDED &&
  188. nqp::elems($!work-available) == 0 &&
  189. $!in-progress == 0
  190. }
  191. }.new(self.hyper-iterator)
  192. }
  193. # Various operations use the sequential iterator since they wish to set
  194. # off the parallel processing and consume the results.
  195. method List(HyperSeq:D:) {
  196. List.from-iterator(self.iterator)
  197. }
  198. method Slip(HyperSeq:D:) {
  199. Slip.from-iterator(self.iterator)
  200. }
  201. method Array(HyperSeq:D:) {
  202. Array.from-iterator(self.iterator)
  203. }
  204. method sink(HyperSeq:D: --> Nil) {
  205. # Means we're doing parallel work for its side-effects. Doesn't need
  206. # any special handling, nor does it warrant a warning since this is
  207. # what 'hyper for @xs -> $x { }' will end up calling.
  208. self.iterator.sink-all;
  209. }
  210. # Not indexable.
  211. multi method AT-POS(HyperSeq:D: $) {
  212. X::Seq::NotIndexable.new.throw
  213. }
  214. multi method EXISTS-POS(HyperSeq:D: $) {
  215. X::Seq::NotIndexable.new.throw
  216. }
  217. multi method DELETE-POS(HyperSeq:D: $) {
  218. X::Seq::NotIndexable.new.throw
  219. }
  220. }