Skip to content

Add more structured Run operations #334

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions bench/bench_run.ocaml4.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
let run_suite ~budgetf:_ = []
36 changes: 36 additions & 0 deletions bench/bench_run.ocaml5.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
open Multicore_bench
open Picos_std_structured
module Multififos = Picos_mux_multififo

let run_one_multififo ~budgetf ~n_domains ~n () =
let context = ref (Obj.magic ()) in

let m = if n < 1_000_000 then 1_000_000 / n else 1 in

let before _ = context := Multififos.context () in
let init _ = !context in
let wrap i context action =
if i = 0 then Multififos.run ~context action else action ()
in
let work i context =
if i <> 0 then Multififos.runner_on_this_thread context
else
for _ = 1 to m do
Run.for_n n ignore
done
in

let config =
Printf.sprintf "%d mfifo%s, run_n %d" n_domains
(if n_domains = 1 then "" else "s")
n
in
Times.record ~budgetf ~n_domains ~before ~init ~wrap ~work ()
|> Times.to_thruput_metrics ~n:(n * m) ~singular:"ignore" ~config

let run_suite ~budgetf =
Util.cross [ 1; 2; 4; 8 ]
[ 100; 1_000; 10_000; 100_000; 1_000_000; 10_000_000 ]
|> List.concat_map @@ fun (n_domains, n) ->
if Picos_domain.recommended_domain_count () < n_domains then []
else run_one_multififo ~budgetf ~n_domains ~n ()
6 changes: 6 additions & 0 deletions bench/dune
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
(run %{test} -brief "Picos binaries")
(run %{test} -brief "Bounded_q with Picos_std_sync")
(run %{test} -brief "Memory usage")
(run %{test} -brief "Picos_std_structured.Run")
;;
))
(foreign_stubs
Expand All @@ -49,6 +50,11 @@
from
(picos_mux.fifo -> scheduler.ocaml5.ml)
(picos_mux.thread -> scheduler.ocaml4.ml))
(select
bench_run.ml
from
(picos_mux.multififo -> bench_run.ocaml5.ml)
(-> bench_run.ocaml4.ml))
(select
bench_fib.ml
from
Expand Down
1 change: 1 addition & 0 deletions bench/main.ml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ let benchmarks =
("Picos binaries", Bench_binaries.run_suite);
("Bounded_q with Picos_std_sync", Bench_bounded_q.run_suite);
("Memory usage", Bench_memory.run_suite);
("Picos_std_structured.Run", Bench_run.run_suite);
]

