diff --git a/bench/bench_stream.ml b/bench/bench_stream.ml new file mode 100644 index 000000000..e2aa7acdd --- /dev/null +++ b/bench/bench_stream.ml @@ -0,0 +1,51 @@ +open Multicore_bench +module Stream = Picos_sync.Stream + +let run_one ~budgetf ~n_pusher () = + let n_readers = 1 in + let n_domains = n_pusher + n_readers in + + let n_msgs = 200 / n_readers * Util.iter_factor in + + let t = Stream.create ~padded:true () in + + let n_msgs_to_add = Atomic.make 0 |> Multicore_magic.copy_as_padded in + + let init _ = + Atomic.set n_msgs_to_add n_msgs; + Stream.tap t + in + let wrap _ _ = Scheduler.run in + let work i c = + if i < n_pusher then + let rec work () = + let n = Util.alloc n_msgs_to_add in + if 0 < n then begin + for i = 1 to n do + Stream.push t i + done; + work () + end + in + work () + else + let rec loop n c = + if 0 < n then + let _, c = Stream.read c in + loop (n - 1) c + in + loop n_msgs c + in + + let config = + let format role n = + Printf.sprintf "%d %s%s" n role (if n = 1 then "" else "s") + in + Printf.sprintf "%s, 1 nb reader" (format "nb pusher" n_pusher) + in + Times.record ~budgetf ~n_domains ~init ~wrap ~work () + |> Times.to_thruput_metrics ~n:n_msgs ~singular:"message" ~config + +let run_suite ~budgetf = + [ 1; 2; 4 ] + |> List.concat_map @@ fun n_pusher -> run_one ~budgetf ~n_pusher () diff --git a/bench/main.ml b/bench/main.ml index c8796e5da..4e75bca11 100644 --- a/bench/main.ml +++ b/bench/main.ml @@ -15,6 +15,7 @@ let benchmarks = ("Picos_mpscq", Bench_mpscq.run_suite); ("Picos_htbl", Bench_htbl.run_suite); ("Picos_stdio", Bench_stdio.run_suite); + ("Picos_sync Stream", Bench_stream.run_suite); ("Fib", Bench_fib.run_suite); ("Picos binaries", Bench_binaries.run_suite); ] diff --git a/lib/picos_sync/picos_sync.ml b/lib/picos_sync/picos_sync.ml index 084882fba..b398bbc00 100644 --- a/lib/picos_sync/picos_sync.ml +++ b/lib/picos_sync/picos_sync.ml @@ -2,3 +2,4 @@ module Mutex = Mutex module Condition = Condition module Lazy = Lazy module Event = Event +module Stream = Stream diff --git a/lib/picos_sync/picos_sync.mli b/lib/picos_sync/picos_sync.mli index 8017edc19..706ccee61 100644 --- a/lib/picos_sync/picos_sync.mli +++ b/lib/picos_sync/picos_sync.mli @@ -263,6 +263,51 @@ module Event : sig computation. *) end +module Stream : sig + (** A lock-free, poisonable, many-to-many, stream for {!Picos}. + + Readers can {!tap} into a stream to get a {!cursor} for reading all the + values {{!push} pushed} to the stream starting from the {!cursor} + position. Conversely, values {{!push} pushed} to a stream are lost unless + a reader has a {!cursor} to the position in the stream. *) + + type !'a t + (** Represents a stream of values of type ['a]. *) + + val create : ?padded:bool -> unit -> 'a t + (** [create ()] returns a new stream. *) + + val push : 'a t -> 'a -> unit + (** [push stream value] adds the [value] to the current position of the + [stream] and advances the stream to the next position unless the [stream] + has been {{!poison} poisoned} in which case only the exception given to + {!poison} will be raised. *) + + val poison : 'a t -> Picos.Exn_bt.t -> unit + (** [poison stream exn_bt] marks the stream as poisoned at the current + position, which means that subsequent attempts to {!push} to the [stream] + will raise the given exception with backtrace. *) + + type !'a cursor + (** Represents a (past or current) position in a stream. *) + + val tap : 'a t -> 'a cursor + (** [tap stream] returns a {!cursor} to the current position of the + [stream]. *) + + val read : 'a cursor -> 'a * 'a cursor + (** When the [cursor] points to a past position in the stream, [read cursor] + immediately returns the value pushed to the position and a cursor to the + next position. If the [cursor] points to the current position of the + stream, [read cursor] waits until a value is pushed to the stream and + returns the value and a cursor to the next position or, if the stream is + poisoned, raises the exception that the stream was poisoned with. *) + + val pushed : 'a cursor -> ('a * 'a cursor) Event.t + (** [pushed cursor] returns an {{!Event} event} that {{!read} reads} from the + [cursor] position. *) +end + (** {1 Examples} {2 A simple bounded queue} diff --git a/lib/picos_sync/stream.ml b/lib/picos_sync/stream.ml new file mode 100644 index 000000000..0abbbe08e --- /dev/null +++ b/lib/picos_sync/stream.ml @@ -0,0 +1,38 @@ +open Picos + +type 'a cursor = Cons of ('a * 'a cursor) Computation.t [@@unboxed] +type 'a t = 'a cursor Atomic.t + +let create ?(padded = false) () = + let t = Atomic.make (Cons (Computation.create ())) in + if padded then Multicore_magic.copy_as_padded t else t + +let advance t tail curr = + if Atomic.get t == Cons tail then + Atomic.compare_and_set t (Cons tail) curr |> ignore + +let help t tail = + let _, curr = Computation.await tail in + advance t tail curr + +let rec push t ((_, curr) as next) backoff = + let (Cons tail) = Atomic.get t in + if not (Computation.try_return tail next) then begin + let backoff = Backoff.once backoff in + help t tail; + push t next backoff + end + else advance t tail curr + +let push t value = push t (value, Cons (Computation.create ())) Backoff.default + +let rec poison t exn_bt = + let (Cons tail) = Atomic.get t in + if not (Computation.try_cancel tail exn_bt) then begin + help t tail; + poison t exn_bt + end + +let tap = Atomic.get +let read (Cons at) = Computation.await at +let pushed (Cons at) = Event.from_computation at