diff --git a/lib_eio/stream.ml b/lib_eio/stream.ml index 974cfa3b7..95443dee7 100644 --- a/lib_eio/stream.ml +++ b/lib_eio/stream.ml @@ -125,6 +125,11 @@ let take_nonblocking = function | Sync x -> Sync.take_nonblocking x | Locking x -> Locking.take_nonblocking x +let select streams = + let f_of (stream, f) () = f (take stream) in + let fs = List.map f_of streams in + Fiber.any fs + let length = function | Sync _ -> 0 | Locking x -> Locking.length x diff --git a/lib_eio/stream.mli b/lib_eio/stream.mli index 6554cac1a..4d8407a34 100644 --- a/lib_eio/stream.mli +++ b/lib_eio/stream.mli @@ -40,6 +40,15 @@ val take_nonblocking : 'a t -> 'a option Note that if another domain may add to the stream then a [None] result may already be out-of-date by the time this returns. *) +val select: ('a t * ('a -> 'b)) list -> 'b +(** [select] waits for any stream to have an item available. The item + is mapped by the provided function and returned. Example: + [select [(s1, fun x -> x+1); (s2, fun x -> x+2)] + + Warning: as with `Fiber.first`, it is possible that two or more streams + yield an item simultaneously, in which case only one item will be + returned, and the other items are discarded.*) + val length : 'a t -> int (** [length t] returns the number of items currently in [t]. *) diff --git a/tests/stream.md b/tests/stream.md index c5a035e3b..48cb97f03 100644 --- a/tests/stream.md +++ b/tests/stream.md @@ -320,6 +320,21 @@ Cancelling writing to a stream: - : unit = () ``` +Selecting from multiple channels: + +```ocaml +# run @@ fun () -> Switch.run (fun sw -> + let t1, t2 = (S.create 2, S.create 2) in + let selector = [(t1, fun s -> traceln "Stream 1: %s" s); (t2, fun s -> traceln "Stream 2: %s" s)] in + Fiber.fork ~sw (fun () -> S.add t2 "Hello"); + Fiber.fork ~sw (fun () -> S.select selector); + Fiber.fork ~sw (fun () -> S.add t1 "Goodbye"); + Fiber.fork ~sw (fun () -> S.select selector));; ++Stream 2: Hello ++Stream 1: Goodbye +- : unit = () +``` + Non-blocking take: ```ocaml