Eio.Workpool #584

workpool.ml
@@ -0,0 +1,139 @@
type job = Pack : (unit -> 'a) * ('a, exn) Result.t Promise.u -> job

type action =
  | Process of job
  | Quit of {
      atomic: int Atomic.t;
      target: int;
      all_done: unit Promise.u;
    }

(* Worker: 1 domain/thread.
   m jobs per worker, n domains per workpool. *)
type t = {
  sw: Switch.t;
  (* The work queue. *)
  stream: action Stream.t;
  (* Number of domains. Depending on settings, domains may run more than 1 job at a time. *)
  domain_count: int;
  (* True when [Workpool.terminate] has been called. *)
  is_terminating: bool Atomic.t;
  (* Resolved when the workpool begins terminating. *)
  terminating: action Promise.t * action Promise.u;
  (* Resolved when the workpool has terminated. *)
  terminated: unit Promise.t * unit Promise.u;
}

let reject (Pack (_, w)) = Promise.resolve_error w (Failure "Workpool.terminate called")

(* This function is the core of workpool.ml.
   Each worker recursively calls [loop ()] until the [terminating]
   promise is resolved. Workers pull one job at a time from the Stream. *)
let start_worker ~limit ~terminating stream =
  Switch.run @@ fun sw ->
  let capacity = Semaphore.make limit in
  let run_job job w =
    Fiber.fork ~sw (fun () ->
      Promise.resolve w
        (try Ok (job ()) with
         | exn -> Error exn);
      Semaphore.release capacity)
  in
  (* The main worker loop. *)
  let rec loop () =
    let actions =
      Fiber.n_any
        [
          (fun () -> Promise.await terminating);
          (fun () ->
            Semaphore.acquire capacity;
            Stream.take stream);
        ]
    in
    match actions with
    | [ Process (Pack (job, w)) ] ->
      (* We start the job right away. This also gives a chance to other domains
         to start waiting on the Stream before the current thread blocks on [Stream.take] again. *)
      run_job job w;
      (loop [@tailcall]) ()
    | Quit { atomic; target; all_done } :: maybe_job ->
      List.iter
        (function
          | Process job -> reject job
          | _ -> assert false)
        maybe_job;
      (* If we're the last worker terminating, resolve the promise
         after waiting until the completion of all of this worker's jobs. *)
      if Atomic.fetch_and_add atomic 1 = target
      then Switch.on_release sw (Promise.resolve all_done)
    | _ -> assert false
  in
  loop ()

(* Start a new domain. The worker will need a switch, then we start the worker. *)
let start_domain ~sw ~domain_mgr ~limit ~terminating ~transient stream =
  let go () =
    Domain_manager.run domain_mgr (fun () -> start_worker ~limit ~terminating stream)
  in
  (* [transient] workpools run as daemons so as not to keep the user's switch from completing.
     It's up to the user to hold the switch open (and thus the workpool)
     by blocking on the jobs issued to the workpool.
     [Workpool.submit] and [Workpool.submit_exn] block, so this shouldn't be a problem.
     Still, the user can call [Workpool.create] with [~transient:false] to
     disable this behavior, in which case the user must call [Workpool.terminate]
     to release the switch. *)
  match transient with
  | false -> Fiber.fork ~sw go
  | true ->
    Fiber.fork_daemon ~sw (fun () ->
      go ();
      `Stop_daemon)

let create ~sw ~domain_count ~domain_concurrency ?(transient = true) domain_mgr =
  let stream = Stream.create 0 in
  let instance =
    {
      sw;
      stream;
      domain_count;
      is_terminating = Atomic.make false;
      terminating = Promise.create ();
      terminated = Promise.create ();
    }
  in
  let terminating = fst instance.terminating in
  for _ = 1 to domain_count do
    start_domain ~sw ~domain_mgr ~limit:domain_concurrency ~terminating ~transient stream
  done;
  instance

let submit_fork ~sw { stream; _ } f =
  let p, w = Promise.create () in
  Fiber.fork_promise ~sw (fun () ->
    Stream.add stream (Process (Pack (f, w)));
    Promise.await_exn p)

let submit { stream; _ } f =
  let p, w = Promise.create () in
  Stream.add stream (Process (Pack (f, w)));
  Promise.await p

let submit_exn instance f =
  match submit instance f with
  | Ok x -> x
  | Error exn -> raise exn

let terminate ({ terminating = _, w1; terminated = p2, w2; _ } as instance) =
  if Atomic.compare_and_set instance.is_terminating false true
  then (
    (* Instruct workers to shut down. *)
    Promise.resolve w1
      (Quit { atomic = Atomic.make 1; target = instance.domain_count; all_done = w2 });
    (* Reject all present and future queued jobs. *)
    Fiber.fork_daemon ~sw:instance.sw (fun () ->
      while true do
        match Stream.take instance.stream with
        | Process job -> reject job
        | _ -> assert false
      done;
      `Stop_daemon);
    (* Wait for all workers to have shut down. *)
    Promise.await p2)
  else
    (* [Workpool.terminate] was called more than once. *)
    Promise.await p2

let is_terminating { is_terminating; _ } = Atomic.get is_terminating

let is_terminated { terminated = p, _; _ } = Promise.is_resolved p

Review thread:

Reviewer: Do we need this? The caller can just wait for their switch to finish.

Author: It's there to allow for uncommon use cases where …

workpool.mli
@@ -0,0 +1,37 @@
type t

(** Creates a new workpool with [domain_count].

    [domain_concurrency] is the maximum number of jobs that each domain can run at a time.

    [transient] (default: true). When true, the workpool will not block the [~sw] Switch from completing.
    When false, you must call [terminate] to release the [~sw] Switch. *)
val create :
  sw:Switch.t ->
  domain_count:int ->
  domain_concurrency:int ->
  ?transient:bool ->
  _ Domain_manager.t ->
  t

(** Run a job on this workpool. It is placed at the end of the queue. *)
val submit : t -> (unit -> 'a) -> ('a, exn) result

(** Same as [submit] but raises if the job failed. *)
val submit_exn : t -> (unit -> 'a) -> 'a

(** Same as [submit] but returns immediately, without blocking. *)
val submit_fork : sw:Switch.t -> t -> (unit -> 'a) -> ('a, exn) result Promise.t

(** Waits for all running jobs to complete, then returns.
    No new jobs are started, even if they were already enqueued.
    To abort all running jobs instead of waiting for them, call [Switch.fail] on the Switch used to create this workpool. *)
val terminate : t -> unit

(** Returns true if the [terminate] function has been called on this workpool.
    Also returns true if the workpool has fully terminated. *)
val is_terminating : t -> bool

(** Returns true if the [terminate] function has been called on this workpool AND
    the workpool has fully terminated (all running jobs have completed). *)
val is_terminated : t -> bool
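
To make the interface concrete, here is a minimal usage sketch. It is not part of this diff, and it assumes the module ends up exposed as Eio.Workpool (the final module path is a guess):

(* Minimal usage sketch: 2 domains, each running up to 4 jobs concurrently.
   [Eio.Workpool] is an assumed module path, not confirmed by this PR. *)
let () =
  Eio_main.run @@ fun env ->
  Eio.Switch.run @@ fun sw ->
  let pool =
    Eio.Workpool.create ~sw ~domain_count:2 ~domain_concurrency:4
      (Eio.Stdenv.domain_mgr env)
  in
  (* [submit] blocks the calling fiber until the job completes. *)
  match Eio.Workpool.submit pool (fun () -> 6 * 7) with
  | Ok n -> Eio.traceln "result: %d" n
  | Error exn -> raise exn

Because [transient] defaults to true, the pool's workers are daemons and the switch can finish normally once the submitted jobs are done.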

Review thread on the termination design:

Reviewer: This seems a bit over-complicated. I was expecting it to write instance.domain_count quit messages to the stream. That would avoid the need for a second channel for the quit message and the n_any.
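
For illustration, that alternative might look something like the following hypothetical sketch, reusing the Quit payload from this PR (terminate_alt is an invented name, and the rejection of already-queued jobs is omitted):

(* Hypothetical sketch of the reviewer's suggestion: push one [Quit] per
   domain onto the job stream itself, so workers can block on [Stream.take]
   alone and no second channel or [Fiber.n_any] is needed. *)
let terminate_alt { stream; domain_count; terminated = p, all_done; _ } =
  let atomic = Atomic.make 1 in
  for _ = 1 to domain_count do
    (* Each worker consumes exactly one [Quit]; the last one resolves [all_done]. *)
    Stream.add stream (Quit { atomic; target = domain_count; all_done })
  done;
  Promise.await p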

Author: The reason it's this way is due to a combination of factors: terminate should "immediately" halt starting new jobs. The obvious solution is to make terminate reject all queued jobs before enqueueing Quit messages, but that doesn't work well with the post-termination background rejection loop, because terminate becomes both a producer and a consumer while the workers are still consumers. It can be made to work, but the end result was more complex and less predictable.

Reviewer: The workers can still check is_terminating before running a job and reject the job if it's set. I think that would have the same effect.
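
A hypothetical worker loop along those lines, reusing run_job, reject and sw from start_worker above, and assuming the is_terminating flag is passed down to the worker (the per-worker semaphore handling is omitted for brevity):

(* Hypothetical: no second channel; each worker checks the shared flag
   before starting a job and rejects it if termination has begun. *)
let rec loop () =
  match Stream.take stream with
  | Process (Pack (job, w) as pack) ->
    if Atomic.get is_terminating then reject pack else run_job job w;
    loop ()
  | Quit { atomic; target; all_done } ->
    (* The last worker to quit resolves [all_done]. *)
    if Atomic.fetch_and_add atomic 1 = target
    then Switch.on_release sw (Promise.resolve all_done)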

Author: After trying it out, I'm remembering why it's not that way. It's minor but I think it makes a difference. By using a second channel, we're able to start the background rejection loop immediately. Otherwise, if all workers are fully occupied at the time terminate is called, we have to wait for a worker to be available to start rejecting jobs.

An alternative I've also explored is to immediately (T0) reject all queued jobs before enqueueing n Quit messages (T1), but that leaves jobs enqueued between T0 and T1 to hang until the background rejection job starts (which can only happen after all workers have exited, so their Quit messages don't get dropped). This inconsistent behavior can be patched over by checking the is_terminating (bool Atomic.t) when submitting a new job, but I'm trying to avoid all unnecessary thread coordination in the hot path...

Reviewer: I think we have to do that anyway, otherwise this could happen: a client checks is_terminating and sees it's still OK, the pool then begins terminating, and the job the client submits afterwards is left hanging.

It's not clear what behaviour we really want for terminate though. E.g. why do we want to reject jobs that were submitted before terminate was called? Then there's no way for a client to behave "correctly" (so that its jobs never get rejected) in the graceful shutdown case.

It would probably be helpful to have an example of a program that needs support for this kind of graceful shutdown (rather than just finishing the switch, which I suspect will cover most real uses). Or possibly we should make an initial version without terminate and add it later if/when it's needed.