Skip to content

Commit

Permalink
Redesign Spawn and FLS
Browse files Browse the repository at this point in the history
This PR redesigns the `Spawn` and `FLS` mechanism to better support new use
cases.
  • Loading branch information
polytypic committed Jul 22, 2024
1 parent 29502de commit bcc6ff3
Show file tree
Hide file tree
Showing 24 changed files with 546 additions and 243 deletions.
2 changes: 1 addition & 1 deletion bench/bench_fib.ocaml5.ml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ let rec exp_fib i =
else
let computation = Computation.create () in
let main () = Computation.return computation (exp_fib (i - 2)) in
Fiber.spawn ~forbid:false computation [ main ];
Fiber.spawn (Fiber.create ~forbid:false computation) main;
let f1 = exp_fib (i - 1) in
let f2 = Computation.await computation in
f1 + f2
Expand Down
28 changes: 13 additions & 15 deletions bench/bench_spawn.ml
Original file line number Diff line number Diff line change
Expand Up @@ -5,33 +5,31 @@ let factor =
Util.iter_factor
* if String.starts_with ~prefix:"4." Sys.ocaml_version then 1 else 10

let run_one ~budgetf ~at_a_time () =
let run_one ~budgetf () =
let n_spawns = 10 * factor in

let init _ = () in
let wrap _ () = Scheduler.run in
let work _ () =
let counter = ref n_spawns in
let computation = Computation.create () in
for _ = 1 to n_spawns / at_a_time do
let main () =
let n = !counter - 1 in
counter := n;
if n = 0 then Computation.finish computation
in
let mains = List.init at_a_time @@ fun _ -> main in
Fiber.spawn ~forbid:false computation mains
let computation_packed = Computation.Packed computation in
let main () =
let n = !counter - 1 in
counter := n;
if n = 0 then Computation.finish computation;
Fiber.finalize (Fiber.current ())
in
for _ = 1 to n_spawns do
let fiber = Fiber.create_packed ~forbid:false computation_packed in
Fiber.spawn fiber main
done;
Computation.await computation
in

let config = Printf.sprintf "%d at a time" at_a_time in
let config = "with packed computation" in
Times.record ~budgetf ~n_domains:1 ~n_warmups:1 ~n_runs_min:1 ~init ~wrap
~work ()
|> Times.to_thruput_metrics ~n:n_spawns ~singular:"spawn" ~config

let run_suite ~budgetf =
if Sys.int_size <= 32 then []
else
[ 1; 2; 4; 8 ]
|> List.concat_map @@ fun at_a_time -> run_one ~budgetf ~at_a_time ()
let run_suite ~budgetf = if Sys.int_size <= 32 then [] else run_one ~budgetf ()
135 changes: 110 additions & 25 deletions lib/picos/bootstrap/picos_bootstrap.ml
Original file line number Diff line number Diff line change
Expand Up @@ -219,32 +219,35 @@ end
module Fiber = struct
type non_float = [ `Non_float of non_float ]

let forbid_bit = 0b01

type _ tdt =
| Nothing : [> `Nothing ] tdt
| Fiber : {
mutable forbid : bool;
mutable flags : int;
mutable packed : Computation.packed;
mutable fls : non_float array;
}
-> [> `Fiber ] tdt

type t = [ `Fiber ] tdt

let create_packed ~forbid packed = Fiber { forbid; packed; fls = [||] }
let create_packed ~forbid packed =
Fiber { flags = Bool.to_int forbid; packed; fls = [||] }

let create ~forbid computation =
create_packed ~forbid (Computation.Packed computation)

let has_forbidden (Fiber r : t) = r.forbid
let has_forbidden (Fiber r : t) = r.flags land forbid_bit <> 0

let is_canceled (Fiber r : t) =
(not r.forbid)
r.flags land forbid_bit = 0
&&
let (Packed computation) = r.packed in
Computation.is_canceled computation

let canceled (Fiber r : t) =
if r.forbid then None
if r.flags land forbid_bit <> 0 then None
else
let (Packed computation) = r.packed in
Computation.canceled computation
Expand All @@ -253,36 +256,38 @@ module Fiber = struct
let set_computation (Fiber r : t) packed = r.packed <- packed

let check (Fiber r : t) =
if not r.forbid then
if r.flags land forbid_bit = 0 then
let (Packed computation) = r.packed in
Computation.check computation

let[@inline] equal t1 t2 = t1 == t2

