diff --git a/lib_eio/pool.ml b/lib_eio/pool.ml index be5afd61e..4c8781e27 100644 --- a/lib_eio/pool.ml +++ b/lib_eio/pool.ml @@ -19,7 +19,7 @@ type 'a slot = 'a option ref module Cell = struct (* The possible behaviours are: - 1. Suspender : In_transition -> Request Suspender waits for a resource + 1. Suspender : In_transition -> Request Suspender waits for a resource 1.1. Resumer : Request -> Finished Resumer then providers a resource 1.2. Suspender : Request -> Finished Suspender cancels 2. Resumer : In_transition -> Resource Resumer provides a spare resource @@ -89,11 +89,10 @@ let cancel segment cell = | In_transition | Resource _ -> assert false (* Can't get here from [Request]. *) (* If [t] is under capacity, add another (empty) slot. *) -let rec maybe_add_slot t = - let current = Atomic.get t.slots in +let rec maybe_add_slot t current = if current < t.max_slots then ( if Atomic.compare_and_set t.slots current (current + 1) then add t (ref None) - else maybe_add_slot t (* Concurrent update; try again *) + else maybe_add_slot t (Atomic.get t.slots) (* Concurrent update; try again *) ) (* [run_with t f slot] ensures that [slot] contains a valid resource and then runs [f resource] with it. @@ -114,7 +113,7 @@ let run_with t f slot = f x end with - | r -> + | r -> add t slot; r | exception ex -> @@ -122,17 +121,36 @@ let run_with t f slot = add t slot; Printexc.raise_with_backtrace ex bt -let use t f = +(* Creates a fresh resource [x], runs [f x], then disposes of [x] *) +let run_new_and_dispose t f = + let x = t.alloc () in + match f x with + | r -> + t.dispose x; + r + | exception ex -> + let bt = Printexc.get_raw_backtrace () in + t.dispose x; + Printexc.raise_with_backtrace ex bt + +let use t ?(never_block=false) f = let segment, cell = Q.next_suspend t.q in match Atomic.get cell with | Finished | Request _ -> assert false | Resource slot -> Atomic.set cell Finished; (* Allow value to be GC'd *) run_with t f slot - | In_transition -> - (* It would have been better if more resources were available. - If we still have capacity, add a new slot now. *) - maybe_add_slot t; + | In_transition -> ( + let current = Atomic.get t.slots in + match current < t.max_slots with + | false when never_block -> + (* We are at capacity, but cannot block. + Create a new resource to run f but don't add it to the pool. *) + Atomic.set cell Finished; + run_new_and_dispose t f + | can_add -> + (* Create a slot if not at capacity. *) + if can_add then maybe_add_slot t current; (* No item is available right now. Start waiting *) let slot = Suspend.enter_unchecked (fun ctx enqueue -> @@ -156,3 +174,4 @@ let use t f = in (* assert (Atomic.get cell = Finished); *) run_with t f slot + ) \ No newline at end of file diff --git a/lib_eio/pool.mli b/lib_eio/pool.mli index d299b714e..5437134d5 100644 --- a/lib_eio/pool.mli +++ b/lib_eio/pool.mli @@ -38,6 +38,12 @@ val create : If it raises, the exception is passed on to the user, but resource is still considered to have been disposed. *) -val use : 'a t -> ('a -> 'b) -> 'b +val use : 'a t -> ?never_block:bool -> ('a -> 'b) -> 'b (** [use t fn] waits for some resource [x] to be available and then runs [f x]. - Afterwards (on success or error), [x] is returned to the pool. *) + Afterwards (on success or error), [x] is returned to the pool. + + @param never_block If [true] and the pool has reached maximum capacity, + then a fresh resource is created to ensure that this [use] + call does not wait for a resource to become available. + This resource is immediately disposed after [f x] returns. + *) diff --git a/tests/pool.md b/tests/pool.md index b391b50d0..ce547c131 100644 --- a/tests/pool.md +++ b/tests/pool.md @@ -77,6 +77,33 @@ Two uses with a capacity of 2; they run in parallel: - : unit = () ``` +Capacity of 1; two uses that cannot block and two normal uses; first 2 are parallel, next 2 are sequential: + +```ocaml +# Eio_mock.Backend.run @@ fun () -> + let p0, r0 = Promise.create () in + let p1, r1 = Promise.create () in + let t = create 1 [p0; p1] in + Fiber.all [ + (fun () -> P.use t ~never_block:true (fun x -> traceln "A: using item %d" x; Fiber.yield (); traceln "A done")); + (fun () -> P.use t ~never_block:true (fun x -> traceln "B: using item %d" x; Fiber.yield (); traceln "B done")); + (fun () -> P.use t (fun x -> traceln "C: using item %d" x; Fiber.yield (); traceln "C done")); + (fun () -> P.use t (fun x -> traceln "D: using item %d" x; Fiber.yield (); traceln "D done")); + (fun () -> Promise.resolve r0 (Ok 0); Promise.resolve r1 (Ok 1)); + ]; ++Creating item 0 ++Creating item 1 ++A: using item 1 ++B: using item 0 ++A done ++B done ++C: using item 1 ++C done ++D: using item 1 ++D done +- : unit = () +``` + ## Cancellation ```ocaml