Skip to content

Commit

Permalink
Add Eio.Pool.use ~never_block
Browse files Browse the repository at this point in the history
  • Loading branch information
SGrondin committed Dec 16, 2023
1 parent a26c3eb commit fcda90c
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 12 deletions.
30 changes: 20 additions & 10 deletions lib_eio/pool.ml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ type 'a slot = 'a option ref
module Cell = struct
(* The possible behaviours are:
1. Suspender : In_transition -> Request Suspender waits for a resource
1. Suspender : In_transition -> Request Suspender waits for a resource
1.1. Resumer : Request -> Finished Resumer then providers a resource
1.2. Suspender : Request -> Finished Suspender cancels
2. Resumer : In_transition -> Resource Resumer provides a spare resource
Expand Down Expand Up @@ -89,11 +89,10 @@ let cancel segment cell =
| In_transition | Resource _ -> assert false (* Can't get here from [Request]. *)

(* If [t] is under capacity, add another (empty) slot. *)
let rec maybe_add_slot t =
let current = Atomic.get t.slots in
let rec maybe_add_slot t current =
if current < t.max_slots then (
if Atomic.compare_and_set t.slots current (current + 1) then add t (ref None)
else maybe_add_slot t (* Concurrent update; try again *)
else maybe_add_slot t (Atomic.get t.slots) (* Concurrent update; try again *)
)

(* [run_with t f slot] ensures that [slot] contains a valid resource and then runs [f resource] with it.
Expand All @@ -114,25 +113,35 @@ let run_with t f slot =
f x
end
with
| r ->
| r ->
add t slot;
r
| exception ex ->
let bt = Printexc.get_raw_backtrace () in
add t slot;
Printexc.raise_with_backtrace ex bt

let use t f =
let use t ?(never_block=false) f =
let segment, cell = Q.next_suspend t.q in
match Atomic.get cell with
| Finished | Request _ -> assert false
| Resource slot ->
Atomic.set cell Finished; (* Allow value to be GC'd *)
run_with t f slot
| In_transition ->
(* It would have been better if more resources were available.
If we still have capacity, add a new slot now. *)
maybe_add_slot t;
| In_transition -> (
let current = Atomic.get t.slots in
match current < t.max_slots with
| false when never_block ->
(* We are at capacity, but cannot block.
Create a new resource to run f but don't add it to the pool. *)
Atomic.set cell Finished;
let x = t.alloc () in
Fun.protect
(fun () -> f x)
~finally:(fun () -> t.dispose x)
| can_add ->
(* Create a slot if not at capacity. *)
if can_add then maybe_add_slot t current;
(* No item is available right now. Start waiting *)
let slot =
Suspend.enter_unchecked (fun ctx enqueue ->
Expand All @@ -156,3 +165,4 @@ let use t f =
in
(* assert (Atomic.get cell = Finished); *)
run_with t f slot
)
10 changes: 8 additions & 2 deletions lib_eio/pool.mli
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,12 @@ val create :
If it raises, the exception is passed on to the user,
but resource is still considered to have been disposed. *)

val use : 'a t -> ('a -> 'b) -> 'b
val use : 'a t -> ?never_block:bool -> ('a -> 'b) -> 'b
(** [use t fn] waits for some resource [x] to be available and then runs [f x].
Afterwards (on success or error), [x] is returned to the pool. *)
Afterwards (on success or error), [x] is returned to the pool.
@param never_block If [true] and the pool has reached maximum capacity,
then a fresh resource is created to ensure that this [use]
call does not wait for a resource to become available.
This resource is immediately disposed after [f x] returns.
*)
27 changes: 27 additions & 0 deletions tests/pool.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,33 @@ Two uses with a capacity of 2; they run in parallel:
- : unit = ()
```

Capacity of 1; two uses that cannot block and two normal uses; first 2 are parallel, next 2 are sequential:

```ocaml
# Eio_mock.Backend.run @@ fun () ->
let p0, r0 = Promise.create () in
let p1, r1 = Promise.create () in
let t = create 1 [p0; p1] in
Fiber.all [
(fun () -> P.use t ~never_block:true (fun x -> traceln "A: using item %d" x; Fiber.yield (); traceln "A done"));
(fun () -> P.use t ~never_block:true (fun x -> traceln "B: using item %d" x; Fiber.yield (); traceln "B done"));
(fun () -> P.use t (fun x -> traceln "C: using item %d" x; Fiber.yield (); traceln "C done"));
(fun () -> P.use t (fun x -> traceln "D: using item %d" x; Fiber.yield (); traceln "D done"));
(fun () -> Promise.resolve r0 (Ok 0); Promise.resolve r1 (Ok 1));
];
+Creating item 0
+Creating item 1
+A: using item 1
+B: using item 0
+A done
+B done
+C: using item 1
+C done
+D: using item 1
+D done
- : unit = ()
```

## Cancellation

```ocaml
Expand Down

0 comments on commit fcda90c

Please sign in to comment.