let exchange (Fiber r : t) ~forbid =
let before = r.forbid in
r.forbid <- forbid;
before
let flags = r.flags in
r.flags <- flags land lnot forbid_bit lor Bool.to_int forbid;
flags land forbid_bit <> 0

let set (Fiber r : t) ~forbid = r.forbid <- forbid
let set (Fiber r : t) ~forbid =
r.flags <- r.flags land lnot forbid_bit lor Bool.to_int forbid

let explicitly (Fiber r : t) body ~forbid =
if r.forbid = forbid then body ()
let explicitly (Fiber r as t : t) body ~forbid =
let flags = r.flags in
if flags land forbid_bit = Bool.to_int forbid then body ()
else
match body (r.forbid <- forbid) with
match body (r.flags <- flags lxor forbid_bit) with
| value ->
r.forbid <- not forbid;
set t ~forbid:(not forbid);
value
| exception exn ->
r.forbid <- not forbid;
set t ~forbid:(not forbid);
raise exn

let forbid t body = explicitly t body ~forbid:true
let permit t body = explicitly t body ~forbid:false

let try_suspend (Fiber r : t) trigger x y resume =
let (Packed computation) = r.packed in
if not r.forbid then begin
if r.flags land forbid_bit = 0 then begin
if Computation.try_attach computation trigger then
Trigger.on_signal trigger x y resume
|| begin
Expand All @@ -299,7 +304,7 @@ module Fiber = struct

let[@inline] unsuspend (Fiber r : t) trigger =
assert (Trigger.is_signaled trigger);
r.forbid
r.flags land forbid_bit <> 0
||
let (Packed computation) = r.packed in
Computation.unsafe_unsuspend computation Backoff.default
Expand Down Expand Up @@ -331,13 +336,60 @@ module Fiber = struct

type 'a initial = Constant of 'a | Computed of (unit -> 'a)

let new_key initial =
type finalizers =
| Nil
| Finalizer : {
index : int;
finalize : 'a -> unit;
next : finalizers;
}
-> finalizers

let finalizers = Atomic.make Nil

let rec add_finalizer index finalize =
let before = Atomic.get finalizers in
let after = Finalizer { index; finalize; next = before } in
if not (Atomic.compare_and_set finalizers before after) then
add_finalizer index finalize

type initializers =
| Nil
| Initializer : {
key : 'a key;
initialize : t -> 'a;
next : initializers;
}
-> initializers

let initializers = Atomic.make Nil

let rec add_initializer key initialize =
let before = Atomic.get initializers in
let after = Initializer { key; initialize; next = before } in
if not (Atomic.compare_and_set initializers before after) then
add_initializer key initialize

let new_key ?finalize ?initialize initial =
let index = Atomic.fetch_and_add counter 1 in
match initial with
| Constant default ->
let default = Sys.opaque_identity (Obj.magic default : non_float) in
{ index; default; compute }
| Computed compute -> { index; default = unique; compute }
begin
match finalize with
| None -> ()
| Some finalize -> add_finalizer index finalize
end;
let key =
match initial with
| Constant default ->
let default = Sys.opaque_identity (Obj.magic default : non_float) in
{ index; default; compute }
| Computed compute -> { index; default = unique; compute }
in
begin
match initialize with
| None -> ()
| Some initialize -> add_initializer key initialize
end;
key

let get (type a) (Fiber r : t) (key : a key) =
let fls = r.fls in
Expand Down Expand Up @@ -369,6 +421,13 @@ module Fiber = struct
(Sys.opaque_identity (Obj.magic value : non_float));
value

let[@inline] has (Fiber r : t) key =
let fls = r.fls in
key.index < Array.length fls
&&
let value = Array.unsafe_get fls key.index in
value != unique

let set (type a) (Fiber r : t) (key : a key) (value : a) =
let fls = r.fls in
if key.index < Array.length fls then
Expand All @@ -379,14 +438,40 @@ module Fiber = struct
r.fls <- fls;
Array.unsafe_set fls key.index
(Sys.opaque_identity (Obj.magic value : non_float))

let rec finalize fls = function
| Finalizer r ->
if r.index < Array.length fls then begin
let value = Array.unsafe_get fls r.index in
if value != unique then r.finalize (Obj.magic value)
end;
finalize fls r.next
| Nil -> ()

let rec initialize ~parent ~child = function
| Initializer r ->
if not (has child r.key) then set child r.key (r.initialize parent);
initialize ~parent ~child r.next
| Nil -> ()
end

let[@inline] finalize (Fiber t : t) =
let fls = t.fls in
if 0 < Array.length fls then FLS.finalize fls (Atomic.get FLS.finalizers)

let[@inline] initialize ~parent ~child =
match Atomic.get FLS.initializers with
| Initializer r ->
if not (FLS.has child r.key) then
FLS.set child r.key (r.initialize parent);
FLS.initialize ~parent ~child r.next
| Nil -> ()
end

module Handler = struct
type 'c t = {
current : 'c -> Fiber.t;
spawn :
'a. 'c -> forbid:bool -> 'a Computation.t -> (unit -> unit) list -> unit;
spawn : 'c -> Fiber.t -> (unit -> unit) -> unit;
yield : 'c -> unit;
cancel_after :
'a. 'c -> 'a Computation.t -> seconds:float -> Exn_bt.t -> unit;
Expand Down
17 changes: 6 additions & 11 deletions lib/picos/intf.ocaml5.ml
Original file line number Diff line number Diff line change
Expand Up @@ -132,24 +132,19 @@ module type Fiber = sig
associated with the fiber has been canceled the scheduler is free to
discontinue the fiber immediately before spawning new fibers.
The scheduler is free to run the newly created fibers on any domain and
The scheduler is free to run the newly created fiber on any domain and
decide which fiber to give priority to.
⚠️ The scheduler should guarantee that, when [Spawn] returns normally, all
of the [mains] will eventually be called by the scheduler and, when
[Spawn] raises an exception, none of the [mains] will be called. In other
words, [Spawn] should check cancelation just once and be all or nothing.
⚠️ The scheduler should guarantee that, when [Spawn] returns normally, the
given [main] will eventually be called by the scheduler and, when [Spawn]
raises an exception, the [main] will not be called. In other words,
[Spawn] should check cancelation just once and be all or nothing.
Furthermore, in case a newly spawned fiber is canceled before its main is
called, the scheduler must still call the main. This allows a program to
ensure, i.e. keep track of, that all fibers it spawns are terminated
properly and any resources transmitted to spawned fibers will be disposed
properly. *)
type _ Effect.t +=
private
| Spawn : {
forbid : bool;
computation : 'a computation;
mains : (unit -> unit) list;
}
-> unit Effect.t
| Spawn : { fiber : t; main : unit -> unit } -> unit Effect.t
end
16 changes: 8 additions & 8 deletions lib/picos/ocaml4/picos_ocaml.ml
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
open Picos_bootstrap

