# A HyperWorkBuffer represents a chunk of work to be processed as part of a
# parallelized operation (either thanks to hyper or race). It carries a
# sequence number, an input buffer (items to process), and an output buffer
# (results of processing them).
  5. my class HyperWorkBuffer {
  6. has int $.sequence-number is rw;
  7. has $.input;
  8. has $.output;
  9. method new() {
  10. my \wb = nqp::create(self);
  11. nqp::bindattr(wb, HyperWorkBuffer, '$!input', nqp::create(IterationBuffer));
  12. nqp::bindattr(wb, HyperWorkBuffer, '$!output', nqp::create(IterationBuffer));
  13. wb
  14. }
  15. # Clears both buffers.
  16. method clear(--> Nil) {
  17. nqp::setelems($!input, 0);
  18. nqp::setelems($!output, 0);
  19. }
  20. # Swaps around the input/output buffers, and clears the output buffer.
  21. # (This is used between pipelined stages, where the next stage will
  22. # use the items in the first.)
  23. method swap(--> Nil) {
  24. my $new-input := $!output;
  25. $!output := $!input;
  26. $!input := $new-input;
  27. nqp::setelems($!output, 0);
  28. }
  29. # Gets an iterator of the input.
  30. method input-iterator() {
  31. class :: does Iterator {
  32. has $!buffer;
  33. has int $!i;
  34. method new(\buffer) {
  35. nqp::p6bindattrinvres(
  36. nqp::create(self),self,'$!buffer',buffer
  37. )
  38. }
  39. method pull-one() {
  40. my int $i = $!i;
  41. if $i < nqp::elems($!buffer) {
  42. $!i = $i + 1;
  43. nqp::atpos($!buffer, $i)
  44. }
  45. else {
  46. IterationEnd
  47. }
  48. }
  49. }.new($!input)
  50. }
  51. }