Skip to content

Commit

Permalink
trace-tef: simplify code
Browse files Browse the repository at this point in the history
  • Loading branch information
c-cube committed Dec 7, 2023
1 parent c16666d commit 3175096
Show file tree
Hide file tree
Showing 6 changed files with 59 additions and 103 deletions.
34 changes: 11 additions & 23 deletions src/tef/b_queue.ml
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
type 'a t = {
mutex: Mutex.t;
cond: Condition.t;
q: 'a Mpsc_queue.t;
q: 'a Mpsc_bag.t;
mutable closed: bool;
mutable consumer_waiting: bool;
consumer_waiting: bool Atomic.t;
}

exception Closed
Expand All @@ -12,9 +12,9 @@ let create () : _ t =
{
mutex = Mutex.create ();
cond = Condition.create ();
q = Mpsc_queue.create ();
q = Mpsc_bag.create ();
closed = false;
consumer_waiting = false;
consumer_waiting = Atomic.make false;
}

let close (self : _ t) =
Expand All @@ -27,35 +27,23 @@ let close (self : _ t) =

let push (self : _ t) x : unit =
if self.closed then raise Closed;
Mpsc_queue.enqueue self.q x;
Mpsc_bag.add self.q x;
if self.closed then raise Closed;
if self.consumer_waiting then (
if Atomic.get self.consumer_waiting then (
(* wakeup consumer *)
Mutex.lock self.mutex;
Condition.broadcast self.cond;
Mutex.unlock self.mutex
)

let rec pop (self : 'a t) : 'a =
match Mpsc_queue.dequeue self.q with
| x -> x
| exception Mpsc_queue.Empty ->
if self.closed then raise Closed;
Mutex.lock self.mutex;
self.consumer_waiting <- true;
Condition.wait self.cond self.mutex;
self.consumer_waiting <- false;
Mutex.unlock self.mutex;
pop self

let rec pop_all (self : 'a t) : 'a list =
match Mpsc_queue.dequeue_all self.q with
| l -> l
| exception Mpsc_queue.Empty ->
match Mpsc_bag.pop_all self.q with
| l -> List.rev l
| exception Mpsc_bag.Empty ->
if self.closed then raise Closed;
Mutex.lock self.mutex;
self.consumer_waiting <- true;
Atomic.set self.consumer_waiting true;
Condition.wait self.cond self.mutex;
self.consumer_waiting <- false;
Atomic.set self.consumer_waiting false;
Mutex.unlock self.mutex;
pop_all self
8 changes: 2 additions & 6 deletions src/tef/b_queue.mli
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,9 @@ val push : 'a t -> 'a -> unit
(** [push q x] pushes [x] into [q], and returns [()].
@raise Closed if [close q] was previously called.*)

val pop : 'a t -> 'a
(** [pop q] pops the next element in [q]. It might block until an element comes.
@raise Closed if the queue was closed before a new element was available. *)

val pop_all : 'a t -> 'a list
(** [transfer bq q2] transfers all items presently
in [bq] into [q2], and clears [bq].
(** [pop_all bq] returns all items presently
in [bq], in the same order, and clears [bq].
It blocks if no element is in [bq]. *)

val close : _ t -> unit
Expand Down
32 changes: 32 additions & 0 deletions src/tef/mpsc_bag.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
type 'a t = { bag: 'a list Atomic.t } [@@unboxed]

let create () =
let bag = Atomic.make [] in
{ bag }

module Backoff = struct
type t = int

let default = 2

let once (b : t) : t =
for _i = 1 to b do
Relax_.cpu_relax ()
done;
min (b * 2) 256
end

let rec add backoff t x =
let before = Atomic.get t.bag in
let after = x :: before in
if not (Atomic.compare_and_set t.bag before after) then
add (Backoff.once backoff) t x

let[@inline] add t x = add Backoff.default t x

exception Empty

let[@inline] pop_all t : _ list =
match Atomic.exchange t.bag [] with
| [] -> raise_notrace Empty
| l -> l
14 changes: 14 additions & 0 deletions src/tef/mpsc_bag.mli
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
(** A multi-producer, single-consumer bag *)

type 'a t

val create : unit -> 'a t

val add : 'a t -> 'a -> unit
(** [add q x] adds [x] in the bag. *)

exception Empty

val pop_all : 'a t -> 'a list
(** Return all current items in an unspecified order.
@raise Empty if empty *)
60 changes: 0 additions & 60 deletions src/tef/mpsc_queue.ml

This file was deleted.

14 changes: 0 additions & 14 deletions src/tef/mpsc_queue.mli

This file was deleted.

0 comments on commit 3175096

Please sign in to comment.