Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TEST: benchmark thread pooling #679

Closed
wants to merge 10 commits into from
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ dscheck:
stress:
dune exec -- ./stress/stress_proc.exe
dune exec -- ./stress/stress_semaphore.exe
dune exec -- ./stress/stress_systhreads.exe

docker:
docker build -t eio .
2 changes: 1 addition & 1 deletion lib_eio/unix/dune
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
(foreign_stubs
(language c)
(include_dirs include)
(names fork_action stubs))
(names fork_action stubs thread_pool))
(libraries eio unix threads mtime.clock.os))

(rule
Expand Down
5 changes: 3 additions & 2 deletions lib_eio/unix/eio_unix.mli
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,9 @@ val sleep : float -> unit
It can also be used in programs that don't care about tracking determinism. *)

val run_in_systhread : ?label:string -> (unit -> 'a) -> 'a
(** [run_in_systhread fn] runs the function [fn] in a newly created system thread (a {! Thread.t}).
This allows blocking calls to be made non-blocking.
(** [run_in_systhread fn] runs the function [fn] in using Eio's pool of system threads ({! Thread.t}).
This pool creates a new system thread if all threads are busy, it does not wait.
[run_in_systhread] allows blocking calls to be made non-blocking.

@param label The operation name to use in trace output. *)

Expand Down
3 changes: 1 addition & 2 deletions lib_eio/unix/private.ml
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,4 @@ module Fork_action = Fork_action

let run_in_systhread ?(label="systhread") fn =
Eio.Private.Suspend.enter label @@ fun _ctx enqueue ->
let _t : Thread.t = Thread.create (fun () -> enqueue (try Ok (fn ()) with exn -> Error exn)) () in
()
Thread_pool.run_on_systhread ~enqueue fn
112 changes: 112 additions & 0 deletions lib_eio/unix/thread_pool.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/* This file is mostly taken from OCaml PR#12867.
* It exposes pthread_cond_timedwait(3) for timed_semaphore.ml.
* This file (and timed_semaphore.ml) can both be deleted once
* this feature is available in OCaml itself. */

#include <pthread.h>
#include <errno.h>
#include <string.h>
#include <math.h>

#include <caml/memory.h>
#include <caml/alloc.h>
#include <caml/fail.h>

#define CAML_INTERNALS
#include <caml/runtime_events.h> // for CAML_EV_BEGIN and CAML_EV_END
#include <caml/sys.h> // for caml_strerror
#if defined(_WIN32)
#include <caml/winsupport.h> // for CAML_ULONGLONG_FILETIME
#endif
#undef CAML_INTERNALS


#if defined(_WIN32)
/* There are 11644473600 seconds between 1 January 1601 (the NT Epoch) and 1
* January 1970 (the Unix Epoch). FILETIME is measured in 100ns ticks.
*/
#define CAML_NT_EPOCH_100ns_TICKS 116444736000000000ULL
#else
#include <sys/time.h>
#endif

typedef pthread_cond_t* sync_condvar;
#define Condition_val(v) (* (sync_condvar *) Data_custom_val(v))
typedef pthread_mutex_t * sync_mutex;
#define Mutex_val(v) (* ((sync_mutex *) Data_custom_val(v)))

static void sync_check_error(int retcode, char * msg)
{
char * err;
char buf[1024];
int errlen, msglen;
value str;

if (retcode == 0) return;
if (retcode == ENOMEM) caml_raise_out_of_memory();
err = caml_strerror(retcode, buf, sizeof(buf));
msglen = strlen(msg);
errlen = strlen(err);
str = caml_alloc_string(msglen + 2 + errlen);
memcpy (&Byte(str, 0), msg, msglen);
memcpy (&Byte(str, msglen), ": ", 2);
memcpy (&Byte(str, msglen + 2), err, errlen);
caml_raise_sys_error(str);
}

#if defined(_WIN32)
static inline void sync_populate_timespec(double timeout_sec, struct timespec * ts)
{
double integr, frac, until;
CAML_ULONGLONG_FILETIME utime;

GetSystemTimeAsFileTime(&utime.ft);
until = (utime.ul - CAML_NT_EPOCH_100ns_TICKS) * 1e-7 + timeout_sec;
frac = modf(until, &integr);
ts->tv_sec = (time_t)integr;
ts->tv_nsec = ceil(1e9 * frac);
}
#else
static inline void sync_populate_timespec(double timeout_sec, struct timespec * ts)
{
double integr, frac;
frac = modf(timeout_sec, &integr);

struct timeval tv;
gettimeofday(&tv, 0);
ts->tv_sec = tv.tv_sec + (time_t)integr;
ts->tv_nsec =
(uint64_t)tv.tv_usec * (uint64_t)1000 +
(uint64_t)ceil(1e9 * frac);

if (ts->tv_nsec >= 1e9) {
ts->tv_sec++;
ts->tv_nsec -= 1e9;
}
}
#endif

value eio_unix_condition_timedwait(value v_cond, value v_mut,
value v_timeout_sec)
{
CAMLparam3(v_cond, v_mut, v_timeout_sec);
sync_condvar cond = Condition_val(v_cond);
sync_mutex mut = Mutex_val(v_mut);
double timeout_sec = Double_val(v_timeout_sec);
int retcode;
struct timespec ts;

sync_populate_timespec(timeout_sec, &ts);

CAML_EV_BEGIN(EV_DOMAIN_CONDITION_WAIT);
caml_enter_blocking_section();
retcode = pthread_cond_timedwait(cond, mut, &ts);
caml_leave_blocking_section();
if (retcode == ETIMEDOUT) {
CAMLreturn(Val_false);
}
sync_check_error(retcode, "Condition.timed_wait");
CAML_EV_END(EV_DOMAIN_CONDITION_WAIT);

CAMLreturn(Val_true);
}
216 changes: 216 additions & 0 deletions lib_eio/unix/thread_pool.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,216 @@
(** This thread pool does not spawn threads in advance,
but up to [max_standby_systhreads_per_domain] threads are
kept alive to wait for more work to arrive.
This number was chosen somewhat arbitrarily but benchmarking
shows it to be a good compromise.
Warning: do not increase this number above 32. *)
let max_standby_systhreads_per_domain = 20

(** After completing a job, each thread will wait [max_wait_seconds]
for new work to arrive. If it does not receive work within this
period of time, the worker thread exits. *)
let max_wait_seconds = 0.2

module Bitfield : sig
type t
val create : unit -> t
val get : t -> int -> bool
val set : t -> int -> bool -> unit
val is_empty : t -> bool
val is_full : t -> bool
val find_ready : t -> int
val find_empty : t -> int
end = struct
type t = int ref

let create () = ref 0

let[@inline always] get t i = !t land (1 lsl i) <> 0

let[@inline always] set t i = function
(* cannot do an inline if without allocating, but this pattern match is okay *)
| true -> t := !t lor (1 lsl i)
| false -> t := !t lxor (1 lsl i)

let[@inline always] is_empty t = !t = 0

let full = (1 lsl max_standby_systhreads_per_domain) - 1

let[@inline always] is_full t = !t = full

(** fls: Find Last Set (bit)
Returns the 0-indexed position of the least significant set bit in [v] *)
let[@inline always] fls v =
(* tz: trailing zeroes in [v] become ones, everything else is blanked out *)
let tz = lnot v land (v - 1) in
(* popcount() *)
let x = (tz land 0x55555555) + ((tz lsr 1) land 0x55555555) in
let x = (x land 0x33333333) + ((x lsr 2) land 0x33333333) in
let x = (x land 0x0F0F0F0F) + ((x lsr 4) land 0x0F0F0F0F) in
let x = (x land 0x00FF00FF) + ((x lsr 8) land 0x00FF00FF) in
(x land 0x0000FFFF) + ((x lsr 16) land 0x0000FFFF)

(** Finds least significant 1 *)
let[@inline always] find_ready t = fls !t

(** Finds least significant 0 *)
let[@inline always] find_empty t = (lnot !t) land full |> fls
end

type job =
| New
| Exit
| Job : {
fn: unit -> 'a;
enqueue: ('a, exn) result -> unit;
} -> job

(** This threadpool record type looks the way it does due to the constraints
imposed onto it by the need to run certain bookkeeping operations inside of
[@poll error] functions.

Note: type ['a] will be [Mailbox.t], defined below. *)
type 'a t = {
mutable initialized: bool; (* Have we setup the [Domain.at_exit] callback? *)
threads: 'a array; (* An array of [Mailbox.t] *)
available: Bitfield.t; (* For each thread, is it ready to receive work? *)
mutable terminating: bool; (* Is the domain exiting? *)
}

module Mailbox = struct
(** Mailbox with blocking OS semaphore.
Each [Mailbox.t] controls one worker systhread.
After completing each job, a thread will be kept for a short period
of time in case more work is immediately available.
Otherwise, the thread exits. *)
type t = {
lock: Timed_semaphore.t; (* A Unix [pthread_cond_t] *)
mutable cell: job;
mutable i: int; (* The index of this thread in the threadpool arrays *)
}

(* A worker systhread does not start with an assigned index.
[i] cannot be an option because writing a Some to this field would allocate. *)
let create () = { lock = Timed_semaphore.make false; cell = New; i = -1 }

let dummy = create ()

let put mbox x =
mbox.cell <- x;
Timed_semaphore.release mbox.lock

(* [@poll error] ensures this function is atomic within systhreads of a single domain.
Returns [true] if the thread should exit. *)
let[@poll error] handle_timedout pool = function
| { i = -1; _} ->
(* This thread was never placed in the pool, there is nothing to clean up *)
true
| { i; _ } ->
match Bitfield.get pool.available i with
| true ->
(* Cleanup and exit *)
Bitfield.set pool.available i false;
true
| false ->
(* A thread switch happened right before the start of this function.
[Timed_semaphore.acquire_with_timeout] timed out, but a new job came in
for this thread before we could stop the thread, so execute the new job. *)
false

let rec take pool mbox =
if Timed_semaphore.acquire_with_timeout mbox.lock max_wait_seconds
then mbox.cell
else (
(* Semaphore timed out *)
if handle_timedout pool mbox
then Exit
else take pool mbox
)
end

let create () =
{
initialized = false;
threads = Array.make max_standby_systhreads_per_domain Mailbox.dummy;
available = Bitfield.create ();
terminating = false;
}

(* This function is necessary in order to immediately terminate the systhreads
on stand-by, without having to wait for them to shut down automatically.
Without this termination mechanism, domains that have executed a call on
a systhread in the last [max_wait_seconds] seconds would have to wait for
their timeout to occur, introducing long unwanted pauses. *)
let terminate pool =
pool.terminating <- true;
if not (Bitfield.is_empty pool.available) then
for i = 0 to max_standby_systhreads_per_domain - 1 do
if Bitfield.get pool.available i
then Mailbox.put pool.threads.(i) Exit
done

(* [@poll error] ensures this function is atomic within systhreads of a single domain.
This function (re-)adds the worker systhread to the pool of available threads,
or exits if the pool is already at maximum capacity. *)
let[@poll error] keep_thread_or_exit pool (mbox : Mailbox.t) =
if pool.terminating || Bitfield.is_full pool.available
then raise Thread.Exit
else (
let i = Bitfield.find_empty pool.available in
mbox.i <- i;
Bitfield.set pool.available i true;
pool.threads.(i) <- mbox
)

(* [@poll error] ensures this function is atomic within systhreads of a single domain.
This function tries to reserve one available thread from the pool.
Since we cannot return an option in [@poll error] code, we simulate
an option by conditionally updating the reference [res].
The returned boolean indicates whether we updated [res] or not. *)
let[@poll error] try_get_thread (pool : Mailbox.t t) res =
if Bitfield.is_empty pool.available
then false
else (
let i = Bitfield.find_ready pool.available in
Bitfield.set pool.available i false;
res := pool.threads.(i);
true)

let make_thread pool =
let mbox = Mailbox.create () in
let _t : Thread.t = Thread.create (fun () ->
while true do
match Mailbox.take pool mbox with
| New -> assert false
| Exit -> raise Thread.Exit
| Job { fn; enqueue } ->
enqueue (try Ok (fn ()) with exn -> Error exn);
keep_thread_or_exit pool mbox
done
) ()
in
mbox

(* https://v2.ocaml.org/manual/parallelism.html#s:par_systhread_interaction
"Only one systhread at a time is allowed to run OCaml code on a particular domain."
So we keep a separate threadpool per domain. *)
let key = Domain.DLS.new_key create

(* [@poll error] ensures this function is atomic within systhreads of a single domain.
https://github.com/ocaml/ocaml/pull/12724
As of OCaml 5.1, [Domain.at_exit] is still not threadsafe *)
let[@poll error] needs_init pool =
if pool.initialized
then false
else (pool.initialized <- true; true)

let run_on_systhread ~enqueue fn =
let pool = Domain.DLS.get key in
if needs_init pool then Domain.at_exit (fun () -> terminate pool);
let mbox =
let res = ref Mailbox.dummy in
if try_get_thread pool res
then !res
else make_thread pool
in
Mailbox.put mbox (Job { fn; enqueue })
1 change: 1 addition & 0 deletions lib_eio/unix/thread_pool.mli
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
val run_on_systhread : enqueue:(('a, exn) result -> unit) -> (unit -> 'a) -> unit
Loading
Loading