Skip to content

Commit

Permalink
Specialized semaphore implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
polytypic committed Jul 4, 2024
1 parent 196647f commit 269551b
Showing 1 changed file with 154 additions and 42 deletions.
196 changes: 154 additions & 42 deletions lib/picos_sync/semaphore.ml
Original file line number Diff line number Diff line change
@@ -1,69 +1,181 @@
open Picos

let[@inline never] impossible () = failwith "impossible"
let[@inline never] overflow () = raise (Sys_error "overflow")
let[@inline never] negative () = invalid_arg "negative initial count"

(* TODO: This is not the fastest nor the most scalable implementation. *)
(* TODO: This is probably not the fastest nor the most scalable implementation. *)

type semaphore = { mutex : Mutex.t; nonzero : Condition.t; count : int ref }
type queue = { head : Trigger.t list; tail : Trigger.t list }

module Counting = struct
type t = semaphore
type t = Obj.t Atomic.t

let make ?padded count =
if count < 0 then negative ();
let mutex = Mutex.create ?padded () in
let count = ref count |> Multicore_magic.copy_as ?padded in
let nonzero = Condition.create ?padded () in
Multicore_magic.copy_as ?padded { mutex; nonzero; count }
Atomic.make (Obj.repr count) |> Multicore_magic.copy_as ?padded

let release t =
Mutex.lock ~checked:false t.mutex;
let count = !(t.count) in
if count < count + 1 then t.count := count + 1;
Mutex.unlock ~checked:false t.mutex;
if count < count + 1 then Condition.signal t.nonzero else overflow ()
let rec release t backoff =
let before = Atomic.get t in
if Obj.is_int before then begin
let count = Obj.obj before in
if count < count + 1 then begin
let after = Obj.repr (count + 1) in
if not (Atomic.compare_and_set t before after) then
release t (Backoff.once backoff)
end
else overflow ()
end
else
let queue = Obj.obj before in
match queue.head with
| trigger :: triggers ->
let after =
if triggers == [] && queue.tail == [] then Obj.repr 0
else Obj.repr { queue with head = triggers }
in
if Atomic.compare_and_set t before after then Trigger.signal trigger
else release t (Backoff.once backoff)
| [] -> begin
match List.rev queue.tail with
| [] -> impossible ()
| trigger :: triggers ->
let after =
if triggers == [] then Obj.repr 0
else Obj.repr { head = triggers; tail = [] }
in
if Atomic.compare_and_set t before after then
Trigger.signal trigger
else release t (Backoff.once backoff)
end

let acquire t =
Mutex.lock ~checked:false t.mutex;
match
while !(t.count) = 0 do
Condition.wait t.nonzero t.mutex
done
with
| () ->
t.count := !(t.count) - 1;
Mutex.unlock ~checked:false t.mutex
| exception exn ->
let exn_bt = Exn_bt.get exn in
Mutex.unlock ~checked:false t.mutex;
Exn_bt.raise exn_bt
let rec cleanup t trigger backoff =
let before = Atomic.get t in
if not (Obj.is_int before) then
let r = Obj.obj before in
if r.head != [] then
match List_ext.drop_first_or_not_found trigger r.head with
| head ->
let after =
if head == [] && r.tail == [] then Obj.repr 0
else Obj.repr { r with head }
in
if not (Atomic.compare_and_set t before after) then
cleanup t trigger (Backoff.once backoff)
| exception Not_found -> begin
match List_ext.drop_first_or_not_found trigger r.tail with
| tail ->
let after = Obj.repr { r with tail } in
if not (Atomic.compare_and_set t before after) then
cleanup t trigger (Backoff.once backoff)
| exception Not_found -> release t Backoff.default
end
else
match List_ext.drop_first_or_not_found trigger r.tail with
| tail ->
let after =
if tail == [] then Obj.repr 0 else Obj.repr { head = []; tail }
in
if not (Atomic.compare_and_set t before after) then
cleanup t trigger (Backoff.once backoff)
| exception Not_found -> release t Backoff.default

let try_acquire t =
Mutex.lock ~checked:false t.mutex;
let count = !(t.count) in
if 0 < count then t.count := count - 1;
Mutex.unlock ~checked:false t.mutex;
let rec acquire t backoff =
let before = Atomic.get t in
if Obj.is_int before then
let count = Obj.obj before in
if 0 < count then begin
let after = Obj.repr (count - 1) in
if not (Atomic.compare_and_set t before after) then
acquire t (Backoff.once backoff)
end
else
let trigger = Trigger.create () in
let after = Obj.repr { head = [ trigger ]; tail = [] } in
if Atomic.compare_and_set t before after then begin
match Trigger.await trigger with
| None -> ()
| Some exn_bt ->
cleanup t trigger Backoff.default;
Exn_bt.raise exn_bt
end
else acquire t (Backoff.once backoff)
else
let queue = Obj.obj before in
let trigger = Trigger.create () in
let after =
if queue.head == [] then
Obj.repr { head = List.rev_append queue.tail [ trigger ]; tail = [] }
else Obj.repr { head = queue.head; tail = trigger :: queue.tail }
in
if Atomic.compare_and_set t before after then begin
match Trigger.await trigger with
| None -> ()
| Some exn_bt ->
cleanup t trigger Backoff.default;
Exn_bt.raise exn_bt
end
else acquire t (Backoff.once backoff)

let rec try_acquire t backoff =
let before = Atomic.get t in
Obj.is_int before
&&
let count = Obj.obj before in
0 < count
&&
let after = Obj.repr (count - 1) in
Atomic.compare_and_set t before after
|| try_acquire t (Backoff.once backoff)

let get_value t =
if Mutex.try_lock ~checked:false t.mutex then
Mutex.unlock ~checked:false t.mutex;
!(t.count)
let state = Atomic.get t in
if Obj.is_int state then Obj.obj state else 0

let[@inline] release t = release t Backoff.default
let[@inline] acquire t = acquire t Backoff.default
let[@inline] try_acquire t = try_acquire t Backoff.default
end

module Binary = struct
type t = semaphore
type t = Counting.t

let make ?padded initial = Counting.make ?padded (Bool.to_int initial)

let release t =
Mutex.lock ~checked:false t.mutex;
let count = !(t.count) in
if count = 0 then t.count := 1;
Mutex.unlock ~checked:false t.mutex;
Condition.signal t.nonzero
let rec release t backoff =
let before = Atomic.get t in
if Obj.is_int before then begin
let count = Obj.obj before in
if count = 0 then
let after = Obj.repr 1 in
if not (Atomic.compare_and_set t before after) then
release t (Backoff.once backoff)
end
else begin
let queue = Obj.obj before in
match queue.head with
| trigger :: triggers ->
let after =
if triggers == [] && queue.tail == [] then Obj.repr 0
else Obj.repr { queue with head = triggers }
in
if Atomic.compare_and_set t before after then Trigger.signal trigger
else release t (Backoff.once backoff)
| [] -> begin
match List.rev queue.tail with
| trigger :: triggers ->
let after =
if triggers == [] then Obj.repr 0
else Obj.repr { head = triggers; tail = [] }
in
if Atomic.compare_and_set t before after then
Trigger.signal trigger
else release t (Backoff.once backoff)
| [] -> impossible ()
end
end

let acquire = Counting.acquire
let try_acquire = Counting.try_acquire
let[@inline] release t = release t Backoff.default
end

0 comments on commit 269551b

Please sign in to comment.