let error () =
let[@inline never] error _ =
raise (Sys_error "Picos.Handler.using not called for current thread")

module Handler = struct
type entry = E : { context : 'a; handler : 'a Handler.t } -> entry

let default =
let current _ = error ()
and spawn _ ~forbid:_ _ _ = error ()
and yield _ = error ()
and cancel_after _ _ ~seconds:_ _ = error ()
and await _ _ = error () in
let current = error
and spawn _ _ = error
and yield = error
and cancel_after _ _ ~seconds:_ = error
and await _ = error in
E { context = (); handler = { current; spawn; yield; cancel_after; await } }

let key = Picos_thread.TLS.new_key @@ fun () -> default
Expand Down Expand Up @@ -42,9 +42,9 @@ module Fiber = struct
let (E r) = Handler.get () in
r.handler.current r.context

let spawn ~forbid computation mains =
let spawn fiber main =
let (E r) = Handler.get () in
r.handler.spawn r.context ~forbid computation mains
r.handler.spawn r.context fiber main

let yield () =
let (E r) = Handler.get () in
Expand Down
12 changes: 3 additions & 9 deletions lib/picos/ocaml5/picos_ocaml.ml
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,9 @@ module Fiber = struct
let current () = Effect.perform Current

type _ Effect.t +=
| Spawn : {
forbid : bool;
computation : 'a Computation.t;
mains : (unit -> unit) list;
}
-> unit Effect.t
| Spawn : { fiber : Fiber.t; main : unit -> unit } -> unit Effect.t

let spawn ~forbid computation mains =
Effect.perform @@ Spawn { forbid; computation; mains }
let spawn fiber main = Effect.perform @@ Spawn { fiber; main }

type _ Effect.t += Yield : unit Effect.t

Expand Down Expand Up @@ -78,7 +72,7 @@ module Handler = struct
| Fiber.Spawn r ->
Some
(fun k ->
match h.spawn c ~forbid:r.forbid r.computation r.mains with
match h.spawn c r.fiber r.main with
| unit -> Effect.Deep.continue k unit
| exception exn -> discontinue k exn)
| Fiber.Yield -> yield
Expand Down
Loading

0 comments on commit bcc6ff3

Please sign in to comment.