# The ThreadPoolScheduler is a straightforward scheduler that maintains a
# pool of threads and uses them to run work items in the order in which
# those items were added.
  4. my class ThreadPoolScheduler does Scheduler {
  5. constant THREAD_POOL_PROMPT = Mu.new;
  6. class ThreadPoolAwaiter does Awaiter {
  7. has $!queue;
  # Construction hook. Takes the (required) :queue argument, strips any
  # container from it and binds the bare queue object to $!queue, which is
  # where resumption tasks for suspended awaits are pushed.
  submethod BUILD(:$queue!) {
      my Mu $bare-queue := nqp::decont($queue);
      $!queue := $bare-queue;
  }
  # Awaits a single Awaitable.
  #
  # If the await handle reports that the outcome is already known, the
  # result is produced (or the cause rethrown) straight away. Otherwise the
  # current continuation up to THREAD_POOL_PROMPT is taken, and a task that
  # re-invokes it is pushed onto the queue once the awaiter subscription
  # fires. After resumption the recorded result is produced, or rethrown
  # when the subscription reported failure.
  method await(Awaitable:D $a) {
      my $await-handle := $a.get-await-handle;
      if $await-handle.already {
          if $await-handle.success {
              $await-handle.result
          }
          else {
              $await-handle.cause.rethrow
          }
      }
      else {
          # Filled in by the subscription callback before we are resumed.
          my $kept;
          my $outcome;
          nqp::continuationcontrol(0, THREAD_POOL_PROMPT, -> Mu \taken {
              # Subscribing only after the continuation has been taken
              # means the callback always has a continuation to resume.
              $await-handle.subscribe-awaiter(-> \ok, \value {
                  $kept    := ok;
                  $outcome := value;
                  nqp::push($!queue, { nqp::continuationinvoke(taken, nqp::null()) });
                  Nil
              });
          });
          if $kept {
              $outcome
          }
          else {
              $outcome.rethrow
          }
      }
  }
  # Awaits every item of the given Iterable and returns a List holding the
  # results at the positions of the corresponding inputs.
  #
  # Dies if an item is not an Awaitable or is not a concrete object.
  # Rethrows the cause of the first failure that is observed.
  method await-all(Iterable:D \i) {
      # First pass: results that are already known go directly into their
      # slot. For everything else, remember the handle together with the
      # slot that its result has to be stored into later.
      my \results = nqp::list();
      my \handles = nqp::list();
      my \indices = nqp::list_i();
      my int $insert = 0;
      for i -> $awaitable {
          die "Can only specify Awaitable objects to await (got a $awaitable.^name())"
              unless nqp::istype($awaitable, Awaitable);
          die "Must specify a defined Awaitable to await (got an undefined $awaitable.^name())"
              unless nqp::isconcrete($awaitable);

          my $pending := $awaitable.get-await-handle;
          if $pending.already {
              if $pending.success {
                  nqp::bindpos(results, $insert, $pending.result);
              }
              else {
                  $pending.cause.rethrow;
              }
          }
          else {
              nqp::push(handles, $pending);
              nqp::push_i(indices, $insert);
          }
          $insert++;
      }

      # Second pass: suspend only if something is still outstanding.
      # Completions may arrive on other threads while we are still busy
      # subscribing (and concurrently with each other), so resumption must
      # not be attempted before the continuation has actually been taken.
      # A lock is held from before the first subscription until the
      # continuation is stored; every callback has to pass through it.
      my int $num-handles = nqp::elems(handles);
      if $num-handles {
          my $continuation;
          my $exception;
          my $lock = Lock.new;
          $lock.lock;
          {
              my int $remaining = $num-handles;
              loop (my int $idx = 0; $idx < $num-handles; $idx++) {
                  my $pending := nqp::atpos(handles, $idx);
                  my int $slot = nqp::atpos_i(indices, $idx);
                  $pending.subscribe-awaiter(-> \ok, \value {
                      my int $resume;
                      $lock.protect: {
                          if ok && $remaining {
                              # One more result in; resume when it was the
                              # last one we were waiting for.
                              nqp::bindpos(results, $slot, value);
                              --$remaining;
                              $resume = 1 if $remaining == 0;
                          }
                          elsif !nqp::isconcrete($exception) {
                              # Record only the first exception, and stop
                              # waiting for anything else.
                              $exception := value;
                              $remaining = 0;
                              $resume = 1;
                          }
                      }
                      if $resume {
                          nqp::push($!queue, {
                              nqp::continuationinvoke($continuation, nqp::null())
                          });
                      }
                  });
              }
              CATCH {
                  # Subscribing failed: release the lock and let the
                  # exception carry on outwards.
                  $lock.unlock();
              }
          }
          nqp::continuationcontrol(0, THREAD_POOL_PROMPT, -> Mu \taken {
              $continuation := taken;
              $lock.unlock;
          });

          # Resumed: report a recorded failure, if any.
          $exception.rethrow if nqp::isconcrete($exception);
      }
      nqp::p6bindattrinvres(nqp::create(List), List, '$!reified', results);
  }
  115. }
  # The work queue type. It is a concurrent queue: a worker thread that
  # polls it while it is empty blocks until some work arrives.
  my class Queue is repr('ConcBlockingQueue') { }
  has $!queue;

  # Semaphore that stops us from starting more threads than the maximum
  # allowed.
  has $!thread_start_semaphore;

  # How many work items are outstanding; only a rough guide for managing
  # the size of the pool.
  has int $!loads;

  # How many threads have been started up to now.
  has int $!threads_started;

  # Lock guarding updates of the two counters above.
  has $!counts_lock;

  # Set when there are incoming I/O events that need a thread to handle
  # them.
  has int $!need_io_thread;

  # The initial and the maximum number of threads.
  has Int $.initial_threads;
  has Int $.max_threads;

  # Whether any thread has been started yet.
  has int $!started_any;
  # Adds one more worker thread to the pool, unless the start semaphore has
  # no permit left (i.e. the maximum has been reached).
  #
  # Each worker installs its own ThreadPoolAwaiter as $*AWAITER and then
  # loops forever: take a task from the queue, run it under the
  # THREAD_POOL_PROMPT continuation reset, and keep $!loads up to date
  # around it. A task is either a list (the code followed by its
  # arguments) or something directly invokable.
  method !maybe_new_thread() {
      if $!thread_start_semaphore.try_acquire() {
          $!started_any = 1;
          $!counts_lock.protect: { $!threads_started = $!threads_started + 1 };

          my &worker := {
              my $*AWAITER := ThreadPoolAwaiter.new(:$!queue);
              loop {
                  my Mu $job := nqp::shift($!queue);
                  $!counts_lock.protect: { $!loads = $!loads + 1 };
                  nqp::continuationreset(THREAD_POOL_PROMPT, {
                      if nqp::islist($job) {
                          # First element is the code, the rest are its
                          # arguments.
                          my Mu $callee := nqp::shift($job);
                          my \params = nqp::p6bindattrinvres(nqp::create(List), List, '$!reified', $job);
                          $callee(|params);
                      }
                      else {
                          $job();
                      }
                      CONTROL {
                          default {
                              # Hand control exceptions to the compiler's
                              # handler.
                              my Mu $payload := nqp::getattr(nqp::decont($_), Exception, '$!ex');
                              nqp::getcomp('perl6').handle-control($payload);
                          }
                      }
                      CATCH {
                          default {
                              self.handle_uncaught($_)
                          }
                      }
                  });
                  $!counts_lock.protect: { $!loads = $!loads - 1 };
              }
          };
          Thread.start(:app_lifetime, &worker);
      }
  }
  # Construction hook. Sets the initial number of threads (default 0) and
  # the maximum number of threads (default: the RAKUDO_MAX_THREADS
  # environment variable, or 16 when that is not set). Dies when the
  # initial number exceeds the maximum.
  submethod BUILD(
      Int :$!initial_threads = 0,
      Int :$!max_threads = (%*ENV<RAKUDO_MAX_THREADS> // 16).Int
      --> Nil
  ) {
      if $!initial_threads > $!max_threads {
          die "Initial thread pool threads ($!initial_threads) must be less than or equal to maximum threads ($!max_threads)";
      }
  }
  # Hands out the underlying work queue. Sets the scheduler up first when
  # no thread has been started yet, tries to add a thread, and flags that
  # a thread is needed for I/O events.
  method queue() {
      if !$!started_any {
          self!initialize;
      }
      self!maybe_new_thread();
      $!need_io_thread = 1;
      $!queue
  }
  # Schedules &code for execution.
  #
  #   :at     point in time of the first run (exclusive with :in)
  #   :in     delay before the first run
  #   :every  interval between repeated runs
  #   :times  number of runs (default 1)
  #   :stop   callable consulted before each repeated run; a true value
  #           cancels the repetition
  #   :catch  callable that receives any exception thrown by &code
  #
  # Returns a Cancellation for anything driven by a timer, and Nil when the
  # code is simply pushed onto the work queue. Dies on :at together with
  # :in, and on :every together with both :times > 1 and :stop.
  method cue(&code, :$at, :$in, :$every, :$times = 1, :&stop is copy, :&catch ) {
      my class TimerCancellation is repr('AsyncTask') { }

      if $at.defined and $in.defined {
          die "Cannot specify :at and :in at the same time";
      }
      if $every.defined and $times > 1 and &stop {
          die "Cannot specify :every, :times and :stop at the same time";
      }

      my $delay = $at ?? $at - now !! $in // 0;
      self!initialize unless $!started_any;

      # Repeating timer.
      if $every {
          # A run limit is expressed as a stopper that counts down.
          if $times > 1 {
              my $runs-left = $times;
              &stop = sub { $runs-left ?? !$runs-left-- !! True }
          }

          # With a stopper: each tick asks it first, and cancels the
          # timer instead of running the code once it says so.
          if &stop {
              my $handle;
              my $cancellation;
              sub the-cancellation() {
                  $cancellation //=
                      Cancellation.new(async_handles => [$handle]);
              }
              my &tick := &catch
                  ?? -> {
                         stop()
                             ?? the-cancellation().cancel
                             !! code();
                         CATCH { default { catch($_) } };
                     }
                  !! -> {
                         stop()
                             ?? the-cancellation().cancel
                             !! code();
                     };
              $handle := nqp::timer($!queue, &tick,
                  to-millis($delay), to-millis($every),
                  TimerCancellation);
              self!maybe_new_thread();
              return the-cancellation()
          }
          # Without a stopper: repeat until cancelled from outside.
          else {
              my &tick := &catch
                  ?? -> { code(); CATCH { default { catch($_) } } }
                  !! &code;
              my $handle := nqp::timer($!queue, &tick,
                  to-millis($delay), to-millis($every),
                  TimerCancellation);
              self!maybe_new_thread();
              return Cancellation.new(async_handles => [$handle]);
          }
      }
      # Delayed and/or run more than once: one single-shot timer per run.
      elsif $delay or $times > 1 {
          my &job := &catch
              ?? -> { code(); CATCH { default { catch($_) } } }
              !! &code;
          my @timers;
          $delay = to-millis($delay) if $delay;
          for 1 .. $times {
              @timers.push(
                  nqp::timer($!queue, &job, $delay, 0, TimerCancellation)
              );
          }
          self!maybe_new_thread();
          return Cancellation.new(async_handles => @timers);
      }
      # Nothing special: put the code on the queue right away.
      else {
          my &job := &catch
              ?? -> { code(); CATCH { default { catch($_) } } }
              !! &code;
          if $!loads + $!need_io_thread <= $!threads_started {
              self!maybe_new_thread();
          }
          nqp::push($!queue, &job);
          return Nil;
      }
  }
  # Reports the number of outstanding work items; 0 as long as no thread
  # has been started.
  method loads() {
      $!started_any ?? $!loads !! 0
  }
  # Converts a duration given in seconds into a whole number of
  # milliseconds, as passed to nqp::timer by cue.
  #
  # A negative duration (for instance an :at instant that already lies in
  # the past, which makes "$at - now" negative) is treated as 0 instead of
  # being handed on as a negative number of milliseconds.
  #
  # Int candidate: exact, no rounding involved.
  multi to-millis(Int $value) {
      $value < 0 ?? 0 !! 1000 * $value
  }
  # Numeric candidate: truncates to whole milliseconds. A positive duration
  # that is too small to be represented is bumped to the 1ms minimum, with a
  # warning.
  multi to-millis(Numeric $value) {
      return 0 if $value < 0;
      my $proposed = (1000 * $value).Int;
      if $value && $proposed == 0 {
          warn "Minimum timer resolution is 1ms; using that instead of {1000 * $value}ms";
          $proposed = 1;
      }
      $proposed
  }
  # Fallback candidate: numifies anything else and dispatches again.
  multi to-millis($value) {
      to-millis(+$value)
  }
  # One-time setup: creates the counts lock, the thread start semaphore
  # (with $!max_threads permits) and the work queue, then tries to start
  # the initial number of threads.
  method !initialize(--> Nil) {
      $!counts_lock            := nqp::create(Lock);
      $!thread_start_semaphore := Semaphore.new($!max_threads.Int);
      $!queue                  := nqp::create(Queue);
      for 1..$!initial_threads {
          self!maybe_new_thread();
      }
  }
  287. }
  # Make a ThreadPoolScheduler the default scheduler: the registered block
  # stores a new instance in PROCESS::<$SCHEDULER> for $*SCHEDULER.
  Rakudo::Internals.REGISTER-DYNAMIC('$*SCHEDULER', {
      PROCESS::<$SCHEDULER> = ThreadPoolScheduler.new;
  });