diff --git a/bench/bench_fib.ocaml5.ml b/bench/bench_fib.ocaml5.ml index 13e933a68..cbe063882 100644 --- a/bench/bench_fib.ocaml5.ml +++ b/bench/bench_fib.ocaml5.ml @@ -9,7 +9,7 @@ let rec exp_fib i = else let computation = Computation.create () in let main () = Computation.return computation (exp_fib (i - 2)) in - Fiber.spawn ~forbid:false computation [ main ]; + Fiber.spawn (Fiber.create ~forbid:false computation) main; let f1 = exp_fib (i - 1) in let f2 = Computation.await computation in f1 + f2 diff --git a/bench/bench_spawn.ml b/bench/bench_spawn.ml index a71a5a485..a17a24081 100644 --- a/bench/bench_spawn.ml +++ b/bench/bench_spawn.ml @@ -5,7 +5,7 @@ let factor = Util.iter_factor * if String.starts_with ~prefix:"4." Sys.ocaml_version then 1 else 10 -let run_one ~budgetf ~at_a_time () = +let run_one ~budgetf () = let n_spawns = 10 * factor in let init _ = () in @@ -13,25 +13,23 @@ let run_one ~budgetf ~at_a_time () = let work _ () = let counter = ref n_spawns in let computation = Computation.create () in - for _ = 1 to n_spawns / at_a_time do - let main () = - let n = !counter - 1 in - counter := n; - if n = 0 then Computation.finish computation - in - let mains = List.init at_a_time @@ fun _ -> main in - Fiber.spawn ~forbid:false computation mains + let computation_packed = Computation.Packed computation in + let main () = + let n = !counter - 1 in + counter := n; + if n = 0 then Computation.finish computation; + Fiber.finalize (Fiber.current ()) + in + for _ = 1 to n_spawns do + let fiber = Fiber.create_packed ~forbid:false computation_packed in + Fiber.spawn fiber main done; Computation.await computation in - let config = Printf.sprintf "%d at a time" at_a_time in + let config = "with packed computation" in Times.record ~budgetf ~n_domains:1 ~n_warmups:1 ~n_runs_min:1 ~init ~wrap ~work () |> Times.to_thruput_metrics ~n:n_spawns ~singular:"spawn" ~config -let run_suite ~budgetf = - if Sys.int_size <= 32 then [] - else - [ 1; 2; 4; 8 ] - |> List.concat_map @@ fun at_a_time -> run_one ~budgetf ~at_a_time () +let run_suite ~budgetf = if Sys.int_size <= 32 then [] else run_one ~budgetf () diff --git a/lib/picos/bootstrap/picos_bootstrap.ml b/lib/picos/bootstrap/picos_bootstrap.ml index 510aec76b..16cc316bd 100644 --- a/lib/picos/bootstrap/picos_bootstrap.ml +++ b/lib/picos/bootstrap/picos_bootstrap.ml @@ -331,13 +331,60 @@ module Fiber = struct type 'a initial = Constant of 'a | Computed of (unit -> 'a) - let new_key initial = + type finalizers = + | Nil + | Finalizer : { + index : int; + finalize : 'a -> unit; + next : finalizers; + } + -> finalizers + + let finalizers = Atomic.make Nil + + let rec add_finalizer index finalize = + let before = Atomic.get finalizers in + let after = Finalizer { index; finalize; next = before } in + if not (Atomic.compare_and_set finalizers before after) then + add_finalizer index finalize + + type initializers = + | Nil + | Initializer : { + key : 'a key; + initialize : t -> 'a; + next : initializers; + } + -> initializers + + let initializers = Atomic.make Nil + + let rec add_initializer key initialize = + let before = Atomic.get initializers in + let after = Initializer { key; initialize; next = before } in + if not (Atomic.compare_and_set initializers before after) then + add_initializer key initialize + + let new_key ?finalize ?initialize initial = let index = Atomic.fetch_and_add counter 1 in - match initial with - | Constant default -> - let default = Sys.opaque_identity (Obj.magic default : non_float) in - { index; default; compute } - | Computed compute -> { index; default = unique; compute } + begin + match finalize with + | None -> () + | Some finalize -> add_finalizer index finalize + end; + let key = + match initial with + | Constant default -> + let default = Sys.opaque_identity (Obj.magic default : non_float) in + { index; default; compute } + | Computed compute -> { index; default = unique; compute } + in + begin + match initialize with + | None -> () + | Some initialize -> add_initializer key initialize + end; + key let get (type a) (Fiber r : t) (key : a key) = let fls = r.fls in @@ -369,6 +416,13 @@ module Fiber = struct (Sys.opaque_identity (Obj.magic value : non_float)); value + let[@inline] has (Fiber r : t) key = + let fls = r.fls in + key.index < Array.length fls + && + let value = Array.unsafe_get fls key.index in + value != unique + let set (type a) (Fiber r : t) (key : a key) (value : a) = let fls = r.fls in if key.index < Array.length fls then @@ -379,14 +433,40 @@ module Fiber = struct r.fls <- fls; Array.unsafe_set fls key.index (Sys.opaque_identity (Obj.magic value : non_float)) + + let rec finalize fls = function + | Finalizer r -> + if r.index < Array.length fls then begin + let value = Array.unsafe_get fls r.index in + if value != unique then r.finalize (Obj.magic value) + end; + finalize fls r.next + | Nil -> () + + let rec initialize ~parent ~child = function + | Initializer r -> + if not (has child r.key) then set child r.key (r.initialize parent); + initialize ~parent ~child r.next + | Nil -> () end + + let[@inline] finalize (Fiber t : t) = + let fls = t.fls in + if 0 < Array.length fls then FLS.finalize fls (Atomic.get FLS.finalizers) + + let[@inline] initialize ~parent ~child = + match Atomic.get FLS.initializers with + | Initializer r -> + if not (FLS.has child r.key) then + FLS.set child r.key (r.initialize parent); + FLS.initialize ~parent ~child r.next + | Nil -> () end module Handler = struct type 'c t = { current : 'c -> Fiber.t; - spawn : - 'a. 'c -> forbid:bool -> 'a Computation.t -> (unit -> unit) list -> unit; + spawn : 'c -> Fiber.t -> (unit -> unit) -> unit; yield : 'c -> unit; cancel_after : 'a. 'c -> 'a Computation.t -> seconds:float -> Exn_bt.t -> unit; diff --git a/lib/picos/intf.ocaml5.ml b/lib/picos/intf.ocaml5.ml index 83b5cbf35..24cb487db 100644 --- a/lib/picos/intf.ocaml5.ml +++ b/lib/picos/intf.ocaml5.ml @@ -132,13 +132,13 @@ module type Fiber = sig associated with the fiber has been canceled the scheduler is free to discontinue the fiber immediately before spawning new fibers. - The scheduler is free to run the newly created fibers on any domain and + The scheduler is free to run the newly created fiber on any domain and decide which fiber to give priority to. - ⚠️ The scheduler should guarantee that, when [Spawn] returns normally, all - of the [mains] will eventually be called by the scheduler and, when - [Spawn] raises an exception, none of the [mains] will be called. In other - words, [Spawn] should check cancelation just once and be all or nothing. + ⚠️ The scheduler should guarantee that, when [Spawn] returns normally, the + given [main] will eventually be called by the scheduler and, when [Spawn] + raises an exception, the [main] will not be called. In other words, + [Spawn] should check cancelation just once and be all or nothing. Furthermore, in case a newly spawned fiber is canceled before its main is called, the scheduler must still call the main. This allows a program to ensure, i.e. keep track of, that all fibers it spawns are terminated @@ -146,10 +146,5 @@ module type Fiber = sig properly. *) type _ Effect.t += private - | Spawn : { - forbid : bool; - computation : 'a computation; - mains : (unit -> unit) list; - } - -> unit Effect.t + | Spawn : { fiber : t; main : unit -> unit } -> unit Effect.t end diff --git a/lib/picos/ocaml4/picos_ocaml.ml b/lib/picos/ocaml4/picos_ocaml.ml index ad1743729..d92ffc11d 100644 --- a/lib/picos/ocaml4/picos_ocaml.ml +++ b/lib/picos/ocaml4/picos_ocaml.ml @@ -1,17 +1,17 @@ open Picos_bootstrap -let error () = +let[@inline never] error _ = raise (Sys_error "Picos.Handler.using not called for current thread") module Handler = struct type entry = E : { context : 'a; handler : 'a Handler.t } -> entry let default = - let current _ = error () - and spawn _ ~forbid:_ _ _ = error () - and yield _ = error () - and cancel_after _ _ ~seconds:_ _ = error () - and await _ _ = error () in + let current = error + and spawn _ _ = error + and yield = error + and cancel_after _ _ ~seconds:_ = error + and await _ = error in E { context = (); handler = { current; spawn; yield; cancel_after; await } } let key = Picos_thread.TLS.new_key @@ fun () -> default @@ -42,9 +42,9 @@ module Fiber = struct let (E r) = Handler.get () in r.handler.current r.context - let spawn ~forbid computation mains = + let spawn fiber main = let (E r) = Handler.get () in - r.handler.spawn r.context ~forbid computation mains + r.handler.spawn r.context fiber main let yield () = let (E r) = Handler.get () in diff --git a/lib/picos/ocaml5/picos_ocaml.ml b/lib/picos/ocaml5/picos_ocaml.ml index 8e845137c..0f2227bc4 100644 --- a/lib/picos/ocaml5/picos_ocaml.ml +++ b/lib/picos/ocaml5/picos_ocaml.ml @@ -25,15 +25,9 @@ module Fiber = struct let current () = Effect.perform Current type _ Effect.t += - | Spawn : { - forbid : bool; - computation : 'a Computation.t; - mains : (unit -> unit) list; - } - -> unit Effect.t + | Spawn : { fiber : Fiber.t; main : unit -> unit } -> unit Effect.t - let spawn ~forbid computation mains = - Effect.perform @@ Spawn { forbid; computation; mains } + let spawn fiber main = Effect.perform @@ Spawn { fiber; main } type _ Effect.t += Yield : unit Effect.t @@ -78,7 +72,7 @@ module Handler = struct | Fiber.Spawn r -> Some (fun k -> - match h.spawn c ~forbid:r.forbid r.computation r.mains with + match h.spawn c r.fiber r.main with | unit -> Effect.Deep.continue k unit | exception exn -> discontinue k exn) | Fiber.Yield -> yield diff --git a/lib/picos/picos.mli b/lib/picos/picos.mli index f7ed275c3..08ce25c71 100644 --- a/lib/picos/picos.mli +++ b/lib/picos/picos.mli @@ -757,30 +757,6 @@ module Fiber : sig (** [sleep ~seconds] suspends the current fiber for the specified number of [seconds]. *) - (** {2 Interface for spawning} *) - - val spawn : forbid:bool -> 'a Computation.t -> (unit -> unit) list -> unit - (** [spawn ~forbid computation mains] starts new fibers by performing the - {!Spawn} effect. The fibers will share the same [computation] and start - with {{!Fiber.has_forbidden} propagation of cancelation forbidden or - permitted} depending on the [forbid] flag. - - ℹ️ Any {{!Computation} computation}, including the computation of the - current fiber, may be passed as the computation for new fibers. Higher - level libraries are free to implement the desired structuring principles. - - ⚠️ Behavior is undefined if any function in [mains] raises an exception. - For example, raising an exception might terminate the whole application - (recommended, but not required) or the exception might be ignored. In - other words, the caller {i must} arrange for the computation to be - completed and errors reported in a desired manner. - - ℹ️ The behavior is that - - - on OCaml 5, [spawn] performs the {!Spawn} effect, and - - on OCaml 4, [spawn] will call the [spawn] operation of the {{!Handler} - current handler}. *) - (** {2 Interface for current fiber} *) type t @@ -943,7 +919,8 @@ module Fiber : sig (** Type to specify initial values for fibers. *) type 'a initial = Constant of 'a | Computed of (unit -> 'a) - val new_key : 'a initial -> 'a key + val new_key : + ?finalize:('a -> unit) -> ?initialize:(t -> 'a) -> 'a initial -> 'a key (** [new_key initial] allocates a new key for associating values in storage associated with fibers. The [initial] value for every fiber is either the given {!Constant} or is {!Computed} with the given function. If the @@ -965,6 +942,38 @@ module Fiber : sig ⚠️ It is only safe to call [set] from the fiber itself. *) end + (** {2 Interface for spawning} *) + + val create_packed : forbid:bool -> Computation.packed -> t + (** [create_packed ~forbid packed] creates a new fiber record. *) + + val create : forbid:bool -> 'a Computation.t -> t + (** [create ~forbid computation] is equivalent to + {{!create_packed} [create_packed ~forbid (Computation.Packed computation)]}. *) + + val finalize : t -> unit + (** *) + + val spawn : t -> (unit -> unit) -> unit + (** [spawn fiber main] starts a new fiber by performing the {!Spawn} effect. + + ⚠️ Fiber records must be unique and the caller of [spawn] must make sure + that a specific {{!fiber} fiber} record is not reused. Failure to ensure + that fiber records are unique will break concurrent abstractions written + on top the the Picos interface. + + ⚠️ Behavior is undefined if the [main] function raises an exception. For + example, raising an exception might terminate the whole application + (recommended, but not required) or the exception might be ignored. In + other words, the caller {i must} arrange for any exceptions to be handled + in a desired manner. + + ℹ️ The behavior is that + + - on OCaml 5, [spawn] performs the {!Spawn} effect, and + - on OCaml 4, [spawn] will call the [spawn] operation of the {{!Handler} + current handler}. *) + (** {2 Interface for structuring} *) val get_computation : t -> Computation.packed @@ -1051,12 +1060,8 @@ module Fiber : sig (** {2 Interface for schedulers} *) - val create_packed : forbid:bool -> Computation.packed -> t - (** [create_packed ~forbid packed] creates a new fiber. *) - - val create : forbid:bool -> 'a Computation.t -> t - (** [create ~forbid computation] is equivalent to - {{!create_packed} [create_packed ~forbid (Computation.Packed computation)]}. *) + val initialize : parent:t -> child:t -> unit + (** *) val try_suspend : t -> Trigger.t -> 'x -> 'y -> (Trigger.t -> 'x -> 'y -> unit) -> bool @@ -1115,8 +1120,7 @@ module Handler : sig type 'c t = { current : 'c -> Fiber.t; (** See {!Picos.Fiber.current}. *) - spawn : - 'a. 'c -> forbid:bool -> 'a Computation.t -> (unit -> unit) list -> unit; + spawn : 'c -> Fiber.t -> (unit -> unit) -> unit; (** See {!Picos.Fiber.spawn}. *) yield : 'c -> unit; (** See {!Picos.Fiber.yield}. *) cancel_after : diff --git a/lib/picos_fifos/picos_fifos.ml b/lib/picos_fifos/picos_fifos.ml index 76c6efd5a..62c758d20 100644 --- a/lib/picos_fifos/picos_fifos.ml +++ b/lib/picos_fifos/picos_fifos.ml @@ -21,13 +21,6 @@ type t = { retc : unit -> unit; } -let rec spawn t n forbid packed = function - | [] -> Atomic.fetch_and_add t.num_alive_fibers n |> ignore - | main :: mains -> - let fiber = Fiber.create_packed ~forbid packed in - Picos_mpscq.push t.ready (Spawn (fiber, main)); - spawn t (n + 1) forbid packed mains - let continue = Some (fun k -> Effect.Deep.continue k ()) let rec next t = @@ -55,7 +48,9 @@ let rec next t = whole operation or discontinue the fiber. *) if Fiber.is_canceled fiber then discontinue else begin - spawn t 0 r.forbid (Packed r.computation) r.mains; + Fiber.initialize ~parent:fiber ~child:r.fiber; + Atomic.incr t.num_alive_fibers; + Picos_mpscq.push t.ready (Spawn (r.fiber, r.main)); continue end | Fiber.Yield -> yield @@ -145,7 +140,10 @@ let run ?(forbid = false) main = in let computation = Computation.create ~mode:`LIFO () in let fiber = Fiber.create ~forbid computation in - let main = Computation.capture computation main in + let main () = + Computation.capture computation main (); + Fiber.finalize fiber + in Picos_mpscq.push t.ready (Spawn (fiber, main)); next t; Computation.await computation diff --git a/lib/picos_lwt/picos_lwt.ml b/lib/picos_lwt/picos_lwt.ml index 8c00c7df9..4184883ec 100644 --- a/lib/picos_lwt/picos_lwt.ml +++ b/lib/picos_lwt/picos_lwt.ml @@ -49,13 +49,9 @@ let[@alert "-handler"] rec go : (fun k -> match Fiber.canceled fiber with | None -> - let packed = Computation.Packed r.computation in - List.iter - (fun main -> - let fiber = Fiber.create_packed ~forbid:r.forbid packed in - Lwt.async @@ fun () -> - go fiber system (Effect.Shallow.fiber main) (Ok ())) - r.mains; + Fiber.initialize ~parent:fiber ~child:r.fiber; + Lwt.async (fun () -> + go r.fiber system (Effect.Shallow.fiber r.main) (Ok ())); go fiber system k (Ok ()) | Some exn_bt -> go fiber system k (Error exn_bt)) | Fiber.Yield -> @@ -109,4 +105,9 @@ let run ?(forbid = false) system main = if not (Picos_thread.is_main_thread ()) then not_main_thread (); let computation = Computation.create ~mode:`LIFO () in let fiber = Fiber.create ~forbid computation in + let main () = + Computation.capture computation main (); + Fiber.finalize fiber; + Computation.await computation + in go fiber system (Effect.Shallow.fiber main) (Ok ()) diff --git a/lib/picos_randos/picos_randos.ml b/lib/picos_randos/picos_randos.ml index 25fe3f009..5b12c033b 100644 --- a/lib/picos_randos/picos_randos.ml +++ b/lib/picos_randos/picos_randos.ml @@ -50,15 +50,6 @@ type t = { mutable run : bool; } -let rec spawn t forbid packed = function - | [] -> () - | main :: mains -> - let fiber = Fiber.create_packed ~forbid packed in - Atomic.incr t.num_alive_fibers; - Collection.push t.ready (Spawn (fiber, main)); - if !(t.num_waiters_non_zero) then Condition.signal t.condition; - spawn t forbid packed mains - let rec next t = match Collection.pop_exn t.ready with | Spawn (fiber, main) -> @@ -84,7 +75,10 @@ let rec next t = | Fiber.Spawn r -> if Fiber.is_canceled fiber then yield else begin - spawn t r.forbid (Packed r.computation) r.mains; + Fiber.initialize ~parent:fiber ~child:r.fiber; + Atomic.incr t.num_alive_fibers; + Collection.push t.ready (Spawn (r.fiber, r.main)); + if !(t.num_waiters_non_zero) then Condition.signal t.condition; return end | Fiber.Yield -> yield @@ -219,7 +213,10 @@ let run ?context:t_opt ?(forbid = false) main = Mutex.unlock t.mutex; let computation = Computation.create ~mode:`LIFO () in let fiber = Fiber.create ~forbid computation in - let main = Computation.capture computation main in + let main () = + Computation.capture computation main (); + Fiber.finalize fiber + in Collection.push t.ready (Spawn (fiber, main)); next t; Mutex.lock t.mutex; diff --git a/lib/picos_structured/bundle.ml b/lib/picos_structured/bundle.ml index 4fb21870e..f8822fd81 100644 --- a/lib/picos_structured/bundle.ml +++ b/lib/picos_structured/bundle.ml @@ -2,14 +2,15 @@ open Picos type t = { num_fibers : int Atomic.t; - bundle : unit Computation.t; + bundle : Computation.packed; errors : Control.Errors.t; finished : Trigger.t; } let terminate ?callstack t = let terminate_bt = Control.terminate_bt ?callstack () in - Computation.cancel t.bundle terminate_bt + let (Packed bundle) = t.bundle in + Computation.cancel bundle terminate_bt let error ?callstack t (exn_bt : Exn_bt.t) = if exn_bt.Exn_bt.exn != Control.Terminate then begin @@ -20,7 +21,8 @@ let error ?callstack t (exn_bt : Exn_bt.t) = let decr t = let n = Atomic.fetch_and_add t.num_fibers (-1) in if n = 1 then begin - Computation.finish t.bundle; + let (Packed bundle) = t.bundle in + Computation.cancel bundle (Control.terminate_bt ()); Trigger.signal t.finished end @@ -38,18 +40,18 @@ let await t fiber packed canceler = let join_after fn = let t = let num_fibers = Atomic.make 1 in - let bundle = Computation.create ~mode:`LIFO () in + let bundle = Computation.Packed (Computation.create ~mode:`LIFO ()) in let errors = Control.Errors.create () in let finished = Trigger.create () in { num_fibers; bundle; errors; finished } in let fiber = Fiber.current () in let (Packed parent as packed) = Fiber.get_computation fiber in - let bundle = Computation.Packed t.bundle in - let canceler = Computation.attach_canceler ~from:parent ~into:t.bundle in + let (Packed bundle) = t.bundle in + let canceler = Computation.attach_canceler ~from:parent ~into:bundle in (* TODO: Ideally there should be no poll point betweem [attach_canceler] and the [match ... with] below. *) - Fiber.set_computation fiber bundle; + Fiber.set_computation fiber t.bundle; match fn t with | value -> await t fiber packed canceler; @@ -71,8 +73,10 @@ let rec incr t backoff = let fork_as_promise t thunk = incr t Backoff.default; let child = Computation.create ~mode:`LIFO () in + let fiber = Fiber.create ~forbid:false child in try - let canceler = Computation.attach_canceler ~from:t.bundle ~into:child in + let (Packed bundle) = t.bundle in + let canceler = Computation.attach_canceler ~from:bundle ~into:child in let main () = begin match thunk () with @@ -82,10 +86,12 @@ let fork_as_promise t thunk = Computation.cancel child exn_bt; error t exn_bt end; - Computation.detach t.bundle canceler; + Fiber.finalize fiber; + let (Packed bundle) = t.bundle in + Computation.detach bundle canceler; decr t in - Fiber.spawn ~forbid:false child [ main ]; + Fiber.spawn fiber main; child with canceled_exn -> (* We don't need to worry about deatching the [canceler], because at this @@ -97,6 +103,9 @@ let fork t thunk = fork_as_promise t thunk |> ignore (* *) -let is_running t = Computation.is_running t.bundle +let is_running t = + let (Packed bundle) = t.bundle in + Computation.is_running bundle + let unsafe_incr t = Atomic.incr t.num_fibers let unsafe_reset t = Atomic.set t.num_fibers 1 diff --git a/lib/picos_structured/run.ml b/lib/picos_structured/run.ml index af42ac3d7..c038239c8 100644 --- a/lib/picos_structured/run.ml +++ b/lib/picos_structured/run.ml @@ -1,29 +1,40 @@ open Picos -let wrap_all t main = - Bundle.unsafe_incr t; - fun () -> - if Bundle.is_running t then begin - try main () with exn -> Bundle.error t (Exn_bt.get exn) - end; - Bundle.decr t +let wrap_all t fiber main () = + if Bundle.is_running t then begin + try main () with exn -> Bundle.error t (Exn_bt.get exn) + end; + Fiber.finalize fiber; + Bundle.decr t -let wrap_any t main = - Bundle.unsafe_incr t; - fun () -> - if Bundle.is_running t then begin - try - main (); - Bundle.terminate t - with exn -> Bundle.error t (Exn_bt.get exn) - end; - Bundle.decr t +let wrap_any t fiber main () = + if Bundle.is_running t then begin + match main () with + | () -> Bundle.terminate t + | exception exn -> Bundle.error t (Exn_bt.get exn) + end; + Fiber.finalize fiber; + Bundle.decr t + +let rec spawn t wrap = function + | [] -> () + | main :: mains -> + if Bundle.is_running t then begin + Bundle.unsafe_incr t; + begin + try + let fiber = Fiber.create_packed ~forbid:false t.bundle in + Fiber.spawn fiber (wrap t fiber main) + with exn -> + Bundle.decr t; + raise exn + end; + spawn t wrap mains + end let run actions wrap = Bundle.join_after @@ fun t -> - try - let mains = List.map (wrap t) actions in - Fiber.spawn ~forbid:false t.bundle mains + try spawn t wrap actions with exn -> Bundle.unsafe_reset t; raise exn diff --git a/lib/picos_threaded/picos_threaded.ml b/lib/picos_threaded/picos_threaded.ml index 191e84cdf..80e982704 100644 --- a/lib/picos_threaded/picos_threaded.ml +++ b/lib/picos_threaded/picos_threaded.ml @@ -67,74 +67,24 @@ and cancel_after : type a. _ -> a Computation.t -> _ = Fiber.check t.fiber; Select.cancel_after computation ~seconds exn_bt -and spawn : type a. _ -> forbid:bool -> a Computation.t -> _ = - fun t ~forbid computation mains -> +and spawn t fiber main = Fiber.check t.fiber; - let packed = Computation.Packed computation in - match mains with - | [ main ] -> - Thread.create - (fun () -> - (* We need to (recursively) install the handler on each new thread - that we create. *) - Handler.using handler (create_packed ~forbid packed) main) - () - |> ignore - | mains -> begin - (* We try to be careful to implement the all-or-nothing behaviour based on - the assumption that we may run out of threads well before we run out of - memory. In a thread pool based scheduler this should actually not - require special treatment. *) - let all_or_nothing = ref `Wait in - match - mains - |> List.iter @@ fun main -> - Thread.create - (fun () -> - if !all_or_nothing == `Wait then begin - Mutex.lock t.mutex; - match - while - match !all_or_nothing with - | `Wait -> - Condition.wait t.condition t.mutex; - true - | `All | `Nothing -> false - do - () - done - with - | () -> Mutex.unlock t.mutex - | exception async_exn -> - (* Condition.wait may be interrupted by asynchronous - exceptions and we must make sure to unlock even in that - case. *) - Mutex.unlock t.mutex; - raise async_exn - end; - if !all_or_nothing == `All then - (* We need to (recursively) install the handler on each new - thread that we create. *) - Handler.using handler (create_packed ~forbid packed) main) - () - |> ignore - with - | () -> - Mutex.lock t.mutex; - all_or_nothing := `All; - Mutex.unlock t.mutex; - Condition.broadcast t.condition - | exception exn -> - Mutex.lock t.mutex; - all_or_nothing := `Nothing; - Mutex.unlock t.mutex; - Condition.broadcast t.condition; - raise exn - end + let main () = + (* We need to install the handler on each new thread that we create. *) + let mutex = Mutex.create () and condition = Condition.create () in + Handler.using handler { fiber; mutex; condition } main + in + Thread.create main () |> ignore and handler = Handler.{ current; spawn; yield; cancel_after; await } let run ?(forbid = false) main = Select.check_configured (); - let packed = Computation.Packed (Computation.create ~mode:`LIFO ()) in - Handler.using handler (create_packed ~forbid packed) main + let computation = Computation.create ~mode:`LIFO () in + let context = create_packed ~forbid (Packed computation) in + let main () = + Computation.capture computation main (); + Fiber.finalize context.fiber; + Computation.await computation + in + Handler.using handler context main diff --git a/test/lib/meow/dllist.ml b/test/lib/meow/dllist.ml new file mode 100644 index 000000000..e155f1ed1 --- /dev/null +++ b/test/lib/meow/dllist.ml @@ -0,0 +1,43 @@ +type 'a node = { mutable lhs : 'a node; mutable rhs : 'a node; value : 'a } +type 'a t = 'a node + +let new_node value = + let node = { lhs = Obj.magic (); rhs = Obj.magic (); value } in + node.lhs <- node; + node.rhs <- node; + node + +let create () = new_node (Obj.magic ()) +let is_empty t = t.lhs == t +let value node = node.value + +let remove node = + let lhs = node.lhs in + if lhs != node then begin + let rhs = node.rhs in + lhs.rhs <- rhs; + rhs.lhs <- lhs; + node.lhs <- node; + node.rhs <- node + end + +let move_l t node = + let lhs = node.lhs in + if lhs != node then begin + let rhs = node.rhs in + lhs.rhs <- rhs; + rhs.lhs <- lhs + end; + let lhs = t.lhs in + lhs.rhs <- node; + node.lhs <- lhs; + t.lhs <- node; + node.rhs <- t + +let rec iter_l action t node = + if node != t then begin + action node.value; + iter_l action t node.lhs + end + +let iter_l action t = iter_l action t t.lhs diff --git a/test/lib/meow/dllist.mli b/test/lib/meow/dllist.mli new file mode 100644 index 000000000..d095dd3f3 --- /dev/null +++ b/test/lib/meow/dllist.mli @@ -0,0 +1,10 @@ +type !'a t +type !'a node + +val create : unit -> 'a t +val is_empty : 'a t -> bool +val new_node : 'a -> 'a node +val value : 'a node -> 'a +val remove : 'a node -> unit +val move_l : 'a t -> 'a node -> unit +val iter_l : ('a -> unit) -> 'a t -> unit diff --git a/test/lib/meow/dune b/test/lib/meow/dune new file mode 100644 index 000000000..46425fe5a --- /dev/null +++ b/test/lib/meow/dune @@ -0,0 +1,3 @@ +(library + (name meow) + (libraries picos)) diff --git a/test/lib/meow/meow.ml b/test/lib/meow/meow.ml new file mode 100644 index 000000000..71edfc281 --- /dev/null +++ b/test/lib/meow/meow.ml @@ -0,0 +1,2 @@ +module Ownership = Ownership +module Promise = Promise diff --git a/test/lib/meow/meow.mli b/test/lib/meow/meow.mli new file mode 100644 index 000000000..39879cb4d --- /dev/null +++ b/test/lib/meow/meow.mli @@ -0,0 +1,16 @@ +module Ownership : sig + type t + + val create : finally:('a -> unit) -> 'a -> t + val own : t -> unit + val check : t -> unit + val disown : t -> unit + val bless : t -> unit +end + +module Promise : sig + type !'a t + + val async : ?give:Ownership.t list -> (unit -> 'a) -> 'a t + val await : 'a t -> 'a +end diff --git a/test/lib/meow/ownership.ml b/test/lib/meow/ownership.ml new file mode 100644 index 000000000..6d3a93906 --- /dev/null +++ b/test/lib/meow/ownership.ml @@ -0,0 +1,114 @@ +open Picos + +exception Resource_leaked +exception Not_owner +exception I_am_root +exception Parent_is_dead + +type resource = + | Resource : { + finally : 'a -> unit; + value : 'a; + mutable owner : Fiber.Maybe.t; + } + -> resource + +type t = resource Dllist.node + +let finalize (Resource r) = r.finally r.value + +let owned_key = + let finalize resources = + if not (Dllist.is_empty resources) then + let fiber = Fiber.current () in + if Fiber.is_canceled fiber then Dllist.iter_l finalize resources + else raise Resource_leaked + in + Fiber.FLS.new_key ~finalize (Computed Dllist.create) + +let own_as resource fiber = + let (Resource r) = Dllist.value resource in + r.owner <- Fiber.Maybe.of_fiber fiber; + let owned = Fiber.FLS.get fiber owned_key in + Dllist.move_l owned resource + +let own resource = own_as resource (Fiber.current ()) + +let create ~finally value = + Dllist.new_node (Resource { finally; value; owner = Fiber.Maybe.nothing }) + +type _ tdt = + | Finalized : [> `Finalized ] tdt + | Nil : [> `Nil ] tdt + | Blessed : { + resource : t; + next : [ `Nil | `Blessed ] tdt; + } + -> [> `Blessed ] tdt + +let rec iter action = function + | Nil -> () + | Blessed r -> + action r.resource; + iter action r.next + +let blessed_key : [ `Finalized | `Nil | `Blessed ] tdt Atomic.t Fiber.FLS.key = + let finalize t = + match Atomic.exchange t Finalized with + | Finalized -> () + | (Nil as resources) | (Blessed _ as resources) -> + iter (fun node -> finalize (Dllist.value node)) resources + in + Fiber.FLS.new_key ~finalize (Computed (fun () -> Atomic.make Nil)) + +let[@inline never] accept_as fiber blessed = + match Atomic.exchange blessed Nil with + | Finalized -> failwith "accept after finalize" + | (Nil as resources) | (Blessed _ as resources) -> + let owned = Fiber.FLS.get fiber owned_key in + resources + |> iter @@ fun node -> + let (Resource r) = Dllist.value node in + r.owner <- Fiber.Maybe.of_fiber fiber; + Dllist.move_l owned node + +let[@inline] accept_as fiber = + let blessed = Fiber.FLS.get fiber blessed_key in + if Atomic.get blessed != Nil then accept_as fiber blessed + +let check_as resource fiber = + accept_as fiber; + let (Resource r) = Dllist.value resource in + if Fiber.Maybe.unequal r.owner (Fiber.Maybe.of_fiber fiber) then + raise Not_owner + +let[@inline] check resource = check_as resource (Fiber.current ()) + +let disown_as resource fiber = + check_as resource fiber; + Dllist.remove resource; + let (Resource r) = Dllist.value resource in + r.owner <- Fiber.Maybe.nothing + +let disown resource = disown_as resource (Fiber.current ()) + +let parent_key = + let initialize parent = Fiber.FLS.get parent blessed_key + and root () = raise I_am_root in + Fiber.FLS.new_key ~initialize (Computed root) + +let bless resource = + let fiber = Fiber.current () in + disown_as resource fiber; + let parent = Fiber.FLS.get fiber parent_key in + let rec loop parent resource = + match Atomic.get parent with + | Finalized -> + own_as resource fiber; + raise Parent_is_dead + | (Nil as before) | (Blessed _ as before) -> + let after = Blessed { resource; next = before } in + if not (Atomic.compare_and_set parent before after) then + loop parent resource + in + loop parent resource diff --git a/test/lib/meow/promise.ml b/test/lib/meow/promise.ml new file mode 100644 index 000000000..ff3a99e8f --- /dev/null +++ b/test/lib/meow/promise.ml @@ -0,0 +1,16 @@ +open Picos + +type 'a t = 'a Computation.t + +let async ?give main = + let computation = Computation.create () in + let child = Fiber.create ~forbid:false computation in + give |> Option.iter @@ List.iter (fun node -> Ownership.own_as node child); + let main () = + Computation.capture computation main (); + Fiber.finalize child + in + Fiber.spawn child main; + computation + +let await = Computation.await diff --git a/test/test_picos.ml b/test/test_picos.ml index efa904656..7a46530ae 100644 --- a/test/test_picos.ml +++ b/test/test_picos.ml @@ -3,7 +3,9 @@ open Picos_structured.Finally let run_in_fiber main = let computation = Computation.create () in - Fiber.spawn ~forbid:false computation [ Computation.capture computation main ]; + Fiber.spawn + (Fiber.create ~forbid:false computation) + (Computation.capture computation main); Computation.await computation let test_fls_basics = @@ -96,7 +98,7 @@ let test_thread_cancelation () = Fiber.yield () done in - Fiber.spawn ~forbid:false computation [ main ]; + Fiber.spawn (Fiber.create ~forbid:false computation) main; result in Computation.cancel computation (Exn_bt.get_callstack 0 Exit) @@ -111,7 +113,7 @@ let test_cancel_after () = Fiber.yield () done in - Fiber.spawn ~forbid:false computation [ main ]; + Fiber.spawn (Fiber.create ~forbid:false computation) main; Computation.cancel_after computation ~seconds:0.01 (Exn_bt.get_callstack 0 Not_found); Computation.await computation diff --git a/test/test_sync.ml b/test/test_sync.ml index 1a5ea5670..e92cecb5d 100644 --- a/test/test_sync.ml +++ b/test/test_sync.ml @@ -7,8 +7,9 @@ module Fiber = struct let start thunk = let computation = Computation.create () in - Fiber.spawn ~forbid:false computation - [ Computation.capture computation thunk ]; + Fiber.spawn + (Fiber.create ~forbid:false computation) + (Computation.capture computation thunk); computation end @@ -33,8 +34,9 @@ let test_mutex_and_condition_basics () = Condition.wait condition mutex); if 1 = Atomic.fetch_and_add n (-1) then Computation.finish test in - Fiber.spawn ~forbid:false computation - (List.init (Atomic.get n) @@ fun _ -> main); + for _ = 1 to Atomic.get n do + Fiber.spawn (Fiber.create ~forbid:false computation) main + done; while Computation.is_running test do Fiber.yield (); @@ -128,8 +130,9 @@ let test_mutex_and_condition_cancelation () = while Array.exists (fun step -> Atomic.get step < limit) steps do let finished = Trigger.create () in let checked = if Random.State.bool state then None else some_false in - Fiber.spawn ~forbid:false (Computation.create ()) - [ attempt i finished ?checked ]; + Fiber.spawn + (Fiber.create ~forbid:false (Computation.create ())) + (attempt i finished ?checked); Trigger.await finished |> ignore done in @@ -186,7 +189,7 @@ let test_lazy_cancelation () = in let computation = Computation.create () in Computation.cancel computation (Exn_bt.get_callstack 0 Exit); - Fiber.spawn ~forbid:false computation [ main ]; + Fiber.spawn (Fiber.create ~forbid:false computation) main; while not (Atomic.get tried) do Fiber.yield () done;