-
Notifications
You must be signed in to change notification settings - Fork 6
thread pool
This (a-sync thread-pool) library file supplements the (a-sync event-loop) library file. (a-sync event-loop) provides asynchronous procedures which can wait on the completion of a task running in a worker thread or on an event loop running in another thread. However, sometimes it is better to run tasks in a thread pool rather than launching worker threads, particularly where the workload suits having the number of threads in the thread pool approximating to the number of local processors available to the program.
This (a-sync thread-pool) library file provides such a thread pool, together with two asynchronous procedures (await-task-in-thread-pool! and await-generator-in-thread-pool!) which can wait in an event loop for a task on the thread pool to complete and provide its result.
The thread pool objects provided by this library file do not make provision for rate limiting similar to that provided by the event loops in (a-sync event-loop). This is because there is no one-size-fits-all way of doing so. One common approach is, as in the case of the event loops provided by this library, to apply throttling to threads which add tasks by enforcing a wait in their thread of execution when the level of queued tasks reaches too high a level, so hindering their ability to add new ones. However this is counter productive where it is a task running on the thread pool which is adding the new tasks, particularly with a thread pool having only a few threads running in its pool. Another approach is to throw an exception when adding tasks which exceed a user-selectable level.
The best approach is for user code to provide its own rate limiting in cases where the way that that code is organised means that it could produce an excessive number of accumulating unexecuted tasks in the thread pool, possibly by applying delays when unexecuted tasks rise in number excessively, using timeouts with an event loop. This may be checked for by having code call the thread-pool-get-num-tasks procedure before adding a significant batch of new tasks in order to test queue size, and if necessary postpone adding the new tasks until the size of the already accumulated tasks has reduced.
This library file provides the following procedures:
(make-thread-pool size [non-blocking])
This procedure constructs a thread pool object of native OS threads. The 'size' argument specifies the number of threads which will run in the pool, and must be greater than 0. The 'non-blocking' argument is optional and affects the operation of the thread-pool-stop! procedure. When set to #f, which is the default, that procedure will not return until all tasks previously added to the pool have completed. If set to #t, the thread-pool-stop! procedure will return immediately, before all tasks have finished.
The 'size' and 'non-blocking' settings may subsequently be altered by applying the thread-pool-change-size! or thread-pool-set-non-blocking! procedure to the pool.
Thread pool objects are usually best kept as top level objects, because threads in the pool will keep alive until thread-pool-stop! is called. If a thread pool is constructed within a local lexical scope, then either thread-pool-stop! must be applied to the pool before that scope is exited, or the last task added to the pool should itself apply thread-pool-stop! to the pool (which it can do if 'non-blocking' is #t). Otherwise, the threads in the pool will remain alive uselessly in blocked condition until the program terminates, even though the pool may be inaccessible.
This procedure will throw an exception if a 'size' argument of less than 1 is given, or if the system is unable to start the number of threads given as the 'size' argument. If unable to start the number of threads so given, any threads which have in fact started in the pool will be killed.
This procedure is first available in version 0.16 of this library.
(thread-pool? obj)
This procedure indicates whether 'obj' is a thread pool object constructed by make-thread-pool.
This procedure is first available in version 0.16 of this library.
(thread-pool-get-num-tasks pool)
This procedure returns the number of tasks which the thread pool object is at present either running in the pool or has queued for execution. This procedure will not throw. It is also thread safe, although it accesses the task number field outside the pool mutex and therefore with relaxed memory ordering. That enables this procedure to be applied more efficiently for rate limiting purposes but the result might at any one time be marginally out of date.
This procedure is first available in version 0.16 of this library.
(thread-pool-get-size pool)
This procedure returns the current size setting for the thread pool (namely the number of threads that the pool runs).
This procedure is thread safe (any thread may call it).
This procedure is first available in version 0.16 of this library.
(thread-pool-change-size! pool delta)
This procedure will increase, or if 'delta' is negative reduce, the number of threads which the thread pool object will run by the value of 'delta'. This procedure does nothing if thread-pool-stop! has previously been called. This procedure is thread safe - any thread may call it.
One use for dynamic sizing of this kind is for a task to increment the thread number where it is about to enter a call which may block (non-asynchronously) for some time, with a view to decrementing it later when it has finished making blocking calls, so as to enable another thread to keep a core active. A with-thread-pool-increment macro is available which will do this for you automatically in an exception-safe way (see the documentation on that macro below).
Alternatively, this procedure can be used to reduce thread usage when a full set of threads is no longer required by the program.
If 'delta' is positive, this procedure may raise an exception if the system is unable to start the required new threads. Because starting new threads can be time consuming, to minimize contention new threads are started outside the pool's mutex, although internal book-keeping is done within the mutex. One consequence is that if such an exception is raised while another thread has concurrently tried to reduce the size of the pool, the size of the pool may be smaller than it was when this procedure was called. In certain circumstances, after an exception where no new threads can be started, the pool could have no running threads in it (so that thread-pool-get-size returns 0) even though some tasks previously added to the pool remain pending. If the system can start no new threads even though none are running in the pool, it will be significantly broken so it is not usually worth troubling about this - the program is doomed in that event whatever. However if that is wrong and the cause of the failure to start any threads can be addressed, then the thread pool can be brought back into use by calling this procedure again.
This procedure is first available in version 0.16 of this library.
(thread-pool-get-non-blocking pool)
This procedure returns the current non-blocking status of the thread pool. (See the documentation on the thread-pool-stop! procedure for more information about what that means.)
This procedure is thread safe (any thread may call it).
This procedure is first available in version 0.16 of this library.
(thread-pool-set-non-blocking! pool val)
This procedure sets the non-blocking status of the thread pool. If 'val' is #f, the thread-pool-stop procedure will block, if #t it will not. (See the documentation on the thread-pool-stop! procedure for more information about this.)
This procedure is thread safe (any thread may call it).
This procedure will raise a &violation exception if it is invoked after the thread pool object concerned has been closed by a call to thread-pool-stop!.
This procedure is first available in version 0.16 of this library.
(thread-pool-stop! pool)
This procedure will cause the thread-pool object to stop running tasks. However, all tasks already running or queued for execution will be permitted to execute and complete normally. If the thread-pool's non-blocking setting is set to #f, this procedure will wait until all the tasks still to execute have finished before returning, and if #t it will return straight away.
After this procedure has been called, any attempt to add further tasks with the thread-pool-add! procedure will fail, and that procedure will raise a &violation exception. The same exception will be raised if this procedure is applied to a thread pool to which this procedure has previously been applied.
This procedure is thread safe (any thread may call it) unless the non-blocking setting is #f, in which case no task running on the thread-pool object may call this procedure.
This procedure is first available in version 0.16 of this library.
(thread-pool-add! pool task [fail-handler])
This procedure adds a new task to the thread pool. 'task' must be a thunk. If one or more threads in the pool are currently blocking and waiting for a task, then the task will begin executing immediately in one of the threads. If not, the task will be queued for execution as soon as a thread becomes available. Tasks will be executed in the order in which they are added to the thread pool object. This procedure is thread safe (any thread may call it, including any task running on the thread pool object).
An optional handler procedure may be passed to 'fail-handler' which will be invoked if the task raises an exception. If a task raises an exception and no handler procedure is provided, the program will terminate. The 'fail-handler' procedure will be passed the condition object for the exception raised.
This procedure will raise a &violation exception if it is invoked after the thread pool object concerned has been closed by a call to thread-pool-stop!.
This procedure is first available in version 0.16 of this library.
(with-thread-pool-increment pool body0 body1 ...)
This macro is intended to be called by a task running on a thread pool which is about to make a blocking (non-asynchronous) call. It will increment the number of threads in 'pool' by 1 (by calling thread-pool-change-size!) so as to enable a queued task to keep a core active, and decrement it again when execution of the body clauses has completed.
The (i) increment, (ii) execution of body clauses, and (iii) decrement, form the three branches of a dynamic-wind, so the decrement automatically occurs if control leaves body execution because of an exception or other jump.
As this macro starts a new thread, it may raise an exception if the system is unable to start it. Starting a new thread is also not free of overhead and it may be worth profiling a prospective use case to see if running a few more threads in the thread pool than the number of processors in the system permanently, rather than changing it using this macro, and letting the OS's scheduler handle the situation, is a better option.
This macro is first available in version 0.16 of this library.
(await-task-in-thread-pool! await resume [loop] pool thunk [handler])
The 'loop' argument is optional. This procedure will run 'thunk' in the thread pool specified by the 'pool' argument. The result of executing 'thunk' will then be posted to the event loop specified by the 'loop' argument, or to the default event loop if no 'loop' argument is provided or if #f is provided as the 'loop' argument (pattern matching is used to detect the type of the third argument), and will comprise this procedure's return value. This procedure is intended to be called within a waitable procedure invoked by a-sync (which supplies the 'await' and 'resume' arguments). It will normally be necessary to call event-loop-block! on 'loop' (or on the default event loop) before invoking this procedure.
If the optional 'handler' argument is provided, then that handler will run if 'thunk' raises an exception, and the return value of the handler would become the return value of this procedure; otherwise the program will terminate if an unhandled exception propagates out of 'thunk'. Note that unlike a handler passed to the thread-pool-add! procedure, 'handler' will run in the event loop thread and not in a thread pool thread. Exceptions raised by the handler procedure will propagate out of event-loop-run! for the 'loop' event loop.
This procedure calls 'await' and must (like the a-sync procedure) be called in the same thread as that in which the 'loop' or default event loop runs (as the case may be).
This procedure calls event-post! in the 'loop' event loop, which could be subject to throttling (see the documentation for the make-event-loop procedure for further information).
Exceptions may propagate out of this procedure if they arise while setting up, which shouldn't happen unless the thread pool given by the 'pool' argument has been closed (in which case a &violation exception will be raised), the thread pool tries to start an additional native thread which the operating system fails to supply (which would cause a system exception to arise) or memory is exhausted.
Here is an example of the use of await-task-in-thread-pool!:
(set-default-event-loop!) ;; if none has yet been set
(let ([pool (make-thread-pool 4)])
(a-sync (lambda (await resume)
(format #t "1 + 1 is ~a\n"
(await-task-in-thread-pool! await resume
pool
(lambda ()
(+ 1 1))))
(event-loop-quit!))))
(event-loop-block! #t) ;; because the task runs in another thread
(event-loop-run!)
This procedure is first available in version 0.16 of this library.
(await-generator-in-thread-pool! await resume [loop] pool generator proc [handler])
The loop argument is optional. The 'generator' argument is a procedure taking one argument, namely a yield argument (see the documentation on the make-iterator procedure for further details). This await-generator-in-pool procedure will cause 'generator' to run as a task in the 'pool' thread pool, and whenever 'generator' yields a value this will cause 'proc' to execute in the event loop specified by the 'loop' argument, or in the default event loop if no 'loop' argument is provided or if #f is provided as the 'loop' argument. 'proc' should be a procedure taking a single argument, namely the value yielded by the generator.
This procedure is intended to be called within a waitable procedure invoked by a-sync (which supplies the 'await' and 'resume' arguments). It will normally be necessary to call event-loop-block! on 'loop' (or on the default event loop) before invoking this procedure.
If the optional 'handler' argument is provided, then that handler will run if 'generator' raises an exception; otherwise the program will terminate if an unhandled exception propagates out of 'generator'. Note that unlike a handler passed to the thread-pool-add! procedure, 'handler' will run in the event loop thread and not in a thread pool thread. This procedure will return #f if the generator completes normally, or 'chez-a-sync-thread-error if the generator throws an exception and 'handler' is run (the 'chez-a-sync-thread-error symbol is reserved to the implementation and should not be yielded by the generator). Exceptions thrown by the handler procedure will propagate out of event-loop-run! for the 'loop' event loop.
This procedure calls 'await' and will return when the generator has finished or, if 'handler' is provided, upon the generator raising an exception. This procedure must (like the a-sync procedure) be called in the same thread as that in which the 'loop' or default event loop runs (as the case may be).
This procedure calls event-post! in both the 'loop' event loop, which could be subject to throttling (see the documentation for the make-event-loop procedure for further information).
Exceptions may propagate out of this procedure if they arise while setting up, which shouldn't happen unless the thread loop given by the 'pool' argument has been closed (in which case an &violation exception will be raised) or memory is exhausted. Exceptions arising during the execution of 'proc', if not caught locally, will propagate out of event-loop-run! for 'loop' or the default event loop (as the case may be).
Here is an example of the use of await-generator-in-thread-pool!:
(set-default-event-loop!) ;; if none has yet been set
(let ([pool (make-thread-pool 4)])
(a-sync (lambda (await resume)
(await-generator-in-thread-pool! await resume
pool
(lambda (yield)
(let loop ([count 0])
(when (< count 5)
(yield (* 2 count))
(loop (+ count 1)))))
(lambda (val)
(display val)
(newline)))
(event-loop-block! #f))))
(event-loop-block! #t) ;; because the generator runs in another thread
(event-loop-run!)
This procedure is first available in version 0.16 of this library.
Library files:
- (a-sync coroutines)
- (a-sync event-loop)
- (a-sync thread-pool)
- (a-sync compose)
- (a-sync meeting)
- (a-sync try)
Other: