diff --git a/lib/picos_sync/stream.ml b/lib/picos_sync/stream.ml index cafe22c7d..1d8700f44 100644 --- a/lib/picos_sync/stream.ml +++ b/lib/picos_sync/stream.ml @@ -8,31 +8,38 @@ let create ?padded () = (Atomic.make (Cons (Computation.create ~mode:`LIFO ()))) let advance t tail curr = - if Atomic.get t == Cons tail then - Atomic.compare_and_set t (Cons tail) curr |> ignore + let at = Atomic.get t in + if at == Cons tail then begin + Atomic.compare_and_set t (Cons tail) curr |> ignore; + curr + end + else at let help t tail = let _, curr = Computation.await tail in advance t tail curr -let rec push t ((_, curr) as next) backoff = - let (Cons tail) = Atomic.get t in - if not (Computation.try_return tail next) then begin - let backoff = Backoff.once backoff in - help t tail; - push t next backoff +let rec try_push t ((_, curr) as next) (Cons tail) backoff = + if Computation.try_return tail next then begin + advance t tail curr |> ignore; + true end - else advance t tail curr + else + let backoff = Backoff.once backoff in + try_push t next (help t tail) backoff let push t value = - push t (value, Cons (Computation.create ~mode:`LIFO ())) Backoff.default + let next = (value, Cons (Computation.create ~mode:`LIFO ())) in + try_push t next (Atomic.get t) Backoff.default |> ignore -let rec poison t exn_bt = - let (Cons tail) = Atomic.get t in - if not (Computation.try_cancel tail exn_bt) then begin - help t tail; - poison t exn_bt - end +let rec try_poison t exn_bt (Cons tail) backoff = + Computation.try_cancel tail exn_bt + || + let backoff = Backoff.once backoff in + try_poison t exn_bt (help t tail) backoff + +let poison t exn_bt = + try_poison t exn_bt (Atomic.get t) Backoff.default |> ignore let tap = Atomic.get let read (Cons at) = Computation.await at