-
Notifications
You must be signed in to change notification settings - Fork 6
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add a many-to-many, poisonable stream
- Loading branch information
Showing
5 changed files
with
137 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,51 @@ | ||
open Multicore_bench | ||
module Stream = Picos_sync.Stream | ||
|
||
let run_one ~budgetf ~n_pusher () = | ||
let n_readers = 1 in | ||
let n_domains = n_pusher + n_readers in | ||
|
||
let n_msgs = 200 / n_readers * Util.iter_factor in | ||
|
||
let t = Stream.create ~padded:true () in | ||
|
||
let n_msgs_to_add = Atomic.make 0 |> Multicore_magic.copy_as_padded in | ||
|
||
let init _ = | ||
Atomic.set n_msgs_to_add n_msgs; | ||
Stream.tap t | ||
in | ||
let wrap _ _ = Scheduler.run in | ||
let work i c = | ||
if i < n_pusher then | ||
let rec work () = | ||
let n = Util.alloc n_msgs_to_add in | ||
if 0 < n then begin | ||
for i = 1 to n do | ||
Stream.push t i | ||
done; | ||
work () | ||
end | ||
in | ||
work () | ||
else | ||
let rec loop n c = | ||
if 0 < n then | ||
let _, c = Stream.read c in | ||
loop (n - 1) c | ||
in | ||
loop n_msgs c | ||
in | ||
|
||
let config = | ||
let format role n = | ||
Printf.sprintf "%d %s%s" n role (if n = 1 then "" else "s") | ||
in | ||
Printf.sprintf "%s, 1 nb reader" (format "nb pusher" n_pusher) | ||
in | ||
Times.record ~budgetf ~n_domains ~init ~wrap ~work () | ||
|> Times.to_thruput_metrics ~n:n_msgs ~singular:"message" ~config | ||
|
||
let run_suite ~budgetf = | ||
[ 1; 2; 4 ] | ||
|> List.concat_map @@ fun n_pusher -> run_one ~budgetf ~n_pusher () |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -4,3 +4,4 @@ module Lazy = Lazy | |
module Event = Event | ||
module Latch = Latch | ||
module Ivar = Ivar | ||
module Stream = Stream |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,39 @@ | ||
open Picos | ||
|
||
type 'a cursor = Cons of ('a * 'a cursor) Computation.t [@@unboxed] | ||
type 'a t = 'a cursor Atomic.t | ||
|
||
let create ?padded () = | ||
Multicore_magic.copy_as ?padded | ||
(Atomic.make (Cons (Computation.create ~mode:`LIFO ()))) | ||
|
||
let advance t tail curr = | ||
if Atomic.get t == Cons tail then | ||
Atomic.compare_and_set t (Cons tail) curr |> ignore | ||
|
||
let help t tail = | ||
let _, curr = Computation.await tail in | ||
advance t tail curr | ||
|
||
let rec push t ((_, curr) as next) backoff = | ||
let (Cons tail) = Atomic.get t in | ||
if not (Computation.try_return tail next) then begin | ||
let backoff = Backoff.once backoff in | ||
help t tail; | ||
push t next backoff | ||
end | ||
else advance t tail curr | ||
|
||
let push t value = | ||
push t (value, Cons (Computation.create ~mode:`LIFO ())) Backoff.default | ||
|
||
let rec poison t exn_bt = | ||
let (Cons tail) = Atomic.get t in | ||
if not (Computation.try_cancel tail exn_bt) then begin | ||
help t tail; | ||
poison t exn_bt | ||
end | ||
|
||
let tap = Atomic.get | ||
let read (Cons at) = Computation.await at | ||
let read_evt (Cons at) = Event.from_computation at |