let () = Multicore_bench.Cmd.run ~benchmarks ()
76 changes: 48 additions & 28 deletions lib/picos_std.structured/bundle.ml
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,17 @@ type t = [ `Bundle ] tdt

external config_as_atomic : t -> int Atomic.t = "%identity"

let config_terminated_bit = 0x01
and config_callstack_mask = 0x3E
and config_callstack_shift = 1
and config_one = 0x40 (* memory runs out before overflow *)
let config_on_return_terminate_bit = 0x01
and config_on_terminate_raise_bit = 0x02
and config_callstack_mask = 0x6C
and config_callstack_shift = 2
and config_one = 0x80 (* memory runs out before overflow *)

let flock_key : [ `Bundle | `Nothing ] tdt Fiber.FLS.t = Fiber.FLS.create ()

let terminate_as callstack (Bundle { bundle = Packed bundle; _ } : t) =
Computation.cancel bundle Control.Terminate callstack

let terminate ?callstack t =
terminate_as (Control.get_callstack_opt callstack) t
let terminate ?callstack (Bundle { bundle = Packed bundle; _ } : t) =
Computation.cancel bundle Control.Terminate
(Control.get_callstack_opt callstack)

let terminate_after ?callstack (Bundle { bundle = Packed bundle; _ } : t)
~seconds =
Expand All @@ -39,25 +38,33 @@ let error ?callstack (Bundle r as t : t) exn bt =
terminate ?callstack t;
Control.Errors.push r.errors exn bt
end
else if
Atomic.get (config_as_atomic t) land config_on_terminate_raise_bit <> 0
then terminate ?callstack t

let decr (Bundle r as t : t) =
let n = Atomic.fetch_and_add (config_as_atomic t) (-config_one) in
if n < config_one * 2 then begin
terminate_as Control.empty_bt t;
Trigger.signal r.finished
end

type _ pass = FLS : unit pass | Arg : t pass

let[@inline never] no_flock () = invalid_arg "no flock"

let[@inline] on_terminate = function
| None | Some `Ignore -> `Ignore
| Some `Raise -> `Raise

let get_flock fiber =
match Fiber.FLS.get fiber flock_key ~default:Nothing with
| Bundle _ as t -> t
| Nothing -> no_flock ()

let await (Bundle r as t : t) fiber packed canceler outer =
Fiber.set_computation fiber packed;
if Fiber.FLS.get fiber flock_key ~default:Nothing != outer then
Fiber.FLS.set fiber flock_key outer;
let forbid = Fiber.exchange fiber ~forbid:true in
let n = Atomic.fetch_and_add (config_as_atomic t) (-config_one) in
if config_one * 2 <= n then begin
Expand All @@ -66,14 +73,22 @@ let await (Bundle r as t : t) fiber packed canceler outer =
write from being delayed after the [Trigger.await] below. *)
if config_one <= Atomic.fetch_and_add (config_as_atomic t) 0 then
Trigger.await r.finished |> ignore
end
else terminate_as Control.empty_bt t;
end;
Fiber.set fiber ~forbid;
if Fiber.FLS.get fiber flock_key ~default:Nothing != outer then
Fiber.FLS.set fiber flock_key outer;
let (Packed parent) = packed in
Computation.detach parent canceler;
Control.Errors.check r.errors;
begin
let (Packed bundle) = r.bundle in
match Computation.peek_exn bundle with
| _ -> ()
| exception Computation.Running ->
Computation.cancel bundle Control.Terminate Control.empty_bt
| exception Control.Terminate
when Atomic.get (config_as_atomic t) land config_on_terminate_raise_bit
= 0 ->
()
end;
Fiber.check fiber

let[@inline never] raised exn t fiber packed canceler outer =
Expand All @@ -84,7 +99,7 @@ let[@inline never] raised exn t fiber packed canceler outer =

let[@inline never] returned value (t : t) fiber packed canceler outer =
let config = Atomic.get (config_as_atomic t) in
if config land config_terminated_bit <> 0 then begin
if config land config_on_return_terminate_bit <> 0 then begin
let callstack =
let n = (config land config_callstack_mask) lsr config_callstack_shift in
if n = 0 then None else Some n
Expand All @@ -99,25 +114,30 @@ let join_after_realloc x fn t fiber packed canceler outer =
| value -> returned value t fiber packed canceler outer
| exception exn -> raised exn t fiber packed canceler outer

let join_after_pass (type a) ?callstack ?on_return (fn : a -> _) (pass : a pass)
=
let join_after_pass (type a) ?callstack ?on_return ?on_terminate (fn : a -> _)
(pass : a pass) =
(* The sequence of operations below ensures that nothing is leaked. *)
let (Bundle r as t : t) =
let terminated =
let config =
match on_return with
| None | Some `Wait -> 0
| Some `Terminate -> config_terminated_bit
| None | Some `Wait -> config_one
| Some `Terminate -> config_one lor config_on_return_terminate_bit
in
let callstack =
let config =
match on_terminate with
| None | Some `Ignore -> config
| Some `Raise -> config lor config_on_terminate_raise_bit
in
let config =
match callstack with
| None -> 0
| None -> config
| Some n ->
if n <= 0 then 0
if n <= 0 then config
else
Int.min n (config_callstack_mask lsr config_callstack_shift)
lsl config_callstack_shift
config
lor Int.min n (config_callstack_mask lsr config_callstack_shift)
lsl config_callstack_shift
in
let config = config_one lor callstack lor terminated in
let bundle = Computation.Packed (Computation.create ~mode:`LIFO ()) in
let errors = Control.Errors.create () in
let finished = Trigger.signaled in
Expand Down Expand Up @@ -219,8 +239,8 @@ let fork_pass (type a) (Bundle r as t : t) thunk (pass : a pass) =
let is_running (Bundle { bundle = Packed bundle; _ } : t) =
Computation.is_running bundle

