1. my class Proc::Async { ... }
  2. my role X::Proc::Async is Exception {
  3. has Proc::Async $.proc;
  4. }
  5. my class X::Proc::Async::TapBeforeSpawn does X::Proc::Async {
  6. has $.handle;
  7. method message() {
  8. "To avoid data races, you must tap $!handle before running the process"
  9. }
  10. }
  11. my class X::Proc::Async::CharsOrBytes does X::Proc::Async {
  12. has $.handle;
  13. method message() {
  14. "Can only tap one of chars or bytes supply for $!handle"
  15. }
  16. }
  17. my class X::Proc::Async::AlreadyStarted does X::Proc::Async {
  18. method message() {
  19. "Process has already been started"
  20. }
  21. }
  22. my class X::Proc::Async::MustBeStarted does X::Proc::Async {
  23. has $.method;
  24. method message() {
  25. "Process must be started first before calling '$!method'"
  26. }
  27. }
  28. my class X::Proc::Async::OpenForWriting does X::Proc::Async {
  29. has $.method;
  30. method message() {
  31. "Process must be opened for writing with :w to call '$!method'"
  32. }
  33. }
  34. my class Proc::Async {
  35. my class ProcessCancellation is repr('AsyncTask') { }
  36. my enum CharsOrBytes ( :Bytes(0), :Chars(1) );
  37. has $!ready_promise = Promise.new;
  38. has $!ready_vow = $!ready_promise.vow;
  39. has $.path;
  40. has @.args;
  41. has $.w;
  42. has $.enc = 'utf8';
  43. has $.translate-nl = True;
  44. has Bool $.started = False;
  45. has $!stdout_supply;
  46. has CharsOrBytes $!stdout_type;
  47. has $!stderr_supply;
  48. has CharsOrBytes $!stderr_type;
  49. has $!process_handle;
  50. has $!exit_promise;
  51. has @!promises;
  52. proto method new(|) { * }
  53. multi method new($path, *@args, *%_) {
  54. self.bless(:$path, :@args, |%_)
  55. }
  56. method !supply(\what,\the-supply,\type,\value) {
  57. X::Proc::Async::TapBeforeSpawn.new(handle => what, proc => self).throw
  58. if $!started;
  59. X::Proc::Async::CharsOrBytes.new(handle => what, proc => self).throw
  60. if the-supply and type != value;
  61. type = value;
  62. the-supply //= Supplier::Preserving.new;
  63. }
  64. proto method stdout(|) { * }
  65. multi method stdout(Proc::Async:D: :$bin!) {
  66. $bin
  67. ?? self!supply('stdout', $!stdout_supply, $!stdout_type, Bytes).Supply
  68. !! self.stdout(|%_)
  69. }
  70. multi method stdout(Proc::Async:D: :$enc, :$translate-nl) {
  71. self!wrap-decoder:
  72. self!supply('stdout', $!stdout_supply, $!stdout_type, Chars).Supply,
  73. $enc, :$translate-nl
  74. }
  75. proto method stderr(|) { * }
  76. multi method stderr(Proc::Async:D: :$bin!) {
  77. $bin
  78. ?? self!supply('stderr', $!stderr_supply, $!stderr_type, Bytes).Supply
  79. !! self.stderr(|%_)
  80. }
  81. multi method stderr(Proc::Async:D: :$enc, :$translate-nl) {
  82. self!wrap-decoder:
  83. self!supply('stderr', $!stderr_supply, $!stderr_type, Chars).Supply,
  84. $enc, :$translate-nl
  85. }
  86. method ready(--> Promise) {
  87. $!ready_promise;
  88. }
  89. method !wrap-decoder(Supply:D $bin-supply, $enc, :$translate-nl) {
  90. Rakudo::Internals.BYTE_SUPPLY_DECODER($bin-supply, $enc // $!enc,
  91. :translate-nl($translate-nl // $!translate-nl))
  92. }
  93. method !capture(\callbacks,\std,\the-supply) {
  94. my $promise = Promise.new;
  95. my $vow = $promise.vow;
  96. my $ss = Rakudo::Internals::SupplySequencer.new(
  97. on-data-ready => -> \data { the-supply.emit(data) },
  98. on-completed => -> { the-supply.done(); $vow.keep(the-supply) },
  99. on-error => -> \err { the-supply.quit(err); $vow.keep((the-supply,err)) });
  100. nqp::bindkey(callbacks,
  101. std ~ '_bytes' ,
  102. -> Mu \seq, Mu \data, Mu \err { $ss.process(seq, data, err) });
  103. $promise;
  104. }
  105. method start(Proc::Async:D: :$scheduler = $*SCHEDULER, :$ENV, :$cwd = $*CWD) {
  106. X::Proc::Async::AlreadyStarted.new(proc => self).throw if $!started;
  107. $!started = True;
  108. my %ENV := $ENV ?? $ENV.hash !! %*ENV;
  109. $!exit_promise = Promise.new;
  110. my Mu $callbacks := nqp::hash();
  111. nqp::bindkey($callbacks, 'done', -> Mu \status {
  112. $!exit_promise.keep(Proc.new(
  113. :exitcode(status +> 8), :signal(status +& 0xFF),
  114. :command[ $!path, |@!args ],
  115. ))
  116. });
  117. nqp::bindkey($callbacks, 'ready', {
  118. $!ready_vow.keep(Nil);
  119. });
  120. nqp::bindkey($callbacks, 'error', -> Mu \err {
  121. my $error = X::OS.new(os-error => err);
  122. $!exit_promise.break($error);
  123. $!ready_vow.break($error);
  124. });
  125. @!promises.push(
  126. self!capture($callbacks,'stdout',$!stdout_supply)
  127. ) if $!stdout_supply;
  128. @!promises.push(
  129. self!capture($callbacks,'stderr',$!stderr_supply)
  130. ) if $!stderr_supply;
  131. nqp::bindkey($callbacks, 'buf_type', buf8.new);
  132. nqp::bindkey($callbacks, 'write', True) if $.w;
  133. $!process_handle := nqp::spawnprocasync($scheduler.queue,
  134. CLONE-LIST-DECONTAINERIZED($!path,@!args),
  135. $cwd.Str,
  136. CLONE-HASH-DECONTAINERIZED(%ENV),
  137. $callbacks,
  138. );
  139. Promise.allof( $!exit_promise, @!promises ).then({
  140. $!exit_promise.status == Broken
  141. ?? $!exit_promise.cause.throw
  142. !! $!exit_promise.result
  143. })
  144. }
  145. method print(Proc::Async:D: Str() $str, :$scheduler = $*SCHEDULER) {
  146. X::Proc::Async::OpenForWriting.new(:method<print>, proc => self).throw if !$!w;
  147. X::Proc::Async::MustBeStarted.new(:method<print>, proc => self).throw if !$!started;
  148. self.write($str.encode($!enc, :$!translate-nl))
  149. }
  150. method put(Proc::Async:D: \x, |c) {
  151. X::Proc::Async::OpenForWriting.new(:method<say>, proc => self).throw if !$!w;
  152. X::Proc::Async::MustBeStarted.new(:method<say>, proc => self).throw if !$!started;
  153. self.print( x.join ~ "\n", |c );
  154. }
  155. method say(Proc::Async:D: \x, |c) {
  156. X::Proc::Async::OpenForWriting.new(:method<say>, proc => self).throw if !$!w;
  157. X::Proc::Async::MustBeStarted.new(:method<say>, proc => self).throw if !$!started;
  158. self.print( x.gist ~ "\n", |c );
  159. }
  160. method write(Proc::Async:D: Blob:D $b, :$scheduler = $*SCHEDULER) {
  161. X::Proc::Async::OpenForWriting.new(:method<write>, proc => self).throw if !$!w;
  162. X::Proc::Async::MustBeStarted.new(:method<write>, proc => self).throw if !$!started;
  163. my $p = Promise.new;
  164. my $v = $p.vow;
  165. nqp::asyncwritebytes(
  166. $!process_handle,
  167. $scheduler.queue,
  168. -> Mu \bytes, Mu \err {
  169. if err {
  170. $v.break(err);
  171. }
  172. else {
  173. $v.keep(bytes);
  174. }
  175. },
  176. nqp::decont($b), ProcessCancellation);
  177. $p
  178. }
  179. method close-stdin(Proc::Async:D:) {
  180. X::Proc::Async::OpenForWriting.new(:method<close-stdin>, proc => self).throw
  181. if !$!w;
  182. X::Proc::Async::MustBeStarted.new(:method<close-stdin>, proc => self).throw
  183. if !$!started;
  184. nqp::closefh($!process_handle);
  185. True;
  186. }
  187. method kill(Proc::Async:D: $signal = "HUP") {
  188. X::Proc::Async::MustBeStarted.new(:method<kill>, proc => self).throw if !$!started;
  189. nqp::killprocasync($!process_handle, $*KERNEL.signal($signal));
  190. }
  191. }