Skip to content

Commit

Permalink
And more tweaks to improve code size and performance
Browse files Browse the repository at this point in the history
  • Loading branch information
polytypic committed Oct 15, 2024
1 parent 2c5c800 commit 7692f14
Show file tree
Hide file tree
Showing 4 changed files with 306 additions and 268 deletions.
6 changes: 3 additions & 3 deletions lib/picos/picos.ocaml5.ml
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ module Computation = struct
let trigger = Trigger.from_action x y action in
Atomic.make (S (Continue { balance_and_mode; triggers = [ trigger ] }))

let is_canceled t =
let[@inline] is_canceled t =
match Atomic.get t with
| S (Canceled { tx; _ }) -> tx == Stopped
| S (Returned _) | S (Continue _) -> false
Expand Down Expand Up @@ -460,13 +460,13 @@ module Fiber = struct

let has_forbidden (Fiber r : t) = r.forbid

let is_canceled (Fiber r : t) =
let[@inline] is_canceled (Fiber r : t) =
(not r.forbid)
&&
let (Packed computation) = r.packed in
Computation.is_canceled computation

let canceled (Fiber r : t) =
let[@inline] canceled (Fiber r : t) =
if r.forbid then None
else
let (Packed computation) = r.packed in
Expand Down
213 changes: 110 additions & 103 deletions lib/picos_mux.fifo/picos_mux_fifo.ml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
open Picos

let[@inline never] quota_non_positive () = invalid_arg "quota must be positive"
let[@inline never] quota_non_positive _ = invalid_arg "quota must be positive"

