Skip to content

Commit

Permalink
Add a many-to-many, poisonable stream
Browse files Browse the repository at this point in the history
  • Loading branch information
polytypic committed Jun 14, 2024
1 parent 6a94293 commit 8cf4cf3
Show file tree
Hide file tree
Showing 5 changed files with 136 additions and 0 deletions.
51 changes: 51 additions & 0 deletions bench/bench_stream.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
open Multicore_bench
module Stream = Picos_sync.Stream

let run_one ~budgetf ~n_pusher () =
let n_readers = 1 in
let n_domains = n_pusher + n_readers in

let n_msgs = 200 / n_readers * Util.iter_factor in

let t = Stream.create ~padded:true () in

let n_msgs_to_add = Atomic.make 0 |> Multicore_magic.copy_as_padded in

let init _ =
Atomic.set n_msgs_to_add n_msgs;
Stream.tap t
in
let wrap _ _ = Scheduler.run in
let work i c =
if i < n_pusher then
let rec work () =
let n = Util.alloc n_msgs_to_add in
if 0 < n then begin
for i = 1 to n do
Stream.push t i
done;
work ()
end
in
work ()
else
let rec loop n c =
if 0 < n then
let _, c = Stream.read c in
loop (n - 1) c
in
loop n_msgs c
in

let config =
let format role n =
Printf.sprintf "%d %s%s" n role (if n = 1 then "" else "s")
in
Printf.sprintf "%s, 1 nb reader" (format "nb pusher" n_pusher)
in
Times.record ~budgetf ~n_domains ~init ~wrap ~work ()
|> Times.to_thruput_metrics ~n:n_msgs ~singular:"message" ~config

let run_suite ~budgetf =
[ 1; 2; 4 ]
|> List.concat_map @@ fun n_pusher -> run_one ~budgetf ~n_pusher ()
1 change: 1 addition & 0 deletions bench/main.ml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ let benchmarks =
("Picos_mpscq", Bench_mpscq.run_suite);
("Picos_htbl", Bench_htbl.run_suite);
("Picos_stdio", Bench_stdio.run_suite);
("Picos_sync Stream", Bench_stream.run_suite);
("Fib", Bench_fib.run_suite);
("Picos binaries", Bench_binaries.run_suite);
]
Expand Down
1 change: 1 addition & 0 deletions lib/picos_sync/picos_sync.ml
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@ module Mutex = Mutex
module Condition = Condition
module Lazy = Lazy
module Event = Event
module Stream = Stream
45 changes: 45 additions & 0 deletions lib/picos_sync/picos_sync.mli
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,51 @@ module Event : sig
computation. *)
end

module Stream : sig
(** A lock-free, poisonable, many-to-many, stream for {!Picos}.
Readers can {!tap} into a stream to get a {!cursor} for reading all the
values {{!push} pushed} to the stream starting from the {!cursor}
position. Conversely, values {{!push} pushed} to a stream are lost unless
a reader has a {!cursor} to the position in the stream. *)

type !'a t
(** Represents a stream of values of type ['a]. *)

val create : ?padded:bool -> unit -> 'a t
(** [create ()] returns a new stream. *)

val push : 'a t -> 'a -> unit
(** [push stream value] adds the [value] to the current position of the
[stream] and advances the stream to the next position unless the [stream]
has been {{!poison} poisoned} in which case only the exception given to
{!poison} will be raised. *)

val poison : 'a t -> Picos.Exn_bt.t -> unit
(** [poison stream exn_bt] marks the stream as poisoned at the current
position, which means that subsequent attempts to {!push} to the [stream]
will raise the given exception with backtrace. *)

type !'a cursor
(** Represents a (past or current) position in a stream. *)

val tap : 'a t -> 'a cursor
(** [tap stream] returns a {!cursor} to the current position of the
[stream]. *)

val read : 'a cursor -> 'a * 'a cursor
(** When the [cursor] points to a past position in the stream, [read cursor]
immediately returns the value pushed to the position and a cursor to the
next position. If the [cursor] points to the current position of the
stream, [read cursor] waits until a value is pushed to the stream and
returns the value and a cursor to the next position or, if the stream is
poisoned, raises the exception that the stream was poisoned with. *)

val pushed : 'a cursor -> ('a * 'a cursor) Event.t
(** [pushed cursor] returns an {{!Event} event} that {{!read} reads} from the
[cursor] position. *)
end

(** {1 Examples}
{2 A simple bounded queue}
Expand Down
38 changes: 38 additions & 0 deletions lib/picos_sync/stream.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
open Picos

type 'a cursor = Cons of ('a * 'a cursor) Computation.t [@@unboxed]
type 'a t = 'a cursor Atomic.t

let create ?(padded = false) () =
let t = Atomic.make (Cons (Computation.create ())) in
if padded then Multicore_magic.copy_as_padded t else t

let advance t tail curr =
if Atomic.get t == Cons tail then
Atomic.compare_and_set t (Cons tail) curr |> ignore

let help t tail =
let _, curr = Computation.await tail in
advance t tail curr

let rec push t ((_, curr) as next) backoff =
let (Cons tail) = Atomic.get t in
if not (Computation.try_return tail next) then begin
let backoff = Backoff.once backoff in
help t tail;
push t next backoff
end
else advance t tail curr

let push t value = push t (value, Cons (Computation.create ())) Backoff.default

let rec poison t exn_bt =
let (Cons tail) = Atomic.get t in
if not (Computation.try_cancel tail exn_bt) then begin
help t tail;
poison t exn_bt
end

let tap = Atomic.get
let read (Cons at) = Computation.await at
let pushed (Cons at) = Event.from_computation at

0 comments on commit 8cf4cf3

Please sign in to comment.