diff --git a/Makefile b/Makefile index 1bfe96dfc..30e649dc2 100644 --- a/Makefile +++ b/Makefile @@ -19,6 +19,7 @@ dscheck: stress: dune exec -- ./stress/stress_proc.exe dune exec -- ./stress/stress_semaphore.exe + dune exec -- ./stress/stress_systhreads.exe docker: docker build -t eio . diff --git a/lib_eio/unix/dune b/lib_eio/unix/dune index 446a4fa2a..ef7e0a589 100644 --- a/lib_eio/unix/dune +++ b/lib_eio/unix/dune @@ -4,7 +4,7 @@ (foreign_stubs (language c) (include_dirs include) - (names fork_action stubs)) + (names fork_action stubs thread_pool)) (libraries eio unix threads mtime.clock.os)) (rule diff --git a/lib_eio/unix/eio_unix.mli b/lib_eio/unix/eio_unix.mli index 9e8e46955..0e6e559e8 100644 --- a/lib_eio/unix/eio_unix.mli +++ b/lib_eio/unix/eio_unix.mli @@ -51,8 +51,9 @@ val sleep : float -> unit It can also be used in programs that don't care about tracking determinism. *) val run_in_systhread : ?label:string -> (unit -> 'a) -> 'a -(** [run_in_systhread fn] runs the function [fn] in a newly created system thread (a {! Thread.t}). - This allows blocking calls to be made non-blocking. +(** [run_in_systhread fn] runs the function [fn] using Eio's pool of system threads ({! Thread.t}). + If all threads in the pool are busy, a new system thread is created rather than waiting for one to become free. + [run_in_systhread] allows blocking calls to be made non-blocking. @param label The operation name to use in trace output. 
*) diff --git a/lib_eio/unix/private.ml b/lib_eio/unix/private.ml index f7b6f19b9..8878f1be8 100644 --- a/lib_eio/unix/private.ml +++ b/lib_eio/unix/private.ml @@ -19,5 +19,4 @@ module Fork_action = Fork_action let run_in_systhread ?(label="systhread") fn = Eio.Private.Suspend.enter label @@ fun _ctx enqueue -> - let _t : Thread.t = Thread.create (fun () -> enqueue (try Ok (fn ()) with exn -> Error exn)) () in - () + Thread_pool.run_on_systhread ~enqueue fn diff --git a/lib_eio/unix/thread_pool.c b/lib_eio/unix/thread_pool.c new file mode 100644 index 000000000..524f9e6a7 --- /dev/null +++ b/lib_eio/unix/thread_pool.c @@ -0,0 +1,112 @@ +/* This file is mostly taken from OCaml PR#12867. + * It exposes pthread_cond_timedwait(3) for timed_semaphore.ml. + * This file (and timed_semaphore.ml) can both be deleted once + * this feature is available in OCaml itself. */ + +#include +#include +#include +#include + +#include +#include +#include + +#define CAML_INTERNALS +#include // for CAML_EV_BEGIN and CAML_EV_END +#include // for caml_strerror +#if defined(_WIN32) +#include // for CAML_ULONGLONG_FILETIME +#endif +#undef CAML_INTERNALS + + +#if defined(_WIN32) +/* There are 11644473600 seconds between 1 January 1601 (the NT Epoch) and 1 + * January 1970 (the Unix Epoch). FILETIME is measured in 100ns ticks. 
+ */ +#define CAML_NT_EPOCH_100ns_TICKS 116444736000000000ULL +#else +#include +#endif + +typedef pthread_cond_t* sync_condvar; +#define Condition_val(v) (* (sync_condvar *) Data_custom_val(v)) +typedef pthread_mutex_t * sync_mutex; +#define Mutex_val(v) (* ((sync_mutex *) Data_custom_val(v))) + +static void sync_check_error(int retcode, char * msg) +{ + char * err; + char buf[1024]; + int errlen, msglen; + value str; + + if (retcode == 0) return; + if (retcode == ENOMEM) caml_raise_out_of_memory(); + err = caml_strerror(retcode, buf, sizeof(buf)); + msglen = strlen(msg); + errlen = strlen(err); + str = caml_alloc_string(msglen + 2 + errlen); + memcpy (&Byte(str, 0), msg, msglen); + memcpy (&Byte(str, msglen), ": ", 2); + memcpy (&Byte(str, msglen + 2), err, errlen); + caml_raise_sys_error(str); +} + +#if defined(_WIN32) +static inline void sync_populate_timespec(double timeout_sec, struct timespec * ts) +{ + double integr, frac, until; + CAML_ULONGLONG_FILETIME utime; + + GetSystemTimeAsFileTime(&utime.ft); + until = (utime.ul - CAML_NT_EPOCH_100ns_TICKS) * 1e-7 + timeout_sec; + frac = modf(until, &integr); + ts->tv_sec = (time_t)integr; + ts->tv_nsec = ceil(1e9 * frac); +} +#else +static inline void sync_populate_timespec(double timeout_sec, struct timespec * ts) +{ + double integr, frac; + frac = modf(timeout_sec, &integr); + + struct timeval tv; + gettimeofday(&tv, 0); + ts->tv_sec = tv.tv_sec + (time_t)integr; + ts->tv_nsec = + (uint64_t)tv.tv_usec * (uint64_t)1000 + + (uint64_t)ceil(1e9 * frac); + + if (ts->tv_nsec >= 1e9) { + ts->tv_sec++; + ts->tv_nsec -= 1e9; + } +} +#endif + +value eio_unix_condition_timedwait(value v_cond, value v_mut, + value v_timeout_sec) +{ + CAMLparam3(v_cond, v_mut, v_timeout_sec); + sync_condvar cond = Condition_val(v_cond); + sync_mutex mut = Mutex_val(v_mut); + double timeout_sec = Double_val(v_timeout_sec); + int retcode; + struct timespec ts; + + sync_populate_timespec(timeout_sec, &ts); + + 
CAML_EV_BEGIN(EV_DOMAIN_CONDITION_WAIT); + caml_enter_blocking_section(); + retcode = pthread_cond_timedwait(cond, mut, &ts); + caml_leave_blocking_section(); + CAML_EV_END(EV_DOMAIN_CONDITION_WAIT); /* close the event span here so the timeout and error paths below stay balanced */ + if (retcode == ETIMEDOUT) { + CAMLreturn(Val_false); + } + sync_check_error(retcode, "Condition.timed_wait"); + + CAMLreturn(Val_true); +} diff --git a/lib_eio/unix/thread_pool.ml b/lib_eio/unix/thread_pool.ml new file mode 100644 index 000000000..6c21e6180 --- /dev/null +++ b/lib_eio/unix/thread_pool.ml @@ -0,0 +1,216 @@ +(** This thread pool does not spawn threads in advance, + but up to [max_standby_systhreads_per_domain] threads are + kept alive to wait for more work to arrive. + This number was chosen somewhat arbitrarily but benchmarking + shows it to be a good compromise. + Warning: do not increase this number above 32. *) +let max_standby_systhreads_per_domain = 20 + +(** After completing a job, each thread will wait [max_wait_seconds] + for new work to arrive. If it does not receive work within this + period of time, the worker thread exits. 
*) +let max_wait_seconds = 0.2 + +module Bitfield : sig + type t + val create : unit -> t + val get : t -> int -> bool + val set : t -> int -> bool -> unit + val is_empty : t -> bool + val is_full : t -> bool + val find_ready : t -> int + val find_empty : t -> int +end = struct + type t = int ref + + let create () = ref 0 + + let[@inline always] get t i = !t land (1 lsl i) <> 0 + + let[@inline always] set t i = function + (* cannot do an inline if without allocating, but this pattern match is okay *) + | true -> t := !t lor (1 lsl i) + | false -> t := !t land lnot (1 lsl i) (* clear rather than toggle: a no-op if bit [i] is already 0 *) + + let[@inline always] is_empty t = !t = 0 + + let full = (1 lsl max_standby_systhreads_per_domain) - 1 + + let[@inline always] is_full t = !t = full + + (** fls: despite its name, this finds the lowest (first) set bit, not the last one. + Returns the 0-indexed position of the least significant set bit in [v], or 32 if none of the low 32 bits is set. *) + let[@inline always] fls v = + (* tz: trailing zeroes in [v] become ones, everything else is blanked out *) + let tz = lnot v land (v - 1) in + (* popcount() *) + let x = (tz land 0x55555555) + ((tz lsr 1) land 0x55555555) in + let x = (x land 0x33333333) + ((x lsr 2) land 0x33333333) in + let x = (x land 0x0F0F0F0F) + ((x lsr 4) land 0x0F0F0F0F) in + let x = (x land 0x00FF00FF) + ((x lsr 8) land 0x00FF00FF) in + (x land 0x0000FFFF) + ((x lsr 16) land 0x0000FFFF) + + (** Finds least significant 1 *) + let[@inline always] find_ready t = fls !t + + (** Finds least significant 0 *) + let[@inline always] find_empty t = (lnot !t) land full |> fls +end + +type job = + | New + | Exit + | Job : { + fn: unit -> 'a; + enqueue: ('a, exn) result -> unit; + } -> job + +(** This threadpool record type looks the way it does due to the constraints + imposed onto it by the need to run certain bookkeeping operations inside of + [@poll error] functions. + + Note: type ['a] will be [Mailbox.t], defined below. *) +type 'a t = { + mutable initialized: bool; (* Have we setup the [Domain.at_exit] callback? 
*) + threads: 'a array; (* An array of [Mailbox.t] *) + available: Bitfield.t; (* For each thread, is it ready to receive work? *) + mutable terminating: bool; (* Is the domain exiting? *) +} + +module Mailbox = struct + (** Mailbox with blocking OS semaphore. + Each [Mailbox.t] controls one worker systhread. + After completing each job, a thread will be kept for a short period + of time in case more work is immediately available. + Otherwise, the thread exits. *) + type t = { + lock: Timed_semaphore.t; (* A Unix [pthread_cond_t] *) + mutable cell: job; + mutable i: int; (* The index of this thread in the threadpool arrays *) + } + + (* A worker systhread does not start with an assigned index. + [i] cannot be an option because writing a Some to this field would allocate. *) + let create () = { lock = Timed_semaphore.make false; cell = New; i = -1 } + + let dummy = create () + + let put mbox x = + mbox.cell <- x; + Timed_semaphore.release mbox.lock + + (* [@poll error] ensures this function is atomic within systhreads of a single domain. + Returns [true] if the thread should exit. *) + let[@poll error] handle_timedout pool = function + | { i = -1; _} -> + (* This thread was never placed in the pool, there is nothing to clean up *) + true + | { i; _ } -> + match Bitfield.get pool.available i with + | true -> + (* Cleanup and exit *) + Bitfield.set pool.available i false; + true + | false -> + (* A thread switch happened right before the start of this function. + [Timed_semaphore.acquire_with_timeout] timed out, but a new job came in + for this thread before we could stop the thread, so execute the new job. 
*) + false + + let rec take pool mbox = + if Timed_semaphore.acquire_with_timeout mbox.lock max_wait_seconds + then mbox.cell + else ( + (* Semaphore timed out *) + if handle_timedout pool mbox + then Exit + else take pool mbox + ) +end + +let create () = + { + initialized = false; + threads = Array.make max_standby_systhreads_per_domain Mailbox.dummy; + available = Bitfield.create (); + terminating = false; + } + +(* This function is necessary in order to immediately terminate the systhreads + on stand-by, without having to wait for them to shut down automatically. + Without this termination mechanism, domains that have executed a call on + a systhread in the last [max_wait_seconds] seconds would have to wait for + their timeout to occur, introducing long unwanted pauses. *) +let terminate pool = + pool.terminating <- true; + if not (Bitfield.is_empty pool.available) then + for i = 0 to max_standby_systhreads_per_domain - 1 do + if Bitfield.get pool.available i + then Mailbox.put pool.threads.(i) Exit + done + +(* [@poll error] ensures this function is atomic within systhreads of a single domain. + This function (re-)adds the worker systhread to the pool of available threads, + or exits if the pool is already at maximum capacity. *) +let[@poll error] keep_thread_or_exit pool (mbox : Mailbox.t) = + if pool.terminating || Bitfield.is_full pool.available + then raise Thread.Exit + else ( + let i = Bitfield.find_empty pool.available in + mbox.i <- i; + Bitfield.set pool.available i true; + pool.threads.(i) <- mbox + ) + +(* [@poll error] ensures this function is atomic within systhreads of a single domain. + This function tries to reserve one available thread from the pool. + Since we cannot return an option in [@poll error] code, we simulate + an option by conditionally updating the reference [res]. + The returned boolean indicates whether we updated [res] or not. 
*) +let[@poll error] try_get_thread (pool : Mailbox.t t) res = + if Bitfield.is_empty pool.available + then false + else ( + let i = Bitfield.find_ready pool.available in + Bitfield.set pool.available i false; + res := pool.threads.(i); + true) + +let make_thread pool = + let mbox = Mailbox.create () in + let _t : Thread.t = Thread.create (fun () -> + while true do + match Mailbox.take pool mbox with + | New -> assert false + | Exit -> raise Thread.Exit + | Job { fn; enqueue } -> + enqueue (try Ok (fn ()) with exn -> Error exn); + keep_thread_or_exit pool mbox + done + ) () + in + mbox + +(* https://v2.ocaml.org/manual/parallelism.html#s:par_systhread_interaction + "Only one systhread at a time is allowed to run OCaml code on a particular domain." + So we keep a separate threadpool per domain. *) +let key = Domain.DLS.new_key create + +(* [@poll error] ensures this function is atomic within systhreads of a single domain. + https://github.com/ocaml/ocaml/pull/12724 + As of OCaml 5.1, [Domain.at_exit] is still not threadsafe *) +let[@poll error] needs_init pool = + if pool.initialized + then false + else (pool.initialized <- true; true) + +let run_on_systhread ~enqueue fn = + let pool = Domain.DLS.get key in + if needs_init pool then Domain.at_exit (fun () -> terminate pool); + let mbox = + let res = ref Mailbox.dummy in + if try_get_thread pool res + then !res + else make_thread pool + in + Mailbox.put mbox (Job { fn; enqueue }) diff --git a/lib_eio/unix/thread_pool.mli b/lib_eio/unix/thread_pool.mli new file mode 100644 index 000000000..b735331c2 --- /dev/null +++ b/lib_eio/unix/thread_pool.mli @@ -0,0 +1 @@ +val run_on_systhread : enqueue:(('a, exn) result -> unit) -> (unit -> 'a) -> unit diff --git a/lib_eio/unix/timed_semaphore.ml b/lib_eio/unix/timed_semaphore.ml new file mode 100644 index 000000000..fa9c45e02 --- /dev/null +++ b/lib_eio/unix/timed_semaphore.ml @@ -0,0 +1,53 @@ +(* This file is mostly taken from OCaml PR#12867. 
+ * It exposes pthread_cond_timedwait(3) for timed_semaphore.ml. + * This file (and thread_pool.c) can both be deleted once + * this feature is available in OCaml itself. *) + +external condition_timed_wait : Condition.t -> Mutex.t -> float -> bool = "eio_unix_condition_timedwait" + +type t = { + mut: Mutex.t; (* protects [v] *) + mutable v: int; (* the current value *) + nonzero: Condition.t (* signaled when [v > 0] *) +} + +let make b = + { + mut = Mutex.create(); + v = if b then 1 else 0; + nonzero = Condition.create(); + } + +let release s = + Mutex.lock s.mut; + s.v <- 1; + Condition.signal s.nonzero; + Mutex.unlock s.mut + +let acquire s = + Mutex.lock s.mut; + while s.v = 0 do Condition.wait s.nonzero s.mut done; + s.v <- 0; + Mutex.unlock s.mut + +let acquire_with_timeout s timeout = + Mutex.lock s.mut; + let rec aux () = + if s.v = 0 then begin + let signaled = condition_timed_wait s.nonzero s.mut timeout in + if signaled && s.v = 0 + then aux () + else s.v <> 0 (* re-check [v] even on timeout: a [release] may have landed while the mutex was being re-taken, and must not be lost *) + end + else true + in + let signaled = aux () in + s.v <- 0; + Mutex.unlock s.mut; + signaled + +let try_acquire s = + Mutex.lock s.mut; + let ret = if s.v = 0 then false else (s.v <- 0; true) in + Mutex.unlock s.mut; + ret diff --git a/lib_eio_posix/low_level.ml b/lib_eio_posix/low_level.ml index 9e99619bd..2efc61d7a 100644 --- a/lib_eio_posix/low_level.ml +++ b/lib_eio_posix/low_level.ml @@ -15,7 +15,6 @@ module Fd = Eio_unix.Fd module Trace = Eio.Private.Trace module Fiber_context = Eio.Private.Fiber_context -(* todo: keeping a pool of workers is probably faster *) let in_worker_thread label = Eio_unix.run_in_systhread ~label let await_readable op fd = diff --git a/lib_eio_windows/low_level.ml b/lib_eio_windows/low_level.ml index df68f9aad..f9290a5a9 100755 --- a/lib_eio_windows/low_level.ml +++ b/lib_eio_windows/low_level.ml @@ -11,7 +11,6 @@ open Eio.Std type ty = Read | Write -(* todo: keeping a pool of workers is probably faster *) let in_worker_thread = Eio_unix.run_in_systhread module Fd 
= Eio_unix.Fd diff --git a/stress/stress_systhreads.ml b/stress/stress_systhreads.ml new file mode 100644 index 000000000..c3525a104 --- /dev/null +++ b/stress/stress_systhreads.ml @@ -0,0 +1,30 @@ +open Eio.Std + +let n_rounds = 10 + +let main env = + let fs = Eio.Stdenv.fs env in + let path = Eio.Path.(fs / "/tmp/eio-test") in + Eio.Path.mkdirs ~exists_ok:true ~perm:0o755 path; + Switch.run @@ fun sw -> + Switch.on_release sw (fun () -> Eio.Path.rmtree ~missing_ok:true path); + let num = 100 in + traceln "-------------------------------------"; + for i = 1 to n_rounds do + Switch.run @@ fun sw -> + let pool = Eio.Executor_pool.create ~sw ~domain_count:5 (Eio.Stdenv.domain_mgr env) in + List.init num Fun.id + |> Eio.Fiber.List.iter (fun i -> + Eio.Executor_pool.submit_exn pool ~weight:0.0 (fun () -> + let path = Eio.Path.(path / Format.sprintf "test%d.txt" i) in + Eio.Path.with_open_out path ~create:(`Or_truncate 0o600) @@ fun file -> + Eio.Flow.copy_string "!!!" file + ) + ); + traceln "Finished round %d/%d" i n_rounds + done; + traceln "Success" + +let () = + Eio_main.run @@ fun env -> + main env