let join_after ?callstack ?on_return fn =
join_after_pass ?callstack ?on_return fn Arg
let join_after ?callstack ?on_return ?on_terminate fn =
join_after_pass ?callstack ?on_return ?on_terminate fn Arg

let fork t thunk = fork_pass t thunk Arg
let fork_as_promise t thunk = fork_as_promise_pass t thunk Arg
Expand Down
16 changes: 9 additions & 7 deletions lib/picos_std.structured/control.ml
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,15 @@ module Errors = struct
| [ (exn, bt) ] -> Printexc.raise_with_backtrace exn bt
| exn_bts -> check exn_bts []

let rec push t exn bt backoff =
let before = Atomic.get t in
let after = (exn, bt) :: before in
if not (Atomic.compare_and_set t before after) then
push t exn bt (Backoff.once backoff)

let push t exn bt = push t exn bt Backoff.default
let push t exn bt =
let backoff = ref Backoff.default in
while
let before = Atomic.get t in
let after = (exn, bt) :: before in
not (Atomic.compare_and_set t before after)
do
backoff := Backoff.once !backoff
done
end

let raise_if_canceled () = Fiber.check (Fiber.current ())
Expand Down
12 changes: 12 additions & 0 deletions lib/picos_std.structured/dune
Original file line number Diff line number Diff line change
@@ -1,3 +1,15 @@
(rule
(enabled_if
(<= 5.0.0 %{ocaml_version}))
(action
(copy for.ocaml5.ml for.ml)))

(rule
(enabled_if
(< %{ocaml_version} 5.0.0))
(action
(copy for.ocaml4.ml for.ml)))

(library
(name picos_std_structured)
(public_name picos_std.structured)
Expand Down
4 changes: 2 additions & 2 deletions lib/picos_std.structured/flock.ml
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,5 @@ let error ?callstack exn_bt = Bundle.error (get ()) ?callstack exn_bt
let fork_as_promise thunk = Bundle.fork_as_promise_pass (get ()) thunk FLS
let fork action = Bundle.fork_pass (get ()) action FLS

let join_after ?callstack ?on_return fn =
Bundle.join_after_pass ?callstack ?on_return fn Bundle.FLS
let join_after ?callstack ?on_return ?on_terminate fn =
Bundle.join_after_pass ?callstack ?on_return ?on_terminate fn Bundle.FLS
57 changes: 57 additions & 0 deletions lib/picos_std.structured/for.ocaml4.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
type _ tdt =
| Empty : [> `Empty ] tdt
| Range : {
mutable lo : int;
hi : int;
parent : [ `Empty | `Range ] tdt;
}
-> [> `Range ] tdt

let[@poll error] cas_lo (Range r : [ `Range ] tdt) before after =
r.lo == before
&& begin
r.lo <- after;
true
end

let rec for_out t (Range r as range : [ `Range ] tdt) action =
let lo_before = r.lo in
let n = r.hi - lo_before in
if 0 < n then begin
if Bundle.is_running t then begin
let lo_after = lo_before + 1 in
if cas_lo range lo_before lo_after then begin
try action lo_before
with exn -> Bundle.error t exn (Printexc.get_raw_backtrace ())
end;
for_out t range action
end
end
else
match r.parent with
| Empty -> ()
| Range _ as range -> for_out t range action

let rec for_in t (Range r as range : [ `Range ] tdt) action =
let lo_before = r.lo in
let n = r.hi - lo_before in
if n <= 1 then for_out t range action
else
let lo_after = lo_before + (n asr 1) in
if cas_lo range lo_before lo_after then begin
Bundle.fork t (fun () -> for_in t range action);
let child = Range { lo = lo_before; hi = lo_after; parent = range } in
for_in t child action
end
else for_in t range action

let for_n ?on_terminate n action =
if 0 < n then
if n = 1 then
try action 0
with
| Control.Terminate when Bundle.on_terminate on_terminate == `Ignore ->
()
else
let range = Range { lo = 0; hi = n; parent = Empty } in
Bundle.join_after ?on_terminate @@ fun t -> for_in t range action
Loading