From c9d42deddba8008512bf5e753eaf5a017970d31f Mon Sep 17 00:00:00 2001 From: Lewin Bormann Date: Sun, 9 Jul 2023 09:21:55 +0200 Subject: [PATCH 1/3] First Stream.select implementation --- lib_eio/stream.ml | 8 ++++++++ lib_eio/stream.mli | 5 +++++ 2 files changed, 13 insertions(+) diff --git a/lib_eio/stream.ml b/lib_eio/stream.ml index 974cfa3b7..f64e65b77 100644 --- a/lib_eio/stream.ml +++ b/lib_eio/stream.ml @@ -125,6 +125,14 @@ let take_nonblocking = function | Sync x -> Sync.take_nonblocking x | Locking x -> Locking.take_nonblocking x +let select streams = + let f_of (stream, f) () = begin + let e = take stream in + f e + end in + let fs = List.map f_of streams in + Fiber.any fs + let length = function | Sync _ -> 0 | Locking x -> Locking.length x diff --git a/lib_eio/stream.mli b/lib_eio/stream.mli index 6554cac1a..cdf394bdb 100644 --- a/lib_eio/stream.mli +++ b/lib_eio/stream.mli @@ -40,6 +40,11 @@ val take_nonblocking : 'a t -> 'a option Note that if another domain may add to the stream then a [None] result may already be out-of-date by the time this returns. *) +val select: ('a t * ('a -> 'b)) list -> 'b +(** [select] waits for any stream to have an item available. The item + is mapped by the provided function and returned. Example: + [select [(s1, fun x -> x+1); (s2, fun x -> x+2)] *) + val length : 'a t -> int (** [length t] returns the number of items currently in [t]. *) From 7ff83d1b150cd040ddb1d140db55c6630ad5a66b Mon Sep 17 00:00:00 2001 From: Lewin Bormann Date: Sun, 9 Jul 2023 10:03:05 +0200 Subject: [PATCH 2/3] Add warning and test for Stream.select --- lib_eio/stream.mli | 6 +++++- tests/stream.md | 15 +++++++++++++++ 2 files changed, 20 insertions(+), 1 deletion(-) diff --git a/lib_eio/stream.mli b/lib_eio/stream.mli index cdf394bdb..4d8407a34 100644 --- a/lib_eio/stream.mli +++ b/lib_eio/stream.mli @@ -43,7 +43,11 @@ val take_nonblocking : 'a t -> 'a option val select: ('a t * ('a -> 'b)) list -> 'b (** [select] waits for any stream to have an item available. The item is mapped by the provided function and returned. Example: - [select [(s1, fun x -> x+1); (s2, fun x -> x+2)] *) + [select [(s1, fun x -> x+1); (s2, fun x -> x+2)] + + Warning: as with `Fiber.first`, it is possible that two or more streams + yield an item simultaneously, in which case only one item will be + returned, and the other items are discarded.*) val length : 'a t -> int (** [length t] returns the number of items currently in [t]. *) diff --git a/tests/stream.md b/tests/stream.md index c5a035e3b..48cb97f03 100644 --- a/tests/stream.md +++ b/tests/stream.md @@ -320,6 +320,21 @@ Cancelling writing to a stream: - : unit = () ``` +Selecting from multiple channels: + +```ocaml +# run @@ fun () -> Switch.run (fun sw -> + let t1, t2 = (S.create 2, S.create 2) in + let selector = [(t1, fun s -> traceln "Stream 1: %s" s); (t2, fun s -> traceln "Stream 2: %s" s)] in + Fiber.fork ~sw (fun () -> S.add t2 "Hello"); + Fiber.fork ~sw (fun () -> S.select selector); + Fiber.fork ~sw (fun () -> S.add t1 "Goodbye"); + Fiber.fork ~sw (fun () -> S.select selector));; ++Stream 2: Hello ++Stream 1: Goodbye +- : unit = () +``` + Non-blocking take: ```ocaml From 09acb4a6bea6e5b8ab3868ac0a66032b7836ed15 Mon Sep 17 00:00:00 2001 From: Lewin Bormann Date: Sun, 9 Jul 2023 10:04:32 +0200 Subject: [PATCH 3/3] Tighten Stream.select implementation --- lib_eio/stream.ml | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/lib_eio/stream.ml b/lib_eio/stream.ml index f64e65b77..95443dee7 100644 --- a/lib_eio/stream.ml +++ b/lib_eio/stream.ml @@ -126,10 +126,7 @@ let take_nonblocking = function | Locking x -> Locking.take_nonblocking x let select streams = - let f_of (stream, f) () = begin - let e = take stream in - f e - end in + let f_of (stream, f) () = f (take stream) in let fs = List.map f_of streams in Fiber.any fs