Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Eio.Pool.use ~never_block #657

Merged
merged 1 commit into from
Mar 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 29 additions & 9 deletions lib_eio/pool.ml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ type 'a slot = 'a option ref
module Cell = struct
(* The possible behaviours are:

1. Suspender : In_transition -> Request Suspender waits for a resource
1. Suspender : In_transition -> Request Suspender waits for a resource
1.1. Resumer : Request -> Finished Resumer then providers a resource
1.2. Suspender : Request -> Finished Suspender cancels
2. Resumer : In_transition -> Resource Resumer provides a spare resource
Expand Down Expand Up @@ -89,11 +89,10 @@ let cancel segment cell =
| In_transition | Resource _ -> assert false (* Can't get here from [Request]. *)

(* If [t] is under capacity, add another (empty) slot. *)
let rec maybe_add_slot t =
let current = Atomic.get t.slots in
let rec maybe_add_slot t current =
if current < t.max_slots then (
if Atomic.compare_and_set t.slots current (current + 1) then add t (ref None)
else maybe_add_slot t (* Concurrent update; try again *)
else maybe_add_slot t (Atomic.get t.slots) (* Concurrent update; try again *)
)

(* [run_with t f slot] ensures that [slot] contains a valid resource and then runs [f resource] with it.
Expand All @@ -114,25 +113,46 @@ let run_with t f slot =
f x
end
with
| r ->
| r ->
add t slot;
r
| exception ex ->
let bt = Printexc.get_raw_backtrace () in
add t slot;
Printexc.raise_with_backtrace ex bt

let use t f =
(* Creates a fresh resource [x], runs [f x], then disposes of [x] *)
let run_new_and_dispose t f =
let x = t.alloc () in
match f x with
| r ->
t.dispose x;
r
| exception ex ->
let bt = Printexc.get_raw_backtrace () in
t.dispose x;
Printexc.raise_with_backtrace ex bt

let use t ?(never_block=false) f =
let segment, cell = Q.next_suspend t.q in
match Atomic.get cell with
| Finished | Request _ -> assert false
| Resource slot ->
Atomic.set cell Finished; (* Allow value to be GC'd *)
run_with t f slot
| In_transition ->
(* It would have been better if more resources were available.
If we still have capacity, add a new slot now. *)
maybe_add_slot t;
let current = Atomic.get t.slots in
match current < t.max_slots with
| false when never_block -> (
(* We are at capacity, but cannot block.
Create a new resource to run f but don't add it to the pool. *)
match Atomic.exchange cell Finished with
| Resource slot -> run_with t f slot
| _ -> run_new_and_dispose t f
)
| can_add ->
(* Create a slot if not at capacity. *)
if can_add then maybe_add_slot t current;
(* No item is available right now. Start waiting *)
let slot =
Suspend.enter_unchecked "Pool.acquire" (fun ctx enqueue ->
Expand Down
10 changes: 8 additions & 2 deletions lib_eio/pool.mli
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,12 @@ val create :
If it raises, the exception is passed on to the user,
but resource is still considered to have been disposed. *)

val use : 'a t -> ('a -> 'b) -> 'b
val use : 'a t -> ?never_block:bool -> ('a -> 'b) -> 'b
(** [use t fn] waits for some resource [x] to be available and then runs [f x].
Afterwards (on success or error), [x] is returned to the pool. *)
Afterwards (on success or error), [x] is returned to the pool.

@param never_block If [true] and the pool has reached maximum capacity,
then a fresh resource is created to ensure that this [use]
call does not wait for a resource to become available.
This resource is immediately disposed after [f x] returns.
*)
31 changes: 31 additions & 0 deletions tests/pool.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,37 @@ Two uses with a capacity of 2; they run in parallel:
- : unit = ()
```

Capacity of 1; two uses that cannot block and two normal uses; first 2 are parallel, next 2 are sequential.
Note that the pool always suspends the calling fiber when creating a new slot,
even if the fiber ends up providing the new slot to itself,
which is why the items get assigned out of order in this test.

```ocaml
# Eio_mock.Backend.run @@ fun () ->
let p0, r0 = Promise.create () in
let p1, r1 = Promise.create () in
let t = create 1 [p0; p1] ~dispose in
Fiber.all [
(fun () -> P.use t ~never_block:true (fun x -> traceln "A: using item %d" x; Fiber.yield (); traceln "A done"));
(fun () -> P.use t ~never_block:true (fun x -> traceln "B: using item %d" x; Fiber.yield (); traceln "B done"));
(fun () -> P.use t (fun x -> traceln "C: using item %d" x; Fiber.yield (); traceln "C done"));
(fun () -> P.use t (fun x -> traceln "D: using item %d" x; Fiber.yield (); traceln "D done"));
(fun () -> Promise.resolve r0 (Ok 0); Promise.resolve r1 (Ok 1));
];
+Creating item 0
+Creating item 1
+A: using item 1
+B: using item 0
+A done
+B done
+disposing 0
+C: using item 1
+C done
+D: using item 1
+D done
- : unit = ()
```

## Cancellation

```ocaml
Expand Down
Loading