# Forward declaration of Proc::Async so that the exception role below can use
# it as an attribute type; the full class body is defined later in this file.
my class Proc::Async { ... }

# Role composed by every Proc::Async exception in this file. It is itself an
# Exception and carries the Proc::Async instance the failure relates to.
my role X::Proc::Async is Exception { has Proc::Async $.proc }
  5. my class X::Proc::Async::TapBeforeSpawn does X::Proc::Async {
  6. has $.handle;
  7. method message() {
  8. "To avoid data races, you must tap $!handle before running the process"
  9. }
  10. }
  11. my class X::Proc::Async::CharsOrBytes does X::Proc::Async {
  12. has $.handle;
  13. method message() {
  14. "Can only tap one of chars or bytes supply for $!handle"
  15. }
  16. }
  17. my class X::Proc::Async::AlreadyStarted does X::Proc::Async {
  18. method message() {
  19. "Process has already been started"
  20. }
  21. }
  22. my class X::Proc::Async::MustBeStarted does X::Proc::Async {
  23. has $.method;
  24. method message() {
  25. "Process must be started first before calling '$!method'"
  26. }
  27. }
  28. my class X::Proc::Async::OpenForWriting does X::Proc::Async {
  29. has $.method;
  30. method message() {
  31. "Process must be opened for writing with :w to call '$!method'"
  32. }
  33. }
  34. my class Proc::Async {
  35. my class ProcessCancellation is repr('AsyncTask') { }
  36. my enum CharsOrBytes ( :Bytes(0), :Chars(1) );
  37. has $.path;
  38. has @.args;
  39. has $.w;
  40. has $.enc = 'utf8';
  41. has $.translate-nl = True;
  42. has Bool $.started = False;
  43. has $!stdout_supply;
  44. has CharsOrBytes $!stdout_type;
  45. has $!stderr_supply;
  46. has CharsOrBytes $!stderr_type;
  47. has $!process_handle;
  48. has $!exit_promise;
  49. has @!promises;
  50. proto method new(|) { * }
  51. multi method new($path, *@args, *%_) {
  52. self.bless(:$path, :@args, |%_)
  53. }
  54. method !supply(\what,\the-supply,\type,\value) {
  55. X::Proc::Async::TapBeforeSpawn.new(handle => what, proc => self).throw
  56. if $!started;
  57. X::Proc::Async::CharsOrBytes.new(handle => what, proc => self).throw
  58. if the-supply and type != value;
  59. type = value;
  60. the-supply //= Supplier::Preserving.new;
  61. }
  62. proto method stdout(|) { * }
  63. multi method stdout(Proc::Async:D: :$bin!) {
  64. $bin
  65. ?? self!supply('stdout', $!stdout_supply, $!stdout_type, Bytes).Supply
  66. !! self.stdout(|%_)
  67. }
  68. multi method stdout(Proc::Async:D: :$enc, :$translate-nl) {
  69. self!wrap-decoder:
  70. self!supply('stdout', $!stdout_supply, $!stdout_type, Chars).Supply,
  71. $enc, :$translate-nl
  72. }
  73. proto method stderr(|) { * }
  74. multi method stderr(Proc::Async:D: :$bin!) {
  75. $bin
  76. ?? self!supply('stderr', $!stderr_supply, $!stderr_type, Bytes).Supply
  77. !! self.stderr(|%_)
  78. }
  79. multi method stderr(Proc::Async:D: :$enc, :$translate-nl) {
  80. self!wrap-decoder:
  81. self!supply('stderr', $!stderr_supply, $!stderr_type, Chars).Supply,
  82. $enc, :$translate-nl
  83. }
  84. method !wrap-decoder(Supply:D $bin-supply, $enc, :$translate-nl) {
  85. Rakudo::Internals.BYTE_SUPPLY_DECODER($bin-supply, $enc // $!enc,
  86. :translate-nl($translate-nl // $!translate-nl))
  87. }
  88. method !capture(\callbacks,\std,\the-supply) {
  89. my $promise = Promise.new;
  90. my $vow = $promise.vow;
  91. my $ss = Rakudo::Internals::SupplySequencer.new(
  92. on-data-ready => -> \data { the-supply.emit(data) },
  93. on-completed => -> { the-supply.done(); $vow.keep(the-supply) },
  94. on-error => -> \err { the-supply.quit(err); $vow.keep((the-supply,err)) });
  95. nqp::bindkey(callbacks,
  96. std ~ '_bytes' ,
  97. -> Mu \seq, Mu \data, Mu \err { $ss.process(seq, data, err) });
  98. $promise;
  99. }
  100. method start(Proc::Async:D: :$scheduler = $*SCHEDULER, :$ENV, :$cwd = $*CWD) {
  101. X::Proc::Async::AlreadyStarted.new(proc => self).throw if $!started;
  102. $!started = True;
  103. my %ENV := $ENV ?? $ENV.hash !! %*ENV;
  104. $!exit_promise = Promise.new;
  105. my Mu $callbacks := nqp::hash();
  106. nqp::bindkey($callbacks, 'done', -> Mu \status {
  107. $!exit_promise.keep(Proc.new(
  108. :exitcode(status +> 8), :signal(status +& 0xFF),
  109. :command[ $!path, |@!args ],
  110. ))
  111. });
  112. nqp::bindkey($callbacks, 'error', -> Mu \err {
  113. $!exit_promise.break(X::OS.new(os-error => err));
  114. });
  115. @!promises.push(
  116. self!capture($callbacks,'stdout',$!stdout_supply)
  117. ) if $!stdout_supply;
  118. @!promises.push(
  119. self!capture($callbacks,'stderr',$!stderr_supply)
  120. ) if $!stderr_supply;
  121. nqp::bindkey($callbacks, 'buf_type', buf8.new);
  122. nqp::bindkey($callbacks, 'write', True) if $.w;
  123. $!process_handle := nqp::spawnprocasync($scheduler.queue,
  124. CLONE-LIST-DECONTAINERIZED($!path,@!args),
  125. $cwd.Str,
  126. CLONE-HASH-DECONTAINERIZED(%ENV),
  127. $callbacks,
  128. );
  129. Promise.allof( $!exit_promise, @!promises ).then({
  130. $!exit_promise.status == Broken
  131. ?? $!exit_promise.cause.throw
  132. !! $!exit_promise.result
  133. })
  134. }
  135. method print(Proc::Async:D: Str() $str, :$scheduler = $*SCHEDULER) {
  136. X::Proc::Async::OpenForWriting.new(:method<print>, proc => self).throw if !$!w;
  137. X::Proc::Async::MustBeStarted.new(:method<print>, proc => self).throw if !$!started;
  138. self.write($str.encode($!enc, :$!translate-nl))
  139. }
  140. method put(Proc::Async:D: \x, |c) {
  141. X::Proc::Async::OpenForWriting.new(:method<say>, proc => self).throw if !$!w;
  142. X::Proc::Async::MustBeStarted.new(:method<say>, proc => self).throw if !$!started;
  143. self.print( x.join ~ "\n", |c );
  144. }
  145. method say(Proc::Async:D: \x, |c) {
  146. X::Proc::Async::OpenForWriting.new(:method<say>, proc => self).throw if !$!w;
  147. X::Proc::Async::MustBeStarted.new(:method<say>, proc => self).throw if !$!started;
  148. self.print( x.gist ~ "\n", |c );
  149. }
  150. method write(Proc::Async:D: Blob:D $b, :$scheduler = $*SCHEDULER) {
  151. X::Proc::Async::OpenForWriting.new(:method<write>, proc => self).throw if !$!w;
  152. X::Proc::Async::MustBeStarted.new(:method<write>, proc => self).throw if !$!started;
  153. my $p = Promise.new;
  154. my $v = $p.vow;
  155. nqp::asyncwritebytes(
  156. $!process_handle,
  157. $scheduler.queue,
  158. -> Mu \bytes, Mu \err {
  159. if err {
  160. $v.break(err);
  161. }
  162. else {
  163. $v.keep(bytes);
  164. }
  165. },
  166. nqp::decont($b), ProcessCancellation);
  167. $p
  168. }
  169. method close-stdin(Proc::Async:D:) {
  170. X::Proc::Async::OpenForWriting.new(:method<close-stdin>, proc => self).throw
  171. if !$!w;
  172. X::Proc::Async::MustBeStarted.new(:method<close-stdin>, proc => self).throw
  173. if !$!started;
  174. nqp::closefh($!process_handle);
  175. True;
  176. }
  177. method kill(Proc::Async:D: $signal = "HUP") {
  178. X::Proc::Async::MustBeStarted.new(:method<kill>, proc => self).throw if !$!started;
  179. nqp::killprocasync($!process_handle, $*KERNEL.signal($signal));
  180. }
  181. }