1. my class IO::Socket::Async {
  2. my class SocketCancellation is repr('AsyncTask') { }
  3. has $!VMIO;
  4. has int $!udp;
  5. has $.enc;
  6. method new() {
  7. die "Cannot create an asynchronous socket directly; please use\n" ~
  8. "IO::Socket::Async.connect, IO::Socket::Async.listen,\n" ~
  9. "IO::Socket::Async.udp, or IO::Socket::Async.udp-bind";
  10. }
  11. method print(IO::Socket::Async:D: Str() $str, :$scheduler = $*SCHEDULER) {
  12. self.write($str.encode($!enc))
  13. }
  14. method write(IO::Socket::Async:D: Blob $b, :$scheduler = $*SCHEDULER) {
  15. my $p = Promise.new;
  16. my $v = $p.vow;
  17. nqp::asyncwritebytes(
  18. $!VMIO,
  19. $scheduler.queue,
  20. -> Mu \bytes, Mu \err {
  21. if err {
  22. $v.break(err);
  23. }
  24. else {
  25. $v.keep(bytes);
  26. }
  27. },
  28. nqp::decont($b), SocketCancellation);
  29. $p
  30. }
  31. my sub capture(\supply) {
  32. my $ss = Rakudo::Internals::SupplySequencer.new(
  33. on-data-ready => -> \data { supply.emit(data) },
  34. on-completed => -> { supply.done() },
  35. on-error => -> \err { supply.quit(err) });
  36. -> Mu \seq, Mu \data, Mu \err { $ss.process(seq, data, err) }
  37. }
  38. method Supply(IO::Socket::Async:D: :$bin, :$buf = buf8.new, :$enc, :$scheduler = $*SCHEDULER) {
  39. if $bin {
  40. my $cancellation;
  41. Supply.on-demand:
  42. -> $supply {
  43. $cancellation := nqp::asyncreadbytes($!VMIO, $scheduler.queue,
  44. capture($supply), nqp::decont($buf), SocketCancellation)
  45. },
  46. closing => {
  47. $cancellation && nqp::cancel($cancellation)
  48. }
  49. }
  50. else {
  51. my $bin-supply = self.Supply(:bin);
  52. if $!udp {
  53. supply {
  54. whenever $bin-supply {
  55. emit .decode($enc // $!enc);
  56. }
  57. }
  58. }
  59. else {
  60. Rakudo::Internals.BYTE_SUPPLY_DECODER($bin-supply, $enc // $!enc)
  61. }
  62. }
  63. }
  64. method close(IO::Socket::Async:D: --> True) {
  65. nqp::closefh($!VMIO);
  66. }
  67. method connect(IO::Socket::Async:U: Str() $host, Int() $port,
  68. :$enc = 'utf-8', :$scheduler = $*SCHEDULER) {
  69. my $p = Promise.new;
  70. my $v = $p.vow;
  71. nqp::asyncconnect(
  72. $scheduler.queue,
  73. -> Mu \socket, Mu \err {
  74. if err {
  75. $v.break(err);
  76. }
  77. else {
  78. my $client_socket := nqp::create(self);
  79. nqp::bindattr($client_socket, IO::Socket::Async, '$!VMIO', socket);
  80. nqp::bindattr($client_socket, IO::Socket::Async, '$!enc', $enc);
  81. $v.keep($client_socket);
  82. }
  83. },
  84. $host, $port, SocketCancellation);
  85. $p
  86. }
  87. method listen(IO::Socket::Async:U: Str() $host, Int() $port, Int() $backlog = 128,
  88. :$enc = 'utf-8', :$scheduler = $*SCHEDULER) {
  89. my $cancellation;
  90. Supply.on-demand(-> $s {
  91. $cancellation := nqp::asynclisten(
  92. $scheduler.queue,
  93. -> Mu \socket, Mu \err {
  94. if err {
  95. $s.quit(err);
  96. }
  97. else {
  98. my $client_socket := nqp::create(self);
  99. nqp::bindattr($client_socket, IO::Socket::Async, '$!VMIO', socket);
  100. nqp::bindattr($client_socket, IO::Socket::Async, '$!enc', $enc);
  101. $s.emit($client_socket);
  102. }
  103. },
  104. $host, $port, $backlog, SocketCancellation);
  105. },
  106. closing => {
  107. if $cancellation {
  108. my $p = Promise.new;
  109. my $v = $p.vow;
  110. nqp::cancelnotify($cancellation, $scheduler.queue, { $v.keep(True); });
  111. $p
  112. }
  113. });
  114. }
  115. method udp(IO::Socket::Async:U: :$broadcast, :$enc = 'utf-8', :$scheduler = $*SCHEDULER) {
  116. my $p = Promise.new;
  117. nqp::asyncudp(
  118. $scheduler.queue,
  119. -> Mu \socket, Mu \err {
  120. if err {
  121. $p.break(err);
  122. }
  123. else {
  124. my $client_socket := nqp::create(self);
  125. nqp::bindattr($client_socket, IO::Socket::Async, '$!VMIO', socket);
  126. nqp::bindattr_i($client_socket, IO::Socket::Async, '$!udp', 1);
  127. nqp::bindattr($client_socket, IO::Socket::Async, '$!enc', $enc);
  128. $p.keep($client_socket);
  129. }
  130. },
  131. nqp::null_s(), 0, $broadcast ?? 1 !! 0,
  132. SocketCancellation);
  133. await $p
  134. }
  135. method bind-udp(IO::Socket::Async:U: Str() $host, Int() $port, :$broadcast,
  136. :$enc = 'utf-8', :$scheduler = $*SCHEDULER) {
  137. my $p = Promise.new;
  138. nqp::asyncudp(
  139. $scheduler.queue,
  140. -> Mu \socket, Mu \err {
  141. if err {
  142. $p.break(err);
  143. }
  144. else {
  145. my $client_socket := nqp::create(self);
  146. nqp::bindattr($client_socket, IO::Socket::Async, '$!VMIO', socket);
  147. nqp::bindattr_i($client_socket, IO::Socket::Async, '$!udp', 1);
  148. nqp::bindattr($client_socket, IO::Socket::Async, '$!enc', $enc);
  149. $p.keep($client_socket);
  150. }
  151. },
  152. nqp::unbox_s($host), nqp::unbox_i($port), $broadcast ?? 1 !! 0,
  153. SocketCancellation);
  154. await $p
  155. }
  156. method print-to(IO::Socket::Async:D: Str() $host, Int() $port, Str() $str, :$scheduler = $*SCHEDULER) {
  157. self.write-to($host, $port, $str.encode($!enc))
  158. }
  159. method write-to(IO::Socket::Async:D: Str() $host, Int() $port, Blob $b, :$scheduler = $*SCHEDULER) {
  160. my $p = Promise.new;
  161. my $v = $p.vow;
  162. nqp::asyncwritebytesto(
  163. $!VMIO,
  164. $scheduler.queue,
  165. -> Mu \bytes, Mu \err {
  166. if err {
  167. $v.break(err);
  168. }
  169. else {
  170. $v.keep(bytes);
  171. }
  172. },
  173. nqp::decont($b), SocketCancellation,
  174. nqp::unbox_s($host), nqp::unbox_i($port));
  175. $p
  176. }
  177. }