type ready =
| Spawn of Fiber.t * (Fiber.t -> unit)
Expand All @@ -17,16 +17,16 @@ type t = {
needs_wakeup : bool Atomic.t;
mutex : Mutex.t;
condition : Condition.t;
resume :
mutable resume :
Trigger.t ->
Fiber.t ->
((exn * Printexc.raw_backtrace) option, unit) Effect.Deep.continuation ->
unit;
current : ((Fiber.t, unit) Effect.Deep.continuation -> unit) option;
yield : ((unit, unit) Effect.Deep.continuation -> unit) option;
return : ((unit, unit) Effect.Deep.continuation -> unit) option;
discontinue : ((unit, unit) Effect.Deep.continuation -> unit) option;
handler : (unit, unit) Effect.Deep.handler;
mutable current : ((Fiber.t, unit) Effect.Deep.continuation -> unit) option;
mutable yield : ((unit, unit) Effect.Deep.continuation -> unit) option;
mutable return : ((unit, unit) Effect.Deep.continuation -> unit) option;
mutable discontinue : ((unit, unit) Effect.Deep.continuation -> unit) option;
mutable handler : (unit, unit) Effect.Deep.handler;
quota : int;
mutable fiber : Fiber.Maybe.t;
mutable remaining_quota : int;
Expand Down Expand Up @@ -67,48 +67,112 @@ let rec next t =
next t
end

let run_fiber ?quota ?fatal_exn_handler:(exnc : _ = raise) fiber main =
let quota =
match quota with
| None -> Int.max_int
| Some quota ->
if quota <= 0 then quota_non_positive ();
quota
in
let run_fiber ?quota ?fatal_exn_handler fiber main =
Select.check_configured ();
let ready = Mpscq.create ~padded:true ()
and needs_wakeup = Atomic.make false |> Multicore_magic.copy_as_padded
and mutex = Mutex.create ()
and condition = Condition.create () in
let rec t =
let t =
let quota =
match quota with
| None -> Int.max_int
| Some quota -> if quota <= 0 then quota_non_positive quota else quota
in
{
ready;
fiber = Fiber.Maybe.of_fiber fiber;
needs_wakeup;
mutex;
condition;
resume;
current;
yield;
return;
discontinue;
handler;
ready = Mpscq.create ~padded:true ();
needs_wakeup = Atomic.make false |> Multicore_magic.copy_as_padded;
mutex = Mutex.create ();
condition = Condition.create ();
resume = Obj.magic ();
current = Obj.magic ();
yield = Obj.magic ();
return = Obj.magic ();
discontinue = Obj.magic ();
handler = Obj.magic ();
quota;
fiber = Fiber.Maybe.of_fiber fiber;
remaining_quota = quota;
num_alive_fibers = 1;
}
and current =
in
t.handler <-
{
exnc = (match fatal_exn_handler with None -> raise | Some exnc -> exnc);
effc =
(fun (type a) (e : a Effect.t) :
((a, _) Effect.Deep.continuation -> _) option ->
match e with
| Fiber.Current -> t.current
| Fiber.Spawn r ->
let fiber = Fiber.Maybe.to_fiber t.fiber in
if Fiber.is_canceled fiber then t.discontinue
else begin
t.num_alive_fibers <- t.num_alive_fibers + 1;
Mpscq.push t.ready (Spawn (r.fiber, r.main));
t.return
end
| Fiber.Yield -> t.yield
| Computation.Cancel_after r -> begin
let fiber = Fiber.Maybe.to_fiber t.fiber in
if Fiber.is_canceled fiber then t.discontinue
else
match
Select.cancel_after r.computation ~seconds:r.seconds r.exn
r.bt
with
| () -> t.return
| exception exn ->
let bt = Printexc.get_raw_backtrace () in
Some
(fun k -> Effect.Deep.discontinue_with_backtrace k exn bt)
end
| Trigger.Await trigger ->
Some
(fun k ->
let fiber = Fiber.Maybe.to_fiber t.fiber in
if Fiber.try_suspend fiber trigger fiber k t.resume then
next t
else
let remaining_quota = t.remaining_quota - 1 in
if 0 < remaining_quota then begin
t.remaining_quota <- remaining_quota;
Fiber.resume fiber k
end
else begin
Mpscq.push t.ready (Resume (fiber, k));
next t
end)
| _ -> None);
retc =
(fun () ->
t.num_alive_fibers <- t.num_alive_fibers - 1;
next t);
};
t.resume <-
(fun trigger fiber k ->
let resume = Resume (fiber, k) in
if Fiber.unsuspend fiber trigger then Mpscq.push t.ready resume
else Mpscq.push_head t.ready resume;
if
Atomic.get t.needs_wakeup
&& Atomic.compare_and_set t.needs_wakeup true false
then begin
begin
match Mutex.lock t.mutex with
| () -> Mutex.unlock t.mutex
| exception Sys_error _ -> ()
end;
Condition.broadcast t.condition
end);
t.current <-
Some
(fun k ->
let fiber = Fiber.Maybe.to_fiber t.fiber in
Effect.Deep.continue k fiber)
and yield =
Effect.Deep.continue k fiber);
t.yield <-
Some
(fun k ->
let fiber = Fiber.Maybe.to_fiber t.fiber in
Mpscq.push t.ready (Continue (fiber, k));
next t)
and return =
next t);
t.return <-
Some
(fun k ->
let remaining_quota = t.remaining_quota - 1 in
Expand All @@ -119,78 +183,21 @@ let run_fiber ?quota ?fatal_exn_handler:(exnc : _ = raise) fiber main =
else begin
Mpscq.push t.ready (Return (Fiber.Maybe.to_fiber t.fiber, k));
next t
end)
and discontinue =
end);
t.discontinue <-
Some
(fun k ->
let fiber = Fiber.Maybe.to_fiber t.fiber in
Fiber.continue fiber k ())
and handler = { retc; exnc; effc }
and[@alert "-handler"] effc :
type a. a Effect.t -> ((a, _) Effect.Deep.continuation -> _) option =
function
| Fiber.Current -> t.current
| Fiber.Spawn r ->
let fiber = Fiber.Maybe.to_fiber t.fiber in
if Fiber.is_canceled fiber then t.discontinue
else begin
t.num_alive_fibers <- t.num_alive_fibers + 1;
Mpscq.push t.ready (Spawn (r.fiber, r.main));
t.return
end
| Fiber.Yield -> t.yield
| Computation.Cancel_after r -> begin
let fiber = Fiber.Maybe.to_fiber t.fiber in
if Fiber.is_canceled fiber then t.discontinue
else
match
Select.cancel_after r.computation ~seconds:r.seconds r.exn r.bt
with
| () -> t.return
| exception exn ->
let bt = Printexc.get_raw_backtrace () in
Some (fun k -> Effect.Deep.discontinue_with_backtrace k exn bt)
end
| Trigger.Await trigger ->
Some
(fun k ->
let fiber = Fiber.Maybe.to_fiber t.fiber in
if Fiber.try_suspend fiber trigger fiber k t.resume then next t
else
let remaining_quota = t.remaining_quota - 1 in
if 0 < remaining_quota then begin
t.remaining_quota <- remaining_quota;
Fiber.resume fiber k
end
else begin
Mpscq.push t.ready (Resume (fiber, k));
next t
end)
| _ -> None
and retc () =
t.num_alive_fibers <- t.num_alive_fibers - 1;
next t
and resume trigger fiber k =
let resume = Resume (fiber, k) in
if Fiber.unsuspend fiber trigger then Mpscq.push t.ready resume
else Mpscq.push_head t.ready resume;
if
Atomic.get t.needs_wakeup
&& Atomic.compare_and_set t.needs_wakeup true false
then begin
begin
match Mutex.lock t.mutex with
| () -> Mutex.unlock t.mutex
| exception Sys_error _ -> ()
end;
Condition.broadcast t.condition
end
in
Fiber.continue fiber k ());
Effect.Deep.match_with main fiber t.handler

let run ?quota ?fatal_exn_handler ?(forbid = false) main =
let[@inline never] run ?quota ?fatal_exn_handler fiber main computation =
run_fiber ?quota ?fatal_exn_handler fiber main;
Computation.peek_exn computation

let run ?quota ?fatal_exn_handler ?forbid main =
let forbid = match forbid with None -> false | Some forbid -> forbid in
let computation = Computation.create ~mode:`LIFO () in
let fiber = Fiber.create ~forbid computation in
let main _ = Computation.capture computation main () in
run_fiber ?quota ?fatal_exn_handler fiber main;
Computation.peek_exn computation
run ?quota ?fatal_exn_handler fiber main computation
Loading

0 comments on commit 7692f14

Please sign in to comment.