1. # When we tap a Supply, we get back a Tap object. We close the tap in order
  2. # to turn off the flow of values.
  3. my class Tap {
  4. has &!on-close;
  5. submethod BUILD(:&!on-close --> Nil) { }
  6. method new(&on-close) {
  7. self.bless(:&on-close)
  8. }
  9. method close() {
  10. if &!on-close {
  11. my \close-result = &!on-close();
  12. await close-result if nqp::istype(close-result, Promise);
  13. }
  14. True
  15. }
  16. }
  17. # The asynchronous dual of the Iterator role; goes inside of a Supply, which
  18. # is the asynchronous dual of the Seq class. So just as a Seq wraps around an
  19. # Iterator so we don't expose all the internal iterator types to the world, a
  20. # Supply wraps about a Tappable so we don't expose all of those. (It may
  21. # surprise you that it's a Tappable, not a Tap, given Seq wraps an Iterator,
  22. # not an Iterable. Guess that's part of the duality too. Ask your local
  23. # category theorist. :-))
  24. my role Tappable {
  25. method tap() { ... }
  26. method live() { ... } # Taps into a live data source
  27. method serial() { ... } # Promises no concurrent emits
  28. method sane() { ... } # Matches emit* [done|quit]? grammar
  29. }
  30. # A few Supply-related exception types.
  31. my class X::Supply::Combinator is Exception {
  32. has $.combinator;
  33. method message() { "Can only use $!combinator to combine defined Supply objects" }
  34. }
  35. my class X::Supply::Migrate::Needs is Exception {
  36. method message() {
  37. ".migrate needs Supplies to be emitted"
  38. }
  39. }
  40. my class X::Supply::New is Exception {
  41. method message() {
  42. "Cannot directly create a Supply. You might want:\n" ~
  43. " - To use a Supplier in order to get a live supply\n" ~
  44. " - To use Supply.on-demand to create an on-demand supply\n" ~
  45. " - To create a Supply using a supply block"
  46. }
  47. }
  48. # A Supply is like an asynchronous Seq. All the methods that you can do on
  49. # a Supply go in here.
  50. my class Supplier { ... }
  51. my class Supplier::Preserving { ... }
  52. my class Supply does Awaitable {
  53. has Tappable $!tappable;
  54. proto method new(|) { * }
  55. multi method new() {
  56. X::Supply::New.new.throw
  57. }
  58. multi method new(Tappable $tappable) {
  59. self.bless(:$tappable);
  60. }
  61. submethod BUILD(:$!tappable! --> Nil) { }
  62. method live(Supply:D:) { $!tappable.live }
  63. method serial(Supply:D:) { $!tappable.serial }
  64. my \DISCARD = -> $ {};
  65. my \NOP = -> {};
  66. my \DEATH = -> $ex { $ex.throw };
  67. method tap(Supply:D: &emit = DISCARD, :&done = NOP, :&quit = DEATH) {
  68. $!tappable.tap(&emit, &done, &quit)
  69. }
  70. method act(Supply:D: &actor, *%others) {
  71. self.sanitize.tap(&actor, |%others)
  72. }
  73. ##
  74. ## Supply factories
  75. ##
  76. method on-demand(Supply:U: &producer, :&closing, :$scheduler = CurrentThreadScheduler) {
  77. Supply.new(class :: does Tappable {
  78. has &!producer;
  79. has &!closing;
  80. has $!scheduler;
  81. submethod BUILD(:&!producer!, :&!closing!, :$!scheduler! --> Nil) {}
  82. method tap(&emit, &done, &quit) {
  83. my $p = Supplier.new;
  84. $p.Supply.tap(&emit, :&done, :&quit); # sanitizes
  85. $!scheduler.cue({ &!producer($p) },
  86. catch => -> \ex { $p.quit(ex) });
  87. Tap.new(&!closing)
  88. }
  89. method live(--> False) { }
  90. method sane(--> True) { }
  91. method serial(--> True) { }
  92. }.new(:&producer, :&closing, :$scheduler))
  93. }
  94. method from-list(Supply:U: +@values, :$scheduler = CurrentThreadScheduler) {
  95. self.on-demand(-> $p {
  96. $p.emit($_) for @values;
  97. $p.done();
  98. }, :$scheduler);
  99. }
  100. method interval(Supply:U: $interval, $delay = 0, :$scheduler = $*SCHEDULER) {
  101. Supply.new(class :: does Tappable {
  102. has $!scheduler;
  103. has $!interval;
  104. has $!delay;
  105. submethod BUILD(:$!scheduler, :$!interval, :$!delay --> Nil) { }
  106. method tap(&emit, |) {
  107. my $i = 0;
  108. my $lock = Lock.new;
  109. my $cancellation = $!scheduler.cue(
  110. {
  111. emit($lock.protect: { $i++ });
  112. CATCH { $cancellation.cancel if $cancellation }
  113. },
  114. :every($!interval), :in($!delay)
  115. );
  116. Tap.new({ $cancellation.cancel })
  117. }
  118. method live(--> False) { }
  119. method sane(--> True) { }
  120. method serial(--> False) { }
  121. }.new(:$interval, :$delay, :$scheduler));
  122. }
  123. ##
  124. ## Simple operations are those that operate on a single Supply, carry its
  125. ## liveness, and are always serial. We implement the directly as they are
  126. ## common and fairly "hot path".
  127. ##
  128. my role SimpleOpTappable does Tappable {
  129. has $!source;
  130. method live() { $!source.live }
  131. method sane(--> True) { }
  132. method serial(--> True) { }
  133. method !cleanup(int $cleaned-up is rw, $source-tap) {
  134. if $source-tap && !$cleaned-up {
  135. $cleaned-up = 1;
  136. $source-tap.close;
  137. }
  138. }
  139. }
  140. method serialize(Supply:D:) {
  141. $!tappable.serial ?? self !! Supply.new(class :: does SimpleOpTappable {
  142. has $!lock = Lock.new;
  143. submethod BUILD(:$!source! --> Nil) { }
  144. method tap(&emit, &done, &quit) {
  145. my int $cleaned-up = 0;
  146. my $source-tap = $!source.tap(
  147. -> \value{
  148. $!lock.protect: { emit(value); }
  149. },
  150. done => -> {
  151. $!lock.protect: {
  152. done();
  153. self!cleanup($cleaned-up, $source-tap);
  154. }
  155. },
  156. quit => -> $ex {
  157. $!lock.protect: {
  158. quit($ex);
  159. self!cleanup($cleaned-up, $source-tap);
  160. }
  161. });
  162. Tap.new({ self!cleanup($cleaned-up, $source-tap) })
  163. }
  164. }.new(source => self))
  165. }
  166. method sanitize() {
  167. $!tappable.sane ?? self !! Supply.new(class :: does SimpleOpTappable {
  168. has int $!finished;
  169. submethod BUILD(:$!source! --> Nil) { }
  170. method tap(&emit, &done, &quit) {
  171. my int $cleaned-up = 0;
  172. my $source-tap = $!source.tap(
  173. -> \value{
  174. emit(value) unless $!finished;
  175. },
  176. done => -> {
  177. unless $!finished {
  178. $!finished = 1;
  179. done();
  180. self!cleanup($cleaned-up, $source-tap);
  181. }
  182. },
  183. quit => -> $ex {
  184. unless $!finished {
  185. $!finished = 1;
  186. quit($ex);
  187. self!cleanup($cleaned-up, $source-tap);
  188. }
  189. });
  190. Tap.new({ self!cleanup($cleaned-up, $source-tap) })
  191. }
  192. }.new(source => self.serialize))
  193. }
  194. method on-close(Supply:D: &on-close) {
  195. return Supply.new(class :: does SimpleOpTappable {
  196. has int $!finished;
  197. has &!on-close;
  198. submethod BUILD(:$!source!, :&!on-close! --> Nil) { }
  199. method tap(&emit, &done, &quit) {
  200. my int $cleaned-up = 0;
  201. my $source-tap = $!source.tap(&emit, :&done, :&quit);
  202. Tap.new({
  203. &!on-close();
  204. self!cleanup($cleaned-up, $source-tap)
  205. })
  206. }
  207. }.new(source => self, :&on-close))
  208. }
  209. method map(Supply:D: &mapper) {
  210. Supply.new(class :: does SimpleOpTappable {
  211. has &!mapper;
  212. submethod BUILD(:$!source!, :&!mapper! --> Nil) { }
  213. method tap(&emit, &done, &quit) {
  214. my int $cleaned-up = 0;
  215. my $source-tap = $!source.tap(
  216. -> \value {
  217. my \result = try &!mapper(value);
  218. if $! {
  219. quit($!);
  220. self!cleanup($cleaned-up, $source-tap);
  221. }
  222. else {
  223. emit(result)
  224. }
  225. },
  226. done => -> {
  227. done();
  228. self!cleanup($cleaned-up, $source-tap);
  229. },
  230. quit => -> $ex {
  231. quit($ex);
  232. self!cleanup($cleaned-up, $source-tap);
  233. });
  234. Tap.new({ self!cleanup($cleaned-up, $source-tap) })
  235. }
  236. }.new(source => self.sanitize, :&mapper))
  237. }
  238. method grep(Supply:D: Mu $test) {
  239. Supply.new(class :: does SimpleOpTappable {
  240. has Mu $!test;
  241. submethod BUILD(:$!source!, Mu :$!test! --> Nil) { }
  242. method tap(&emit, &done, &quit) {
  243. my int $cleaned-up = 0;
  244. my $source-tap = $!source.tap(
  245. -> \value {
  246. my \accepted = try $!test.ACCEPTS(value);
  247. if accepted {
  248. emit(value);
  249. }
  250. elsif $! {
  251. quit($!);
  252. self!cleanup($cleaned-up, $source-tap);
  253. }
  254. },
  255. done => -> {
  256. done();
  257. self!cleanup($cleaned-up, $source-tap);
  258. },
  259. quit => -> $ex {
  260. quit($ex);
  261. self!cleanup($cleaned-up, $source-tap);
  262. });
  263. Tap.new({ self!cleanup($cleaned-up, $source-tap) })
  264. }
  265. }.new(source => self.sanitize, :$test))
  266. }
  267. method schedule-on(Supply:D: Scheduler $scheduler) {
  268. Supply.new(class :: does SimpleOpTappable {
  269. has $!scheduler;
  270. submethod BUILD(:$!source!, :$!scheduler! --> Nil) { }
  271. method tap(&emit, &done, &quit) {
  272. my int $cleaned-up = 0;
  273. my $source-tap = $!source.tap(
  274. -> \value {
  275. $!scheduler.cue: { emit(value) }
  276. },
  277. done => -> {
  278. $!scheduler.cue: { done(); self!cleanup($cleaned-up, $source-tap); }
  279. },
  280. quit => -> $ex {
  281. $!scheduler.cue: { quit($ex); self!cleanup($cleaned-up, $source-tap); }
  282. });
  283. Tap.new({ self!cleanup($cleaned-up, $source-tap) })
  284. }
  285. }.new(source => self.sanitize, :$scheduler))
  286. }
  287. method start(Supply:D: &startee) {
  288. self.map: -> \value {
  289. Supply.new(class :: does SimpleOpTappable {
  290. has $!value;
  291. has &!startee;
  292. submethod BUILD(:$!value, :&!startee --> Nil) { }
  293. method tap(&emit, &done, &quit) {
  294. my int $closed = 0;
  295. Promise.start({ &!startee($!value) }).then({
  296. unless $closed {
  297. if .status == Kept {
  298. emit(.result);
  299. done();
  300. }
  301. else {
  302. quit(.cause);
  303. }
  304. }
  305. });
  306. Tap.new({ $closed = 1 })
  307. }
  308. }.new(:value(value), :&startee))
  309. }
  310. }
  311. method stable(Supply:D: $time, :$scheduler = $*SCHEDULER) {
  312. return self unless $time;
  313. Supply.new(class :: does SimpleOpTappable {
  314. has $!time;
  315. has $!scheduler;
  316. has $!last_cancellation;
  317. has $!lock = Lock.new;
  318. submethod BUILD(:$!source!, :$!time!, :$!scheduler! --> Nil) { }
  319. method tap(&emit, &done, &quit) {
  320. my int $cleaned-up = 0;
  321. my $source-tap = $!source.tap(
  322. -> \value {
  323. $!lock.protect: {
  324. if $!last_cancellation {
  325. $!last_cancellation.cancel;
  326. }
  327. $!last_cancellation = $!scheduler.cue(
  328. :in($time),
  329. {
  330. $!lock.protect: { $!last_cancellation = Nil; }
  331. try {
  332. emit(value);
  333. CATCH {
  334. default {
  335. quit($_);
  336. self!cleanup($cleaned-up, $source-tap);
  337. }
  338. }
  339. }
  340. });
  341. }
  342. },
  343. done => -> {
  344. done();
  345. self!cleanup($cleaned-up, $source-tap);
  346. },
  347. quit => -> $ex {
  348. quit($ex);
  349. self!cleanup($cleaned-up, $source-tap);
  350. });
  351. Tap.new({ self!cleanup($cleaned-up, $source-tap) })
  352. }
  353. }.new(source => self.sanitize, :$time, :$scheduler))
  354. }
  355. method delayed(Supply:D: $time, :$scheduler = $*SCHEDULER) {
  356. return self unless $time; # nothing to do
  357. Supply.new(class :: does SimpleOpTappable {
  358. has $!time;
  359. has $!scheduler;
  360. submethod BUILD(:$!source!, :$!time, :$!scheduler! --> Nil) { }
  361. method tap(&emit, &done, &quit) {
  362. my int $cleaned-up = 0;
  363. my $source-tap = $!source.tap(
  364. -> \value {
  365. $!scheduler.cue: { emit(value) }, :in($time)
  366. },
  367. done => -> {
  368. $!scheduler.cue:
  369. { done(); self!cleanup($cleaned-up, $source-tap); },
  370. :in($time)
  371. },
  372. quit => -> $ex {
  373. $!scheduler.cue:
  374. { quit($ex); self!cleanup($cleaned-up, $source-tap); },
  375. :in($time)
  376. });
  377. Tap.new({ self!cleanup($cleaned-up, $source-tap) })
  378. }
  379. }.new(source => self.sanitize, :$time, :$scheduler))
  380. }
  381. ##
  382. ## A bunch of the more complex combinators, implemented as supply blocks
  383. ##
  384. method do(Supply:D $self: &side-effect) {
  385. supply {
  386. whenever self -> \value {
  387. side-effect(value);
  388. emit(value);
  389. }
  390. }
  391. }
  392. method flat(Supply:D:) {
  393. supply {
  394. whenever self -> \inner {
  395. whenever inner -> \value {
  396. emit value;
  397. }
  398. }
  399. }
  400. }
  401. method merge(*@s) {
  402. @s.unshift(self) if self.DEFINITE; # add if instance method
  403. return supply { } unless +@s; # nothing to be done
  404. X::Supply::Combinator.new(
  405. combinator => 'merge'
  406. ).throw unless Rakudo::Internals.ALL_DEFINED_TYPE(@s,Supply);
  407. return @s[0].sanitize if +@s == 1; # nothing to be done
  408. supply {
  409. for @s {
  410. whenever $_ -> \value { emit(value) }
  411. }
  412. }
  413. }
  414. method reduce(Supply:D $self: &with) {
  415. supply {
  416. my $first := True;
  417. my $reduced := Nil;
  418. whenever self -> \value {
  419. if $first {
  420. $reduced := value;
  421. $first := False;
  422. }
  423. else {
  424. $reduced := with($reduced, value);
  425. }
  426. LAST {
  427. emit $reduced;
  428. }
  429. }
  430. }
  431. }
  432. method produce(Supply:D $self: &with) {
  433. supply {
  434. my $first := True;
  435. my $reduced := Nil;
  436. whenever self -> \value {
  437. if $first {
  438. $reduced := value;
  439. $first := False;
  440. }
  441. else {
  442. $reduced := with($reduced, value);
  443. }
  444. emit $reduced;
  445. }
  446. }
  447. }
  448. method migrate(Supply:D:) {
  449. supply {
  450. my $current;
  451. whenever self -> \inner {
  452. X::Supply::Migrate::Needs.new.throw
  453. unless nqp::istype(inner, Supply);
  454. $current.close if $current;
  455. $current = do whenever inner -> \value {
  456. emit(value);
  457. }
  458. }
  459. }
  460. }
  461. proto method classify(|) { * }
  462. multi method classify(Supply:D: &mapper ) {
  463. self!classify(&mapper);
  464. }
  465. multi method classify(Supply:D: %mapper ) {
  466. self!classify({ %mapper{$^a} });
  467. }
  468. multi method classify(Supply:D: @mapper ) {
  469. self!classify({ @mapper[$^a] });
  470. }
  471. proto method categorize (|) { * }
  472. multi method categorize(Supply:D: &mapper ) {
  473. self!classify(&mapper, :multi);
  474. }
  475. multi method categorize(Supply:D: %mapper ) {
  476. self!classify({ %mapper{$^a} }, :multi);
  477. }
  478. multi method categorize(Supply:D: @mapper ) {
  479. self!classify({ @mapper[$^a] }, :multi);
  480. }
  481. method !classify(&mapper, :$multi) {
  482. supply {
  483. my %mapping;
  484. sub find-target($key) {
  485. %mapping{ $key.WHICH } //= do {
  486. my $p = Supplier::Preserving.new;
  487. emit($key => $p.Supply);
  488. $p
  489. };
  490. }
  491. whenever self -> \value {
  492. if $multi {
  493. for @(mapper(value)) -> $key {
  494. find-target($key).emit(value);
  495. }
  496. }
  497. else {
  498. find-target(mapper(value)).emit(value);
  499. }
  500. LAST {
  501. %mapping.values>>.done;
  502. }
  503. }
  504. }
  505. }
  506. ##
  507. ## Coercions
  508. ##
  509. method Supply(Supply:) { self }
  510. method Channel(Supply:D:) {
  511. my $c = Channel.new();
  512. self.sanitize.tap:
  513. -> \val { $c.send(val) },
  514. done => { $c.close },
  515. quit => -> $ex { $c.fail($ex) };
  516. $c
  517. }
  518. my class ConcQueue is repr('ConcBlockingQueue') { }
  519. method list(Supply:D:) {
  520. gather {
  521. my Mu \queue = nqp::create(ConcQueue);
  522. my $exception;
  523. self.tap(
  524. -> \val { nqp::push(queue, val) },
  525. done => -> { nqp::push(queue, ConcQueue) }, # type obj as sentinel
  526. quit => -> \ex { $exception := ex; nqp::push(queue, ConcQueue) });
  527. loop {
  528. my \got = nqp::shift(queue);
  529. if got =:= ConcQueue {
  530. $exception.DEFINITE
  531. ?? $exception.throw
  532. !! last
  533. }
  534. else {
  535. take got;
  536. }
  537. }
  538. }
  539. }
  540. method Promise(Supply:D:) {
  541. my $p = Promise.new;
  542. my $v = $p.vow;
  543. my $final := Nil;
  544. my $t = self.tap:
  545. -> \val { $final := val },
  546. done => { $v.keep($final) },
  547. quit => -> \ex { $v.break(ex) };
  548. $p
  549. }
  550. method wait(Supply:D:) { await self.Promise }
  551. my class SupplyAwaitableHandle does Awaitable::Handle {
  552. has $!supply;
  553. method not-ready(Supply:D \supply) {
  554. nqp::create(self)!not-ready(supply)
  555. }
  556. method !not-ready(\supply) {
  557. $!already = False;
  558. $!supply := supply;
  559. self
  560. }
  561. method subscribe-awaiter(&subscriber --> Nil) {
  562. my $final := Nil;
  563. $!supply.tap:
  564. -> \val { $final := val },
  565. done => { subscriber(True, $final) },
  566. quit => -> \ex { subscriber(False, ex) };
  567. }
  568. }
  569. method get-await-handle(--> Awaitable::Handle) {
  570. SupplyAwaitableHandle.not-ready(self)
  571. }
  572. method unique(Supply:D $self: :&as, :&with, :$expires) {
  573. supply {
  574. if $expires {
  575. if &with and !(&with === &[===]) {
  576. my @seen; # really Mu, but doesn't work in settings
  577. my Mu $target;
  578. if &as {
  579. whenever self -> \val {
  580. my $now := now;
  581. $target = &as(val);
  582. my $index =
  583. @seen.first({&with($target,$_[0])},:k);
  584. with $index {
  585. if $now > @seen[$index][1] { # expired
  586. @seen[$index][1] = $now+$expires;
  587. emit(val);
  588. }
  589. }
  590. else {
  591. @seen.push: [$target, $now+$expires];
  592. emit(val);
  593. }
  594. }
  595. }
  596. else {
  597. whenever self -> \val {
  598. my $now := now;
  599. my $index =
  600. @seen.first({&with(val,$_[0])},:k);
  601. with $index {
  602. if $now > @seen[$index][1] { # expired
  603. @seen[$index][1] = $now+$expires;
  604. emit(val);
  605. }
  606. }
  607. else {
  608. @seen.push: [val, $now+$expires];
  609. emit(val);
  610. }
  611. }
  612. }
  613. }
  614. else {
  615. my $seen := nqp::hash();
  616. my str $target;
  617. if &as {
  618. whenever self -> \val {
  619. my $now := now;
  620. $target = nqp::unbox_s(&as(val).WHICH);
  621. if !nqp::existskey($seen,$target) ||
  622. $now > nqp::atkey($seen,$target) { #expired
  623. emit(val);
  624. nqp::bindkey($seen,$target,$now+$expires);
  625. }
  626. }
  627. }
  628. else {
  629. whenever self -> \val {
  630. my $now := now;
  631. $target = nqp::unbox_s(val.WHICH);
  632. if !nqp::existskey($seen,$target) ||
  633. $now > nqp::atkey($seen,$target) { #expired
  634. emit(val);
  635. nqp::bindkey($seen,$target,$now+$expires);
  636. }
  637. }
  638. }
  639. }
  640. }
  641. else { # !$!expires
  642. if &with and !(&with === &[===]) {
  643. my @seen; # really Mu, but doesn't work in settings
  644. my Mu $target;
  645. if &as {
  646. whenever self -> \val {
  647. $target = &as(val);
  648. if @seen.first({ &with($target,$_) } ) =:= Nil {
  649. @seen.push($target);
  650. emit(val);
  651. }
  652. }
  653. }
  654. else {
  655. whenever self -> \val {
  656. if @seen.first({ &with(val,$_) } ) =:= Nil {
  657. @seen.push(val);
  658. emit(val);
  659. }
  660. }
  661. }
  662. }
  663. else {
  664. my $seen := nqp::hash();
  665. my str $target;
  666. if &as {
  667. whenever self -> \val {
  668. $target = nqp::unbox_s(&as(val).WHICH);
  669. unless nqp::existskey($seen, $target) {
  670. nqp::bindkey($seen, $target, 1);
  671. emit(val);
  672. }
  673. }
  674. }
  675. else {
  676. whenever self -> \val {
  677. $target = nqp::unbox_s(val.WHICH);
  678. unless nqp::existskey($seen, $target) {
  679. nqp::bindkey($seen, $target, 1);
  680. emit(val);
  681. }
  682. }
  683. }
  684. }
  685. }
  686. }
  687. }
  688. method squish(Supply:D $self: :&as, :&with is copy) {
  689. &with //= &[===];
  690. supply {
  691. my int $first = 1;
  692. my Mu $last;
  693. my Mu $target;
  694. if &as {
  695. whenever self -> \val {
  696. $target = &as(val);
  697. if $first || !&with($last,$target) {
  698. $first = 0;
  699. emit(val);
  700. }
  701. $last = $target;
  702. }
  703. }
  704. else {
  705. whenever self -> \val {
  706. if $first || !&with($last, val) {
  707. $first = 0;
  708. emit(val);
  709. }
  710. $last = val;
  711. }
  712. }
  713. }
  714. }
  715. multi method rotor(Supply:D $self: Int:D $batch, :$partial) {
  716. self.rotor(($batch,), :$partial)
  717. }
  718. multi method rotor(Supply:D $self: *@cycle, :$partial) {
  719. my @c := @cycle.is-lazy ?? @cycle !! (@cycle xx *).flat.cache;
  720. supply {
  721. my Int $elems;
  722. my Int $gap;
  723. my int $to-skip;
  724. my int $skip;
  725. my \c = @c.iterator;
  726. sub next-batch(--> Nil) {
  727. given c.pull-one {
  728. when Pair {
  729. $elems = +.key;
  730. $gap = +.value;
  731. $to-skip = $gap > 0 ?? $gap !! 0;
  732. }
  733. default {
  734. $elems = +$_;
  735. $gap = 0;
  736. $to-skip = 0;
  737. }
  738. }
  739. }
  740. next-batch;
  741. my @batched;
  742. sub flush(--> Nil) {
  743. emit( @batched.splice(0, +@batched, @batched[* + $gap .. *]) );
  744. $skip = $to-skip;
  745. }
  746. whenever self -> \val {
  747. @batched.push: val unless $skip && $skip--;
  748. if @batched.elems == $elems {
  749. flush;
  750. next-batch;
  751. }
  752. LAST {
  753. flush if @batched and $partial;
  754. }
  755. }
  756. }
  757. }
  758. method batch(Supply:D $self: :$elems, :$seconds ) {
  759. return self if (!$elems or $elems == 1) and !$seconds; # nothing to do
  760. supply {
  761. my @batched;
  762. my $last_time;
  763. sub flush(--> Nil) {
  764. emit([@batched]);
  765. @batched = ();
  766. }
  767. sub final-flush(--> Nil) {
  768. flush if @batched;
  769. }
  770. if $seconds {
  771. $last_time = time div $seconds;
  772. if $elems { # and $seconds
  773. whenever self -> \val {
  774. my $this_time = time div $seconds;
  775. if $this_time != $last_time {
  776. flush if @batched;
  777. $last_time = $this_time;
  778. @batched.push: val;
  779. }
  780. else {
  781. @batched.push: val;
  782. flush if @batched.elems == $elems;
  783. }
  784. LAST { final-flush; }
  785. }
  786. }
  787. else {
  788. whenever self -> \val {
  789. my $this_time = time div $seconds;
  790. if $this_time != $last_time {
  791. flush if @batched;
  792. $last_time = $this_time;
  793. }
  794. @batched.push: val;
  795. LAST { final-flush; }
  796. }
  797. }
  798. }
  799. else { # just $elems
  800. whenever self -> \val {
  801. @batched.push: val;
  802. flush if @batched.elems == $elems;
  803. LAST { final-flush; }
  804. }
  805. }
  806. }
  807. }
  808. method lines(Supply:D $self: :$chomp = True ) {
  809. supply {
  810. my str $str;
  811. my int $chars;
  812. my int $left;
  813. my int $pos;
  814. my int $nextpos;
  815. my int $found;
  816. whenever self -> \val {
  817. $str = $str ~ nqp::unbox_s(val);
  818. $chars = nqp::chars($str);
  819. $pos = 0;
  820. while ($left = $chars - $pos) > 0 {
  821. $nextpos = nqp::findcclass(
  822. nqp::const::CCLASS_NEWLINE, $str, $pos, $left
  823. );
  824. last
  825. if $nextpos >= $chars # no line delimiter
  826. or $nextpos == $chars - 1 # broken CRLF ?
  827. && nqp::eqat($str, "\r", $nextpos); # yes!
  828. if $chomp {
  829. emit( ($found = $nextpos - $pos)
  830. ?? nqp::p6box_s(nqp::substr($str,$pos,$found))
  831. !! ''
  832. );
  833. $pos = $nextpos + 1;
  834. }
  835. else {
  836. $found = $nextpos - $pos + 1;
  837. emit(
  838. nqp::p6box_s(nqp::substr($str,$pos,$found)));
  839. $pos = $pos + $found;
  840. }
  841. }
  842. $str = $pos < $chars
  843. ?? nqp::substr($str,$pos)
  844. !! '';
  845. LAST {
  846. if $str {
  847. $chars = nqp::chars($str);
  848. emit( $chomp && nqp::iscclass(
  849. nqp::const::CCLASS_NEWLINE,$str,$chars-1)
  850. ?? nqp::p6box_s(nqp::substr($str,0,$chars - 1))
  851. !! nqp::p6box_s($str)
  852. );
  853. }
  854. }
  855. }
  856. }
  857. }
  858. method words(Supply:D $self:) {
  859. supply {
  860. my str $str;
  861. my int $chars;
  862. my int $left;
  863. my int $pos;
  864. my int $nextpos;
  865. my int $found;
  866. my int $cr;
  867. my int $crlf;
  868. whenever self -> \val {
  869. $str = $str ~ nqp::unbox_s(val);
  870. $chars = nqp::chars($str);
  871. $pos = nqp::findnotcclass(
  872. nqp::const::CCLASS_WHITESPACE, $str, 0, $chars);
  873. while ($left = $chars - $pos) > 0 {
  874. $nextpos = nqp::findcclass(
  875. nqp::const::CCLASS_WHITESPACE, $str, $pos, $left
  876. );
  877. last unless $left = $chars - $nextpos; # broken word
  878. emit( nqp::box_s(
  879. nqp::substr( $str, $pos, $nextpos - $pos ), Str)
  880. );
  881. $pos = nqp::findnotcclass(
  882. nqp::const::CCLASS_WHITESPACE,$str,$nextpos,$left);
  883. }
  884. $str = $pos < $chars
  885. ?? nqp::substr($str,$pos)
  886. !! '';
  887. LAST {
  888. emit( nqp::box_s($str, Str) ) if $str;
  889. }
  890. }
  891. }
  892. }
  893. method elems(Supply:D $self: $seconds? ) {
  894. supply {
  895. my int $elems = 0;
  896. if $seconds {
  897. my $last_time = time div $seconds;
  898. my int $last_elems = $elems;
  899. whenever self -> \val {
  900. $last_elems = $elems = $elems + 1;
  901. my $this_time = time div $seconds;
  902. if $this_time != $last_time {
  903. emit $elems;
  904. $last_time = $this_time;
  905. }
  906. LAST emit($elems) if $elems != $last_elems;
  907. }
  908. }
  909. else {
  910. whenever self -> \val { emit $elems = $elems + 1 }
  911. }
  912. }
  913. }
  914. method head(Supply:D: Int(Cool) $number = 1) {
  915. supply {
  916. my int $todo = $number;
  917. whenever self -> \val {
  918. if $todo > 0 {
  919. emit val;
  920. $todo = $todo - 1;
  921. }
  922. done if $todo <= 0; # nothing left to do
  923. }
  924. }
  925. }
  926. method tail(Supply:D: Int(Cool) $number = 1) {
  927. my int $size = $number;
  928. supply {
  929. if $size == 1 {
  930. my $last;
  931. whenever self -> \val {
  932. $last := val;
  933. LAST emit $last;
  934. }
  935. }
  936. elsif $size > 1 {
  937. my $lastn := nqp::list;
  938. my int $index = 0;
  939. nqp::setelems($lastn,$number); # presize list
  940. nqp::setelems($lastn,0);
  941. whenever self -> \val {
  942. nqp::bindpos($lastn,$index,val);
  943. $index = ($index + 1) % $size;
  944. LAST {
  945. my int $todo = nqp::elems($lastn);
  946. $index = 0 # start from beginning
  947. if $todo < $size; # if not a full set
  948. while $todo {
  949. emit nqp::atpos($lastn,$index);
  950. $index = ($index + 1) % $size;
  951. $todo = $todo - 1;
  952. }
  953. }
  954. }
  955. }
  956. else { # number <= 0, needed to keep tap open
  957. whenever self -> \val { }
  958. }
  959. }
  960. }
  961. method skip(Supply:D: Int(Cool) $number = 1) {
  962. supply {
  963. my int $size = $number + 1;
  964. my int $skipping = $size > 1;
  965. whenever self {
  966. .emit unless $skipping && ($skipping = --$size)
  967. }
  968. }
  969. }
  970. method min(Supply:D $self: &by = &infix:<cmp>) {
  971. my &cmp = &by.arity == 2 ?? &by !! { by($^a) cmp by($^b) }
  972. supply {
  973. my $min;
  974. whenever self -> \val {
  975. if val.defined and !$min.defined || cmp(val,$min) < 0 {
  976. emit( $min := val );
  977. }
  978. }
  979. }
  980. }
  981. method max(Supply:D $self: &by = &infix:<cmp>) {
  982. my &cmp = &by.arity == 2 ?? &by !! { by($^a) cmp by($^b) }
  983. supply {
  984. my $max;
  985. whenever self -> \val {
  986. if val.defined and !$max.defined || cmp(val,$max) > 0 {
  987. emit( $max = val );
  988. }
  989. }
  990. }
  991. }
  992. method minmax(Supply:D $self: &by = &infix:<cmp>) {
  993. my &cmp = &by.arity == 2 ?? &by !! { by($^a) cmp by($^b) }
  994. supply {
  995. my $min;
  996. my $max;
  997. whenever self -> \val {
  998. if nqp::istype(val,Failure) {
  999. val.throw; # XXX or just ignore ???
  1000. }
  1001. elsif val.defined {
  1002. if !$min.defined {
  1003. emit( Range.new($min = val, $max = val) );
  1004. }
  1005. elsif cmp(val,$min) < 0 {
  1006. emit( Range.new( $min = val, $max ) );
  1007. }
  1008. elsif cmp(val,$max) > 0 {
  1009. emit( Range.new( $min, $max = val ) );
  1010. }
  1011. }
  1012. }
  1013. }
  1014. }
  1015. method grab(Supply:D $self: &when_done) {
  1016. supply {
  1017. my @seen;
  1018. whenever self -> \val {
  1019. @seen.push: val;
  1020. LAST {
  1021. emit($_) for when_done(@seen);
  1022. }
  1023. }
  1024. }
  1025. }
  1026. method reverse(Supply:D:) { self.grab( {.reverse} ) }
  1027. multi method sort(Supply:D:) { self.grab( {.sort} ) }
  1028. multi method sort(Supply:D: &by) { self.grab( {.sort(&by)} ) }
  1029. method zip(**@s, :&with) {
  1030. @s.unshift(self) if self.DEFINITE; # add if instance method
  1031. return supply { } unless +@s; # nothing to be done
  1032. X::Supply::Combinator.new(
  1033. combinator => 'zip'
  1034. ).throw unless Rakudo::Internals.ALL_DEFINED_TYPE(@s,Supply);
  1035. return @s[0] if +@s == 1; # nothing to be done
  1036. supply {
  1037. my @values = [] xx +@s;
  1038. for @s.kv -> $index, $supply {
  1039. if &with {
  1040. whenever $supply -> \val {
  1041. @values[$index].push(val);
  1042. emit( [[&with]] @values.map(*.shift) ) if all(@values);
  1043. }
  1044. }
  1045. else {
  1046. whenever $supply -> \val {
  1047. @values[$index].push(val);
  1048. emit( $(@values.map(*.shift).list) ) if all(@values);
  1049. }
  1050. }
  1051. }
  1052. }
  1053. }
  1054. method zip-latest(**@s, :&with, :$initial ) {
  1055. @s.unshift(self) if self.DEFINITE; # add if instance method
  1056. return supply { } unless +@s; # nothing to do.
  1057. X::Supply::Combinator.new(
  1058. combinator => 'zip-latest'
  1059. ).throw unless Rakudo::Internals.ALL_DEFINED_TYPE(@s,Supply);
  1060. return @s[0] if +@s == 1; # nothing to do.
  1061. supply {
  1062. my @values;
  1063. my $uninitialised = +@s; # how many supplies have yet to emit until we
  1064. # can start emitting, too?
  1065. if $initial {
  1066. @values = @$initial;
  1067. $uninitialised = 0 max $uninitialised - @$initial;
  1068. }
  1069. for @s.kv -> $index, $supply {
  1070. if &with {
  1071. whenever $supply -> \val {
  1072. --$uninitialised
  1073. if $uninitialised > 0 && not @values.EXISTS-POS($index);
  1074. @values[$index] = val;
  1075. emit( [[&with]] @values ) unless $uninitialised;
  1076. }
  1077. }
  1078. else {
  1079. whenever $supply -> \val {
  1080. --$uninitialised
  1081. if $uninitialised > 0 && not @values.EXISTS-POS($index);
  1082. @values[$index] = val;
  1083. emit( @values.List.item ) unless $uninitialised;
  1084. }
  1085. }
  1086. }
  1087. }
  1088. }
  1089. proto method throttle(|) { * }
  1090. multi method throttle(Supply:D $self:
  1091. Int() $elems,
  1092. Real() $seconds,
  1093. Real() $delay = 0,
  1094. :$scheduler = $*SCHEDULER,
  1095. :$control,
  1096. :$status,
  1097. :$bleed,
  1098. :$vent-at,
  1099. ) {
  1100. my $timer = Supply.interval($seconds,$delay,:$scheduler);
  1101. my int $limit = $elems;
  1102. my int $vent = $vent-at if $bleed;;
  1103. supply {
  1104. my @buffer;
  1105. my int $allowed = $limit;
  1106. my int $emitted;
  1107. my int $bled;
  1108. my int $done;
  1109. sub emit-status($id --> Nil) {
  1110. $status.emit(
  1111. { :$allowed, :$bled, :buffered(+@buffer),
  1112. :$emitted, :$id, :$limit, :$vent-at } );
  1113. }
  1114. whenever $timer -> \tick {
  1115. if +@buffer -> \buffered {
  1116. my int $todo = buffered > $limit ?? $limit !! buffered;
  1117. emit(@buffer.shift) for ^$todo;
  1118. $emitted = $emitted + $todo;
  1119. $allowed = $limit - $todo;
  1120. }
  1121. else {
  1122. $allowed = $limit;
  1123. }
  1124. if $done && !@buffer {
  1125. done;
  1126. }
  1127. }
  1128. whenever self -> \val {
  1129. if $allowed {
  1130. emit(val);
  1131. $emitted = $emitted + 1;
  1132. $allowed = $allowed - 1;
  1133. }
  1134. elsif $vent && +@buffer >= $vent {
  1135. $bleed.emit(val);
  1136. }
  1137. else {
  1138. @buffer.push(val);
  1139. }
  1140. LAST {
  1141. if $status {
  1142. emit-status("done");
  1143. $status.done;
  1144. }
  1145. if $bleed && @buffer {
  1146. $bleed.emit(@buffer.shift) while @buffer;
  1147. $bleed.done;
  1148. }
  1149. $done = 0;
  1150. }
  1151. }
  1152. if $control {
  1153. whenever $control -> \val {
  1154. my str $type;
  1155. my str $value;
  1156. Rakudo::Internals.KEY_COLON_VALUE(val,$type,$value);
  1157. if $type eq 'limit' {
  1158. my int $extra = $value - $limit;
  1159. $allowed = $extra > 0 || $allowed + $extra >= 0
  1160. ?? $allowed + $extra
  1161. !! 0;
  1162. $limit = $value;
  1163. }
  1164. elsif $type eq 'bleed' && $bleed {
  1165. my int $todo = $value min +@buffer;
  1166. $bleed.emit(@buffer.shift) for ^$todo;
  1167. $bled = $bled + $todo;
  1168. }
  1169. elsif $type eq 'status' && $status {
  1170. emit-status($value);
  1171. }
  1172. elsif $type eq 'vent-at' && $bleed {
  1173. $vent = $value;
  1174. if $vent && +@buffer > $vent {
  1175. $bleed.emit(@buffer.shift)
  1176. until !@buffer || +@buffer == $vent;
  1177. }
  1178. }
  1179. }
  1180. }
  1181. }
  1182. }
  1183. multi method throttle(Supply:D $self:
  1184. Int() $elems,
  1185. Callable:D $process,
  1186. Real() $delay = 0,
  1187. :$scheduler = $*SCHEDULER,
  1188. :$control,
  1189. :$status,
  1190. :$bleed,
  1191. :$vent-at,
  1192. ) {
  1193. sleep $delay if $delay;
  1194. my @buffer;
  1195. my int $limit = $elems;
  1196. my int $allowed = $limit;
  1197. my int $running;
  1198. my int $emitted;
  1199. my int $bled;
  1200. my int $done;
  1201. my int $vent = $vent-at if $bleed;
  1202. my $ready = Supplier::Preserving.new;
  1203. sub start-process(\val --> Nil) {
  1204. my $p = Promise.start( $process, :$scheduler, val );
  1205. $running = $running + 1;
  1206. $allowed = $allowed - 1;
  1207. $p.then: { $ready.emit($p) };
  1208. }
  1209. sub emit-status($id --> Nil) {
  1210. $status.emit(
  1211. { :$allowed, :$bled, :buffered(+@buffer),
  1212. :$emitted, :$id, :$limit, :$running } );
  1213. }
  1214. supply {
  1215. whenever $ready.Supply -> \val { # when a process is ready
  1216. $running = $running - 1;
  1217. $allowed = $allowed + 1;
  1218. emit(val);
  1219. $emitted = $emitted + 1;
  1220. start-process(@buffer.shift) if $allowed > 0 && @buffer;
  1221. if $done && !$running {
  1222. $control.done if $control;
  1223. if $status {
  1224. emit-status("done");
  1225. $status.done;
  1226. }
  1227. if $bleed && @buffer {
  1228. $bleed.emit(@buffer.shift) while @buffer;
  1229. $bleed.done;
  1230. }
  1231. done;
  1232. }
  1233. }
  1234. if $control {
  1235. whenever $control -> \val {
  1236. my str $type;
  1237. my str $value;
  1238. Rakudo::Internals.KEY_COLON_VALUE(val,$type,$value);
  1239. if $type eq 'limit' {
  1240. $allowed = $allowed + $value - $limit;
  1241. $limit = $value;
  1242. start-process(@buffer.shift)
  1243. while $allowed > 0 && @buffer;
  1244. }
  1245. elsif $type eq 'bleed' && $bleed {
  1246. my int $todo = $value min +@buffer;
  1247. $bleed.emit(@buffer.shift) for ^$todo;
  1248. $bled = $bled + $todo;
  1249. }
  1250. elsif $type eq 'status' && $status {
  1251. emit-status($value);
  1252. }
  1253. elsif $type eq 'vent-at' && $bleed {
  1254. $vent = $value;
  1255. if $vent && +@buffer > $vent {
  1256. $bleed.emit(@buffer.shift)
  1257. until !@buffer || +@buffer == $vent;
  1258. }
  1259. }
  1260. }
  1261. }
  1262. whenever self -> \val {
  1263. $allowed > 0
  1264. ?? start-process(val)
  1265. !! $vent && $vent == +@buffer
  1266. ?? $bleed.emit(val)
  1267. !! @buffer.push(val);
  1268. LAST { $done = 1 }
  1269. }
  1270. }
  1271. }
  1272. method share(Supply:D:) {
  1273. my $sup = Supplier.new;
  1274. self.tap:
  1275. -> \msg { $sup.emit(msg) },
  1276. done => -> { $sup.done() },
  1277. quit => -> \ex { $sup.quit(ex) }
  1278. $sup.Supply
  1279. }
  1280. }
  1281. # A Supplier is a convenient way to create a live Supply. The publisher can
  1282. # be used to emit/done/quit. The Supply objects obtained from it will tap into
  1283. # the same live Supply.
  1284. my class Supplier {
  1285. my class TapList does Tappable {
  1286. my class TapListEntry {
  1287. has &.emit;
  1288. has &.done;
  1289. has &.quit;
  1290. }
  1291. # Lock serializes updates to tappers.
  1292. has Lock $!lock = Lock.new;
  1293. # An immutable list of tappers. Always replaced on change, never
  1294. # mutated in-place ==> thread safe together with lock (and only
  1295. # need lock on modification).
  1296. has Mu $!tappers;
  1297. method tap(&emit, &done, &quit) {
  1298. my $tle := TapListEntry.new(:&emit, :&done, :&quit);
  1299. $!lock.protect({
  1300. my Mu $update := nqp::isconcrete($!tappers)
  1301. ?? nqp::clone($!tappers)
  1302. !! nqp::list();
  1303. nqp::push($update, $tle);
  1304. $!tappers := $update;
  1305. });
  1306. Tap.new({
  1307. $!lock.protect({
  1308. my Mu $update := nqp::list();
  1309. for nqp::hllize($!tappers) -> \entry {
  1310. nqp::push($update, entry) unless entry =:= $tle;
  1311. }
  1312. $!tappers := $update;
  1313. });
  1314. })
  1315. }
  1316. method emit(\value --> Nil) {
  1317. my $snapshot := $!tappers;
  1318. if nqp::isconcrete($snapshot) {
  1319. my int $n = nqp::elems($snapshot);
  1320. loop (my int $i = 0; $i < $n; $i = $i + 1) {
  1321. nqp::atpos($snapshot, $i).emit()(value);
  1322. }
  1323. }
  1324. }
  1325. method done(--> Nil) {
  1326. my $snapshot := $!tappers;
  1327. if nqp::isconcrete($snapshot) {
  1328. my int $n = nqp::elems($snapshot);
  1329. loop (my int $i = 0; $i < $n; $i = $i + 1) {
  1330. nqp::atpos($snapshot, $i).done()();
  1331. }
  1332. }
  1333. }
  1334. method quit($ex --> Nil) {
  1335. my $snapshot := $!tappers;
  1336. if nqp::isconcrete($snapshot) {
  1337. my int $n = nqp::elems($snapshot);
  1338. loop (my int $i = 0; $i < $n; $i = $i + 1) {
  1339. nqp::atpos($snapshot, $i).quit()($ex);
  1340. }
  1341. }
  1342. }
  1343. method live(--> True) { }
  1344. method serial(--> False) { }
  1345. method sane(--> False) { }
  1346. }
  1347. has $!taplist;
  1348. method new() {
  1349. self.bless(taplist => TapList.new)
  1350. }
  1351. submethod BUILD(:$!taplist! --> Nil) { }
  1352. method emit(Supplier:D: Mu \value) {
  1353. $!taplist.emit(value);
  1354. }
  1355. method done(Supplier:D:) {
  1356. $!taplist.done();
  1357. }
  1358. proto method quit($) { * }
  1359. multi method quit(Supplier:D: Exception $ex) {
  1360. $!taplist.quit($ex);
  1361. }
  1362. multi method quit(Supplier:D: Str() $message) {
  1363. $!taplist.quit(X::AdHoc.new(payload => $message));
  1364. }
  1365. method Supply(Supplier:D:) {
  1366. Supply.new($!taplist).sanitize
  1367. }
  1368. method unsanitized-supply(Supplier:D:) {
  1369. Supply.new($!taplist)
  1370. }
  1371. }
  1372. # A preserving supplier holds on to emitted values and state when nobody is
  1373. # tapping. As soon as there a tap is made, any preserved events will be
  1374. # immediately sent to that tapper.
  1375. my class Supplier::Preserving is Supplier {
  1376. my class PreservingTapList does Tappable {
  1377. my class TapListEntry {
  1378. has &.emit;
  1379. has &.done;
  1380. has &.quit;
  1381. }
  1382. # Lock serializes updates to tappers.
  1383. has Lock $!lock = Lock.new;
  1384. # An immutable list of tappers. Always replaced on change, never
  1385. # mutated in-place ==> thread safe together with lock (and only
  1386. # need lock on modification).
  1387. has Mu $!tappers;
  1388. # Events to reply, whether the replay was done, and a lock to protect
  1389. # updates to these.
  1390. has @!replay;
  1391. has int $!replay-done;
  1392. has $!replay-lock = Lock.new;
  1393. method tap(&emit, &done, &quit) {
  1394. my $tle := TapListEntry.new(:&emit, :&done, :&quit);
  1395. my int $replay = 0;
  1396. $!lock.protect({
  1397. my Mu $update := nqp::isconcrete($!tappers)
  1398. ?? nqp::clone($!tappers)
  1399. !! nqp::list();
  1400. nqp::push($update, $tle);
  1401. $replay = 1 if nqp::elems($update) == 1;
  1402. self!replay($tle) if $replay;
  1403. $!tappers := $update;
  1404. });
  1405. Tap.new({
  1406. $!lock.protect({
  1407. my Mu $update := nqp::list();
  1408. for nqp::hllize($!tappers) -> \entry {
  1409. nqp::push($update, entry) unless entry =:= $tle;
  1410. }
  1411. $!replay-done = 0 if nqp::elems($update) == 0;
  1412. $!tappers := $update;
  1413. });
  1414. })
  1415. }
  1416. method emit(\value --> Nil) {
  1417. loop {
  1418. my int $sent = 0;
  1419. my $snapshot := $!tappers;
  1420. if nqp::isconcrete($snapshot) {
  1421. $sent = nqp::elems($snapshot);
  1422. loop (my int $i = 0; $i < $sent; $i = $i + 1) {
  1423. nqp::atpos($snapshot, $i).emit()(value);
  1424. }
  1425. }
  1426. return if $sent;
  1427. return if self!add-replay({ $_.emit()(value) });
  1428. }
  1429. }
  1430. method done(--> Nil) {
  1431. loop {
  1432. my int $sent = 0;
  1433. my $snapshot := $!tappers;
  1434. if nqp::isconcrete($snapshot) {
  1435. $sent = nqp::elems($snapshot);
  1436. loop (my int $i = 0; $i < $sent; $i = $i + 1) {
  1437. nqp::atpos($snapshot, $i).done()();
  1438. }
  1439. }
  1440. return if $sent;
  1441. return if self!add-replay({ $_.done()() });
  1442. }
  1443. }
  1444. method quit($ex --> Nil) {
  1445. loop {
  1446. my int $sent = 0;
  1447. my $snapshot := $!tappers;
  1448. if nqp::isconcrete($snapshot) {
  1449. $sent = nqp::elems($snapshot);
  1450. loop (my int $i = 0; $i < $sent; $i = $i + 1) {
  1451. nqp::atpos($snapshot, $i).quit()($ex);
  1452. }
  1453. }
  1454. return if $sent;
  1455. return if self!add-replay({ $_.quit()($ex) });
  1456. }
  1457. }
  1458. method !add-replay(&replay --> Bool) {
  1459. $!replay-lock.protect: {
  1460. if $!replay-done {
  1461. False
  1462. }
  1463. else {
  1464. @!replay.push(&replay);
  1465. True
  1466. }
  1467. }
  1468. }
  1469. method !replay($tle) {
  1470. $!replay-lock.protect: {
  1471. while @!replay.shift -> $rep {
  1472. $rep($tle);
  1473. }
  1474. $!replay-done = 1;
  1475. }
  1476. }
  1477. method live(--> True) { }
  1478. method serial(--> False) { }
  1479. method sane(--> False) { }
  1480. }
  1481. method new() {
  1482. self.bless(taplist => PreservingTapList.new)
  1483. }
  1484. }
  1485. sub SUPPLY(&block) {
  1486. my class SupplyBlockState {
  1487. has &.emit;
  1488. has &.done;
  1489. has &.quit;
  1490. has @.close-phasers;
  1491. has $!active = 1;
  1492. has $!lock = Lock.new;
  1493. has %!active-taps;
  1494. has @!queued-operations;
  1495. method increment-active() {
  1496. $!lock.protect: { ++$!active }
  1497. }
  1498. method decrement-active() {
  1499. $!lock.protect: { --$!active }
  1500. }
  1501. method get-and-zero-active() {
  1502. $!lock.protect: {
  1503. my $result = $!active;
  1504. $!active = 0;
  1505. $result
  1506. }
  1507. }
  1508. method add-active-tap($tap --> Nil) {
  1509. $!lock.protect: { %!active-taps{nqp::objectid($tap)} = $tap }
  1510. }
  1511. method delete-active-tap($tap --> Nil) {
  1512. $!lock.protect: { %!active-taps{nqp::objectid($tap)}:delete }
  1513. }
  1514. method consume-active-taps() {
  1515. my @active;
  1516. $!lock.protect: {
  1517. @active = %!active-taps.values;
  1518. %!active-taps = ();
  1519. }
  1520. @active
  1521. }
  1522. method run-operation(&op --> Nil) {
  1523. if $!active {
  1524. my $run-now = False;
  1525. $!lock.protect({
  1526. if @!queued-operations {
  1527. @!queued-operations.push({
  1528. op();
  1529. self!maybe-another();
  1530. });
  1531. }
  1532. else {
  1533. @!queued-operations.push(&op);
  1534. $run-now = True;
  1535. }
  1536. });
  1537. if $run-now {
  1538. op();
  1539. self!maybe-another();
  1540. }
  1541. }
  1542. }
  1543. method !maybe-another(--> Nil) {
  1544. my &another;
  1545. $!lock.protect({
  1546. @!queued-operations.shift;
  1547. &another = @!queued-operations[0] if $!active && @!queued-operations;
  1548. });
  1549. &another && another();
  1550. }
  1551. }
  1552. Supply.new(class :: does Tappable {
  1553. has &!block;
  1554. submethod BUILD(:&!block --> Nil) { }
  1555. method tap(&emit, &done, &quit) {
  1556. my $state = SupplyBlockState.new(:&emit, :&done, :&quit);
  1557. self!run-supply-code(&!block, $state);
  1558. if nqp::istype(&!block,Block) {
  1559. $state.close-phasers.push(.clone) for &!block.phasers('CLOSE')
  1560. }
  1561. self!deactivate-one($state);
  1562. Tap.new(-> { self!teardown($state) })
  1563. }
  1564. method !run-supply-code(&code, $state) {
  1565. $state.run-operation({
  1566. my &*ADD-WHENEVER = sub ($supply, &whenever-block) {
  1567. $state.increment-active();
  1568. my $tap = $supply.tap(
  1569. -> \value {
  1570. self!run-supply-code({ whenever-block(value) }, $state)
  1571. },
  1572. done => {
  1573. $state.delete-active-tap($tap) if $tap.DEFINITE;
  1574. my @phasers := &whenever-block.phasers('LAST');
  1575. if @phasers {
  1576. self!run-supply-code({ .() for @phasers }, $state)
  1577. }
  1578. self!deactivate-one($state);
  1579. },
  1580. quit => -> \ex {
  1581. $state.delete-active-tap($tap) if $tap.DEFINITE;
  1582. self!run-supply-code({
  1583. my $handled;
  1584. my $phaser := &whenever-block.phasers('QUIT')[0];
  1585. if $phaser.DEFINITE {
  1586. $handled = $phaser(ex) === Nil;
  1587. }
  1588. if $handled {
  1589. self!deactivate-one($state);
  1590. }
  1591. elsif $state.get-and-zero-active() {
  1592. $state.quit().(ex) if $state.quit;
  1593. self!teardown($state);
  1594. }
  1595. }, $state);
  1596. });
  1597. $state.add-active-tap($tap);
  1598. $tap
  1599. }
  1600. my $emitter = {
  1601. my \ex := nqp::exception();
  1602. $state.emit().(nqp::getpayload(ex)) if $state.emit;
  1603. nqp::resume(ex)
  1604. }
  1605. my $done = {
  1606. $state.done().() if $state.done;
  1607. $state.get-and-zero-active();
  1608. self!teardown($state);
  1609. }
  1610. my $catch = {
  1611. my \ex = EXCEPTION(nqp::exception());
  1612. $state.quit().(ex) if $state.quit;
  1613. $state.get-and-zero-active();
  1614. self!teardown($state);
  1615. }
  1616. nqp::handle(code(),
  1617. 'EMIT', $emitter(),
  1618. 'DONE', $done(),
  1619. 'CATCH', $catch(),
  1620. 'NEXT', 0);
  1621. });
  1622. }
  1623. method !deactivate-one($state) {
  1624. $state.run-operation({
  1625. if $state.decrement-active() == 0 {
  1626. $state.done().() if $state.done;
  1627. self!teardown($state);
  1628. }
  1629. });
  1630. }
  1631. method !teardown($state) {
  1632. .close for $state.consume-active-taps;
  1633. while $state.close-phasers.pop() -> $close {
  1634. $close();
  1635. }
  1636. }
  1637. method live(--> False) { }
  1638. method sane(--> True) { }
  1639. method serial(--> True) { }
  1640. }.new(:&block))
  1641. }
  1642. sub WHENEVER(Supply() $supply, &block) {
  1643. my \adder = &*ADD-WHENEVER;
  1644. adder.defined
  1645. ?? adder.($supply, &block)
  1646. !! X::WheneverOutOfScope.new.throw
  1647. }
  1648. sub REACT(&block) {
  1649. my $s = SUPPLY(&block);
  1650. my $p = Promise.new;
  1651. $s.tap(
  1652. { warn "Useless use of emit in react" },
  1653. done => { $p.keep(Nil) },
  1654. quit => { $p.break($_) });
  1655. await $p;
  1656. }