From e436b9b8492fefeef413d67f97c2fa32badc5f38 Mon Sep 17 00:00:00 2001 From: Ruben Bartelink Date: Sat, 12 Aug 2023 18:30:38 +0100 Subject: [PATCH 1/4] remove Async from Transact(Ex)?Async --- CHANGELOG.md | 3 +- DOCUMENTATION.md | 2 +- samples/Store/Domain/Cart.fs | 2 +- samples/Store/Domain/Favorites.fs | 3 +- samples/Store/Domain/SavedForLater.fs | 2 +- src/Equinox/Decider.fs | 34 +++++++++---------- .../StoreIntegration.fs | 6 ++-- 7 files changed, 27 insertions(+), 25 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index ce7d36606..d2613af7e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,7 +10,7 @@ The `Unreleased` section name is replaced by the expected version of next releas ### Added -- `Equinox`: `Decider.Transact`, `TransactAsync`, `TransactExAsync` overloads [#325](https://github.com/jet/equinox/pull/325) +- `Equinox`: `Decider.Transact`, `TransactEx` overloads [#325](https://github.com/jet/equinox/pull/325) - `Equinox.LoadOption.RequireLeader`: support for requesting a consistent read of a stream [#341](https://github.com/jet/equinox/pull/341) - `Equinox.LoadOption.AllowStale`: Read mode that limits reads to a maximum of one retrieval per the defined time window [#386](https://github.com/jet/equinox/pull/386) - `Equinox.Category` base class, with `Decider` and `Stream` helper `module`s [#337](https://github.com/jet/equinox/pull/337) @@ -40,6 +40,7 @@ The `Unreleased` section name is replaced by the expected version of next releas - `Equinox.LoadOption`: Rename `AllowStale` to `AnyCachedValue` [#386](https://github.com/jet/equinox/pull/386) - `Equinox.Decider`: Replace `'event list` with `'event[]` [#411](https://github.com/jet/equinox/pull/411) - `Equinox.Decider`: Replace `maxAttempts` with a default policy and an optional argument on `Transact*` APIs [#337](https://github.com/jet/equinox/pull/337) +- `Equinox.Decider`: rename `Decider.TransactAsync`, `Decider.TransactExAsync` to `Transact` [#314](https://github.com/jet/equinox/pull/314) - `Equinox.Core.AsyncBatchingGate`: renamed to `Batching.Batcher` [#390](https://github.com/jet/equinox/pull/390) - `Equinox.Core`: Now a free-standing library that a) does not depend on `Equinox` b) is not depended on by the Stores (though `CosmosStore` inlines `AsyncCacheCell`) [#420](https://github.com/jet/equinox/pull/420) - Stores: Change Event Body types, requiring `FsCodec` v `3.0.0`, with [`EventBody` types switching from `byte[]` to `ReadOnlyMemory` and/or `JsonElement` see FsCodec#75](https://github.com/jet/FsCodec/pull/75) [#323](https://github.com/jet/equinox/pull/323) diff --git a/DOCUMENTATION.md b/DOCUMENTATION.md index d54dd0469..f110ccb07 100755 --- a/DOCUMENTATION.md +++ b/DOCUMENTATION.md @@ -1318,7 +1318,7 @@ type Service internal (resolve: CartId -> Equinox.Decider = let decider = resolve cartId let opt = if optimistic then Equinox.AnyCachedValue else Equinox.RequireLoad - decider.TransactAsync(fun state -> async { + decider.Transact(fun state -> async { match prepare with None -> () | Some prep -> do! prep return interpretMany Fold.fold (Seq.map interpret commands) state }, opt) ``` diff --git a/samples/Store/Domain/Cart.fs b/samples/Store/Domain/Cart.fs index c13fe59ea..bc5b4ee3e 100644 --- a/samples/Store/Domain/Cart.fs +++ b/samples/Store/Domain/Cart.fs @@ -162,7 +162,7 @@ type Service internal (resolve: CartId -> Equinox.Decider = x.Run(cartId, optimistic, commands, ?prepare = prepare) |> Async.Ignore diff --git a/samples/Store/Domain/Favorites.fs b/samples/Store/Domain/Favorites.fs index dd8b8b4f5..0387616e0 100644 --- a/samples/Store/Domain/Favorites.fs +++ b/samples/Store/Domain/Favorites.fs @@ -82,7 +82,8 @@ type Service internal (resolve: ClientId -> Equinox.Decider (), decideUnfavorite sku c.State), fun () c -> c.Version) + let mapResult () (c: Equinox.ISyncContext<_>) = c.Version + decider.TransactEx((fun c -> (), decideUnfavorite sku c.State), mapResult) let create resolve = Service(Stream.id >> resolve) diff --git a/samples/Store/Domain/SavedForLater.fs b/samples/Store/Domain/SavedForLater.fs index 6c939a040..b86a7ccff 100644 --- a/samples/Store/Domain/SavedForLater.fs +++ b/samples/Store/Domain/SavedForLater.fs @@ -126,7 +126,7 @@ type Service internal (resolve: ClientId -> Equinox.Deciderbool) -> Async): Async = let decider = resolve clientId - decider.TransactAsync(fun (state: Fold.State) -> async { + decider.Transact(fun (state: Fold.State) -> async { let contents = seq { for item in state -> item.skuId } |> set let! cmd = resolveCommand contents.Contains let _, events = decide maxSavedItems cmd state diff --git a/src/Equinox/Decider.fs b/src/Equinox/Decider.fs index 319d3fdae..3b2d3718e 100755 --- a/src/Equinox/Decider.fs +++ b/src/Equinox/Decider.fs @@ -67,33 +67,33 @@ type Decider<'event, 'state>(inner: DeciderCore<'event, 'state>) = /// 2. (if events yielded) Attempt to sync the yielded events to the stream. /// (Restarts up to maxAttempts times with updated state per attempt, throwing MaxResyncsExhaustedException on failure of final attempt.) /// 3. Uses render to generate a 'view from the persisted final state - member _.TransactAsync(interpret: 'state -> Async<'event[]>, render: 'state -> 'view, ?load, ?attempts): Async<'view> = Async.call <| fun ct -> - inner.TransactAsync((fun s ct -> Async.StartImmediateAsTask(interpret s, ct)), render, ?load = load, ?attempts = attempts, ct = ct) + member _.Transact(interpret: 'state -> Async<'event[]>, render: 'state -> 'view, ?load, ?attempts): Async<'view> = Async.call <| fun ct -> + inner.Transact((fun s ct -> Async.StartImmediateAsTask(interpret s, ct)), render, ?load = load, ?attempts = attempts, ct = ct) /// 1. Invoke the supplied Async decide function with the present state, holding the 'result /// 2. (if events yielded) Attempt to sync the yielded events to the stream. /// (Restarts up to maxAttempts times with updated state per attempt, throwing MaxResyncsExhaustedException on failure of final attempt.) /// 3. Yield result - member _.TransactAsync(decide: 'state -> Async<'result * 'event[]>, ?load, ?attempts): Async<'result> = Async.call <| fun ct -> + member _.Transact(decide: 'state -> Async<'result * 'event[]>, ?load, ?attempts): Async<'result> = Async.call <| fun ct -> let inline decide' s ct = task { let! r, es = Async.StartImmediateAsTask(decide s, ct) in return struct (r, es) } - inner.TransactAsync(decide', ?load = load, ?attempts = attempts, ct = ct) + inner.Transact(decide', ?load = load, ?attempts = attempts, ct = ct) /// 1. Invoke the supplied Async decide function with the current complete context, holding the 'result /// 2. (if events yielded) Attempt to sync the yielded events to the stream. /// (Restarts up to maxAttempts times with updated state per attempt, throwing MaxResyncsExhaustedException on failure of final attempt.) /// 3. Yield result - member _.TransactExAsync(decide: ISyncContext<'state> -> Async<'result * 'event[]>, ?load, ?attempts): Async<'result> = Async.call <| fun ct -> + member _.TransactEx(decide: ISyncContext<'state> -> Async<'result * 'event[]>, ?load, ?attempts): Async<'result> = Async.call <| fun ct -> let decide' c ct = task { let! r, es = Async.StartImmediateAsTask(decide c, ct) in return struct (r, es) } - inner.TransactExAsync(decide', ?load = load, ?attempts = attempts, ct = ct) + inner.TransactEx(decide', ?load = load, ?attempts = attempts, ct = ct) /// 1. Invoke the supplied Async decide function with the current complete context, holding the 'result /// 2. (if events yielded) Attempt to sync the yielded events to the stream. /// (Restarts up to maxAttempts times with updated state per attempt, throwing MaxResyncsExhaustedException on failure of final attempt.) /// 3. Yields a final 'view produced by mapResult from the 'result and/or the final persisted ISyncContext - member _.TransactExAsync(decide: ISyncContext<'state> -> Async<'result * 'event[]>, mapResult: 'result -> ISyncContext<'state> -> 'view, - ?load, ?attempts): Async<'view> = Async.call <| fun ct -> + member _.TransactEx(decide: ISyncContext<'state> -> Async<'result * 'event[]>, mapResult: 'result -> ISyncContext<'state> -> 'view, + ?load, ?attempts): Async<'view> = Async.call <| fun ct -> let inline decide' c ct = task { let! r, es = Async.StartImmediateAsTask(decide c, ct) in return struct (r, es) } - inner.TransactExAsync(decide', mapResult, ?load = load, ?attempts = attempts, ct = ct) + inner.TransactEx(decide', mapResult, ?load = load, ?attempts = attempts, ct = ct) /// Central Application-facing API. Wraps the handling of decision or query flows in a manner that is store agnostic /// For F#, the async and FSharpFunc signatures in Decider tend to work better, but the API set is equivalent @@ -175,8 +175,8 @@ and [] /// 2. (if events yielded) Attempt to sync the yielded events to the stream. /// (Restarts up to maxAttempts times with updated state per attempt, throwing MaxResyncsExhaustedException on failure of final attempt.) /// 3. Uses render to generate a 'view from the persisted final state - member _.TransactAsync(interpret: Func<'state, CancellationToken, Task<'event[]>>, render: Func<'state, 'view>, - [] ?load, [] ?attempts, [] ?ct): Task<'view> = + member _.Transact(interpret: Func<'state, CancellationToken, Task<'event[]>>, render: Func<'state, 'view>, + [] ?load, [] ?attempts, [] ?ct): Task<'view> = let inline decide struct (_token, state) ct = task { let! es = interpret.Invoke(state, ct) in return struct ((), es) } let inline mapRes () struct (_token, state) = render.Invoke state Stream.transact (stream, LoadPolicy.Fetch load, decide, AttemptsPolicy.Validate attempts, mapRes, defaultArg ct CancellationToken.None) @@ -185,8 +185,8 @@ and [] /// 2. (if events yielded) Attempt to sync the yielded events to the stream. /// (Restarts up to maxAttempts times with updated state per attempt, throwing MaxResyncsExhaustedException on failure of final attempt.) /// 3. Yield result - member _.TransactAsync(decide: Func<'state, CancellationToken, Task>, - [] ?load, [] ?attempts, [] ?ct): Task<'result> = + member _.Transact(decide: Func<'state, CancellationToken, Task>, + [] ?load, [] ?attempts, [] ?ct): Task<'result> = let inline decide struct (_token, state) ct = decide.Invoke(state, ct) let inline mapRes r _ = r Stream.transact (stream, LoadPolicy.Fetch load, decide, AttemptsPolicy.Validate attempts, mapRes, defaultArg ct CancellationToken.None) @@ -195,8 +195,8 @@ and [] /// 2. (if events yielded) Attempt to sync the yielded events to the stream. /// (Restarts up to maxAttempts times with updated state per attempt, throwing MaxResyncsExhaustedException on failure of final attempt.) /// 3. Yield result - member _.TransactExAsync(decide: Func, CancellationToken, Task>, - [] ?load, [] ?attempts, [] ?ct): Task<'result> = + member _.TransactEx(decide: Func, CancellationToken, Task>, + [] ?load, [] ?attempts, [] ?ct): Task<'result> = let inline decide (Context c) ct = decide.Invoke(c, ct) let inline mapRes r _ = r Stream.transact (stream, LoadPolicy.Fetch load, decide, AttemptsPolicy.Validate attempts, mapRes, defaultArg ct CancellationToken.None) @@ -205,8 +205,8 @@ and [] /// 2. (if events yielded) Attempt to sync the yielded events to the stream. /// (Restarts up to maxAttempts times with updated state per attempt, throwing MaxResyncsExhaustedException on failure of final attempt.) /// 3. Yields a final 'view produced by mapResult from the 'result and/or the final persisted ISyncContext - member _.TransactExAsync(decide: Func, CancellationToken, Task>, mapResult: Func<'result, ISyncContext<'state>, 'view>, - [] ?load, [] ?attempts, [] ?ct): Task<'view> = + member _.TransactEx(decide: Func, CancellationToken, Task>, mapResult: Func<'result, ISyncContext<'state>, 'view>, + [] ?load, [] ?attempts, [] ?ct): Task<'view> = let inline decide (Context c) ct = decide.Invoke(c, ct) let inline mapRes r (Context c) = mapResult.Invoke(r, c) Stream.transact (stream, LoadPolicy.Fetch load, decide, AttemptsPolicy.Validate attempts, mapRes, defaultArg ct CancellationToken.None) diff --git a/tests/Equinox.EventStoreDb.Integration/StoreIntegration.fs b/tests/Equinox.EventStoreDb.Integration/StoreIntegration.fs index ea7711124..c73f57940 100644 --- a/tests/Equinox.EventStoreDb.Integration/StoreIntegration.fs +++ b/tests/Equinox.EventStoreDb.Integration/StoreIntegration.fs @@ -424,9 +424,9 @@ type GeneralTests(testOutputHelper) = let id = Guid.NewGuid() let decider = SimplestThing.decider log context id - let! before, after = decider.TransactEx( - (fun state -> state.Version, [| SimplestThing.StuffHappened |]), - mapResult = (fun result ctx-> result, ctx.Version)) + let! before, after = + let mapResult result (ctx: Equinox.ISyncContext<_>) = result, ctx.Version + decider.TransactEx((fun state -> state.Version, [| SimplestThing.StuffHappened |]), mapResult) test <@ [before; after] = [0L; 1L] @> } #if STORE_MESSAGEDB From 1ab7f6105e247594b24c721739f81868bef442d3 Mon Sep 17 00:00:00 2001 From: Ruben Bartelink Date: Sat, 12 Aug 2023 18:36:28 +0100 Subject: [PATCH 2/4] Reorder --- src/Equinox/Decider.fs | 78 ++++++++++++++++++++++++------------------ 1 file changed, 45 insertions(+), 33 deletions(-) diff --git a/src/Equinox/Decider.fs b/src/Equinox/Decider.fs index 3b2d3718e..2d8c80d2f 100755 --- a/src/Equinox/Decider.fs +++ b/src/Equinox/Decider.fs @@ -40,6 +40,25 @@ type Decider<'event, 'state>(inner: DeciderCore<'event, 'state>) = member _.Transact(decide: 'state -> 'result * 'event[], mapResult: 'result -> 'state -> 'view, ?load, ?attempts): Async<'view> = Async.call <| fun ct -> inner.Transact(decide >> ValueTuple.Create, mapResult, ?load = load, ?attempts = attempts, ct = ct) + (* Async variants *) + + /// 1. Invoke the supplied Async interpret function with the present state + /// 2. (if events yielded) Attempt to sync the yielded events to the stream. + /// (Restarts up to maxAttempts times with updated state per attempt, throwing MaxResyncsExhaustedException on failure of final attempt.) + /// 3. Uses render to generate a 'view from the persisted final state + member _.Transact(interpret: 'state -> Async<'event[]>, render: 'state -> 'view, ?load, ?attempts): Async<'view> = Async.call <| fun ct -> + inner.Transact((fun s ct -> Async.StartImmediateAsTask(interpret s, ct)), render, ?load = load, ?attempts = attempts, ct = ct) + + /// 1. Invoke the supplied Async decide function with the present state, holding the 'result + /// 2. (if events yielded) Attempt to sync the yielded events to the stream. + /// (Restarts up to maxAttempts times with updated state per attempt, throwing MaxResyncsExhaustedException on failure of final attempt.) + /// 3. Yield result + member _.Transact(decide: 'state -> Async<'result * 'event[]>, ?load, ?attempts): Async<'result> = Async.call <| fun ct -> + let inline decide' s ct = task { let! r, es = Async.StartImmediateAsTask(decide s, ct) in return struct (r, es) } + inner.Transact(decide', ?load = load, ?attempts = attempts, ct = ct) + + (* Ex, Non Async variants *) + /// 1. Invoke the supplied decide function with the current complete context, holding the 'result /// 2. (if events yielded) Attempt to sync the yielded events to the stream. /// (Restarts up to maxAttempts times with updated state per attempt, throwing MaxResyncsExhaustedException on failure of final attempt.) @@ -63,20 +82,7 @@ type Decider<'event, 'state>(inner: DeciderCore<'event, 'state>) = member _.QueryEx(render: ISyncContext<'state> -> 'view, ?load): Async<'view> = Async.call <| fun ct -> inner.QueryEx(render, ?load = load, ct = ct) - /// 1. Invoke the supplied Async interpret function with the present state - /// 2. (if events yielded) Attempt to sync the yielded events to the stream. - /// (Restarts up to maxAttempts times with updated state per attempt, throwing MaxResyncsExhaustedException on failure of final attempt.) - /// 3. Uses render to generate a 'view from the persisted final state - member _.Transact(interpret: 'state -> Async<'event[]>, render: 'state -> 'view, ?load, ?attempts): Async<'view> = Async.call <| fun ct -> - inner.Transact((fun s ct -> Async.StartImmediateAsTask(interpret s, ct)), render, ?load = load, ?attempts = attempts, ct = ct) - - /// 1. Invoke the supplied Async decide function with the present state, holding the 'result - /// 2. (if events yielded) Attempt to sync the yielded events to the stream. - /// (Restarts up to maxAttempts times with updated state per attempt, throwing MaxResyncsExhaustedException on failure of final attempt.) - /// 3. Yield result - member _.Transact(decide: 'state -> Async<'result * 'event[]>, ?load, ?attempts): Async<'result> = Async.call <| fun ct -> - let inline decide' s ct = task { let! r, es = Async.StartImmediateAsTask(decide s, ct) in return struct (r, es) } - inner.Transact(decide', ?load = load, ?attempts = attempts, ct = ct) + (* Ex, Async variants *) /// 1. Invoke the supplied Async decide function with the current complete context, holding the 'result /// 2. (if events yielded) Attempt to sync the yielded events to the stream. @@ -141,6 +147,30 @@ and [] let inline mapRes r struct (_, s) = mapResult.Invoke(r, s) Stream.transact (stream, LoadPolicy.Fetch load, decide, AttemptsPolicy.Validate attempts, mapRes, defaultArg ct CancellationToken.None) + (* Async variants *) + + /// 1. Invoke the supplied Async interpret function with the present state + /// 2. (if events yielded) Attempt to sync the yielded events to the stream. + /// (Restarts up to maxAttempts times with updated state per attempt, throwing MaxResyncsExhaustedException on failure of final attempt.) + /// 3. Uses render to generate a 'view from the persisted final state + member _.Transact(interpret: Func<'state, CancellationToken, Task<'event[]>>, render: Func<'state, 'view>, + [] ?load, [] ?attempts, [] ?ct): Task<'view> = + let inline decide struct (_token, state) ct = task { let! es = interpret.Invoke(state, ct) in return struct ((), es) } + let inline mapRes () struct (_token, state) = render.Invoke state + Stream.transact (stream, LoadPolicy.Fetch load, decide, AttemptsPolicy.Validate attempts, mapRes, defaultArg ct CancellationToken.None) + + /// 1. Invoke the supplied Async decide function with the present state, holding the 'result + /// 2. (if events yielded) Attempt to sync the yielded events to the stream. + /// (Restarts up to maxAttempts times with updated state per attempt, throwing MaxResyncsExhaustedException on failure of final attempt.) + /// 3. Yield result + member _.Transact(decide: Func<'state, CancellationToken, Task>, + [] ?load, [] ?attempts, [] ?ct): Task<'result> = + let inline decide struct (_token, state) ct = decide.Invoke(state, ct) + let inline mapRes r _ = r + Stream.transact (stream, LoadPolicy.Fetch load, decide, AttemptsPolicy.Validate attempts, mapRes, defaultArg ct CancellationToken.None) + + (* Ex variants *) + /// 1. Invoke the supplied decide function with the current complete context, holding the 'result /// 2. (if events yielded) Attempt to sync the yielded events to the stream. /// (Restarts up to maxAttempts times with updated state per attempt, throwing MaxResyncsExhaustedException on failure of final attempt.) @@ -171,25 +201,7 @@ and [] let render (Context c) = render.Invoke(c) Stream.query (stream, LoadPolicy.Fetch load, render, defaultArg ct CancellationToken.None) - /// 1. Invoke the supplied Async interpret function with the present state - /// 2. (if events yielded) Attempt to sync the yielded events to the stream. - /// (Restarts up to maxAttempts times with updated state per attempt, throwing MaxResyncsExhaustedException on failure of final attempt.) - /// 3. Uses render to generate a 'view from the persisted final state - member _.Transact(interpret: Func<'state, CancellationToken, Task<'event[]>>, render: Func<'state, 'view>, - [] ?load, [] ?attempts, [] ?ct): Task<'view> = - let inline decide struct (_token, state) ct = task { let! es = interpret.Invoke(state, ct) in return struct ((), es) } - let inline mapRes () struct (_token, state) = render.Invoke state - Stream.transact (stream, LoadPolicy.Fetch load, decide, AttemptsPolicy.Validate attempts, mapRes, defaultArg ct CancellationToken.None) - - /// 1. Invoke the supplied Async decide function with the present state, holding the 'result - /// 2. (if events yielded) Attempt to sync the yielded events to the stream. - /// (Restarts up to maxAttempts times with updated state per attempt, throwing MaxResyncsExhaustedException on failure of final attempt.) - /// 3. Yield result - member _.Transact(decide: Func<'state, CancellationToken, Task>, - [] ?load, [] ?attempts, [] ?ct): Task<'result> = - let inline decide struct (_token, state) ct = decide.Invoke(state, ct) - let inline mapRes r _ = r - Stream.transact (stream, LoadPolicy.Fetch load, decide, AttemptsPolicy.Validate attempts, mapRes, defaultArg ct CancellationToken.None) + (* Ex variants, Async *) /// 1. Invoke the supplied Async decide function with the current complete context, holding the 'result /// 2. (if events yielded) Attempt to sync the yielded events to the stream. From 4b80268c934959a72ce2d6f383df1e712daa16c4 Mon Sep 17 00:00:00 2001 From: Ruben Bartelink Date: Tue, 22 Aug 2023 09:41:41 +0100 Subject: [PATCH 3/4] Reorder --- src/Equinox/Decider.fs | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/src/Equinox/Decider.fs b/src/Equinox/Decider.fs index 2d8c80d2f..2eecf1a89 100755 --- a/src/Equinox/Decider.fs +++ b/src/Equinox/Decider.fs @@ -57,7 +57,11 @@ type Decider<'event, 'state>(inner: DeciderCore<'event, 'state>) = let inline decide' s ct = task { let! r, es = Async.StartImmediateAsTask(decide s, ct) in return struct (r, es) } inner.Transact(decide', ?load = load, ?attempts = attempts, ct = ct) - (* Ex, Non Async variants *) + /// Project from the folded 'state, but without executing a decision flow as Transact does + member _.Query(render: 'state -> 'view, ?load): Async<'view> = Async.call <| fun ct -> + inner.Query(render, ?load = load, ct = ct) + + (* Ex variants *) /// 1. Invoke the supplied decide function with the current complete context, holding the 'result /// 2. (if events yielded) Attempt to sync the yielded events to the stream. @@ -74,15 +78,11 @@ type Decider<'event, 'state>(inner: DeciderCore<'event, 'state>) = ?load, ?attempts): Async<'view> = Async.call <| fun ct -> inner.TransactEx(decide >> ValueTuple.Create, mapResult, ?load = load, ?attempts = attempts, ct = ct) - /// Project from the folded 'state, but without executing a decision flow as Transact does - member _.Query(render: 'state -> 'view, ?load): Async<'view> = Async.call <| fun ct -> - inner.Query(render, ?load = load, ct = ct) - /// Project from the stream's complete context, but without executing a decision flow as TransactEx does member _.QueryEx(render: ISyncContext<'state> -> 'view, ?load): Async<'view> = Async.call <| fun ct -> inner.QueryEx(render, ?load = load, ct = ct) - (* Ex, Async variants *) + (* Ex Async variants *) /// 1. Invoke the supplied Async decide function with the current complete context, holding the 'result /// 2. (if events yielded) Attempt to sync the yielded events to the stream. @@ -147,6 +147,11 @@ and [] let inline mapRes r struct (_, s) = mapResult.Invoke(r, s) Stream.transact (stream, LoadPolicy.Fetch load, decide, AttemptsPolicy.Validate attempts, mapRes, defaultArg ct CancellationToken.None) + /// Project from the folded 'state, but without executing a decision flow as Transact does + member _.Query(render: Func<'state, 'view>, [] ?load, [] ?ct): Task<'view> = + let render struct (_token, state) = render.Invoke(state) + Stream.query (stream, LoadPolicy.Fetch load, render, defaultArg ct CancellationToken.None) + (* Async variants *) /// 1. Invoke the supplied Async interpret function with the present state @@ -191,17 +196,12 @@ and [] let inline mapRes r (Context c) = mapResult.Invoke(r, c) Stream.transact (stream, LoadPolicy.Fetch load, decide, AttemptsPolicy.Validate attempts, mapRes, defaultArg ct CancellationToken.None) - /// Project from the folded 'state, but without executing a decision flow as Transact does - member _.Query(render: Func<'state, 'view>, [] ?load, [] ?ct): Task<'view> = - let render struct (_token, state) = render.Invoke(state) - Stream.query (stream, LoadPolicy.Fetch load, render, defaultArg ct CancellationToken.None) - /// Project from the stream's complete context, but without executing a decision flow as TransactEx does member _.QueryEx(render: Func, 'view>, [] ?load, [] ?ct): Task<'view> = let render (Context c) = render.Invoke(c) Stream.query (stream, LoadPolicy.Fetch load, render, defaultArg ct CancellationToken.None) - (* Ex variants, Async *) + (* Ex Async variants *) /// 1. Invoke the supplied Async decide function with the current complete context, holding the 'result /// 2. (if events yielded) Attempt to sync the yielded events to the stream. From 1d642c5bebeb5068b6d10efbea763e574e9d5392 Mon Sep 17 00:00:00 2001 From: Ruben Bartelink Date: Tue, 22 Aug 2023 10:01:15 +0100 Subject: [PATCH 4/4] Tidy --- src/Equinox/Decider.fs | 89 +++++++++++++++++++----------------------- 1 file changed, 41 insertions(+), 48 deletions(-) diff --git a/src/Equinox/Decider.fs b/src/Equinox/Decider.fs index 2eecf1a89..c45c20a9f 100755 --- a/src/Equinox/Decider.fs +++ b/src/Equinox/Decider.fs @@ -40,29 +40,10 @@ type Decider<'event, 'state>(inner: DeciderCore<'event, 'state>) = member _.Transact(decide: 'state -> 'result * 'event[], mapResult: 'result -> 'state -> 'view, ?load, ?attempts): Async<'view> = Async.call <| fun ct -> inner.Transact(decide >> ValueTuple.Create, mapResult, ?load = load, ?attempts = attempts, ct = ct) - (* Async variants *) - - /// 1. Invoke the supplied Async interpret function with the present state - /// 2. (if events yielded) Attempt to sync the yielded events to the stream. - /// (Restarts up to maxAttempts times with updated state per attempt, throwing MaxResyncsExhaustedException on failure of final attempt.) - /// 3. Uses render to generate a 'view from the persisted final state - member _.Transact(interpret: 'state -> Async<'event[]>, render: 'state -> 'view, ?load, ?attempts): Async<'view> = Async.call <| fun ct -> - inner.Transact((fun s ct -> Async.StartImmediateAsTask(interpret s, ct)), render, ?load = load, ?attempts = attempts, ct = ct) - - /// 1. Invoke the supplied Async decide function with the present state, holding the 'result - /// 2. (if events yielded) Attempt to sync the yielded events to the stream. - /// (Restarts up to maxAttempts times with updated state per attempt, throwing MaxResyncsExhaustedException on failure of final attempt.) - /// 3. Yield result - member _.Transact(decide: 'state -> Async<'result * 'event[]>, ?load, ?attempts): Async<'result> = Async.call <| fun ct -> - let inline decide' s ct = task { let! r, es = Async.StartImmediateAsTask(decide s, ct) in return struct (r, es) } - inner.Transact(decide', ?load = load, ?attempts = attempts, ct = ct) - /// Project from the folded 'state, but without executing a decision flow as Transact does member _.Query(render: 'state -> 'view, ?load): Async<'view> = Async.call <| fun ct -> inner.Query(render, ?load = load, ct = ct) - (* Ex variants *) - /// 1. Invoke the supplied decide function with the current complete context, holding the 'result /// 2. (if events yielded) Attempt to sync the yielded events to the stream. /// (Restarts up to maxAttempts times with updated state per attempt, throwing MaxResyncsExhaustedException on failure of final attempt.) @@ -82,7 +63,22 @@ type Decider<'event, 'state>(inner: DeciderCore<'event, 'state>) = member _.QueryEx(render: ISyncContext<'state> -> 'view, ?load): Async<'view> = Async.call <| fun ct -> inner.QueryEx(render, ?load = load, ct = ct) - (* Ex Async variants *) + (* Async variants *) + + /// 1. Invoke the supplied Async interpret function with the present state + /// 2. (if events yielded) Attempt to sync the yielded events to the stream. + /// (Restarts up to maxAttempts times with updated state per attempt, throwing MaxResyncsExhaustedException on failure of final attempt.) + /// 3. Uses render to generate a 'view from the persisted final state + member _.Transact(interpret: 'state -> Async<'event[]>, render: 'state -> 'view, ?load, ?attempts): Async<'view> = Async.call <| fun ct -> + inner.Transact((fun s ct -> Async.StartImmediateAsTask(interpret s, ct)), render, ?load = load, ?attempts = attempts, ct = ct) + + /// 1. Invoke the supplied Async decide function with the present state, holding the 'result + /// 2. (if events yielded) Attempt to sync the yielded events to the stream. + /// (Restarts up to maxAttempts times with updated state per attempt, throwing MaxResyncsExhaustedException on failure of final attempt.) + /// 3. Yield result + member _.Transact(decide: 'state -> Async<'result * 'event[]>, ?load, ?attempts): Async<'result> = Async.call <| fun ct -> + let inline decide' s ct = task { let! r, es = Async.StartImmediateAsTask(decide s, ct) in return struct (r, es) } + inner.Transact(decide', ?load = load, ?attempts = attempts, ct = ct) /// 1. Invoke the supplied Async decide function with the current complete context, holding the 'result /// 2. (if events yielded) Attempt to sync the yielded events to the stream. @@ -112,7 +108,7 @@ and [] /// 2. (if events yielded) Attempt to sync the yielded events to the stream. /// (Restarts up to maxAttempts times with updated state per attempt, throwing MaxResyncsExhaustedException on failure of final attempt.) member _.Transact(interpret: Func<'state, 'event[]>, - [] ?load, [] ?attempts, []?ct): Task = + [] ?load, [] ?attempts, []?ct): Task = let inline decide struct (_t: StreamToken, state) _ct = Task.FromResult struct ((), interpret.Invoke state) let inline mapRes () struct (_t: StreamToken, _s: 'state) = () Stream.transact (stream, LoadPolicy.Fetch load, decide, AttemptsPolicy.Validate attempts, mapRes, defaultArg ct CancellationToken.None) @@ -122,7 +118,7 @@ and [] /// (Restarts up to maxAttempts times with updated state per attempt, throwing MaxResyncsExhaustedException on failure of final attempt.) /// 3. Uses render to generate a 'view from the persisted final state member _.Transact(interpret: Func<'state, 'event[]>, render: Func<'state, 'view>, - [] ?load, [] ?attempts, []?ct): Task<'view> = + [] ?load, [] ?attempts, []?ct): Task<'view> = let inline decide struct (_token, state) _ct = Task.FromResult struct ((), interpret.Invoke state) let inline mapRes () struct (_token, state) = render.Invoke state Stream.transact (stream, LoadPolicy.Fetch load, decide, AttemptsPolicy.Validate attempts, mapRes, defaultArg ct CancellationToken.None) @@ -148,34 +144,11 @@ and [] Stream.transact (stream, LoadPolicy.Fetch load, decide, AttemptsPolicy.Validate attempts, mapRes, defaultArg ct CancellationToken.None) /// Project from the folded 'state, but without executing a decision flow as Transact does - member _.Query(render: Func<'state, 'view>, [] ?load, [] ?ct): Task<'view> = + member _.Query(render: Func<'state, 'view>, + [] ?load, [] ?ct): Task<'view> = let render struct (_token, state) = render.Invoke(state) Stream.query (stream, LoadPolicy.Fetch load, render, defaultArg ct CancellationToken.None) - (* Async variants *) - - /// 1. Invoke the supplied Async interpret function with the present state - /// 2. (if events yielded) Attempt to sync the yielded events to the stream. - /// (Restarts up to maxAttempts times with updated state per attempt, throwing MaxResyncsExhaustedException on failure of final attempt.) - /// 3. Uses render to generate a 'view from the persisted final state - member _.Transact(interpret: Func<'state, CancellationToken, Task<'event[]>>, render: Func<'state, 'view>, - [] ?load, [] ?attempts, [] ?ct): Task<'view> = - let inline decide struct (_token, state) ct = task { let! es = interpret.Invoke(state, ct) in return struct ((), es) } - let inline mapRes () struct (_token, state) = render.Invoke state - Stream.transact (stream, LoadPolicy.Fetch load, decide, AttemptsPolicy.Validate attempts, mapRes, defaultArg ct CancellationToken.None) - - /// 1. Invoke the supplied Async decide function with the present state, holding the 'result - /// 2. (if events yielded) Attempt to sync the yielded events to the stream. - /// (Restarts up to maxAttempts times with updated state per attempt, throwing MaxResyncsExhaustedException on failure of final attempt.) - /// 3. Yield result - member _.Transact(decide: Func<'state, CancellationToken, Task>, - [] ?load, [] ?attempts, [] ?ct): Task<'result> = - let inline decide struct (_token, state) ct = decide.Invoke(state, ct) - let inline mapRes r _ = r - Stream.transact (stream, LoadPolicy.Fetch load, decide, AttemptsPolicy.Validate attempts, mapRes, defaultArg ct CancellationToken.None) - - (* Ex variants *) - /// 1. Invoke the supplied decide function with the current complete context, holding the 'result /// 2. (if events yielded) Attempt to sync the yielded events to the stream. /// (Restarts up to maxAttempts times with updated state per attempt, throwing MaxResyncsExhaustedException on failure of final attempt.) @@ -201,7 +174,27 @@ and [] let render (Context c) = render.Invoke(c) Stream.query (stream, LoadPolicy.Fetch load, render, defaultArg ct CancellationToken.None) - (* Ex Async variants *) + (* Async variants *) + + /// 1. Invoke the supplied Async interpret function with the present state + /// 2. (if events yielded) Attempt to sync the yielded events to the stream. + /// (Restarts up to maxAttempts times with updated state per attempt, throwing MaxResyncsExhaustedException on failure of final attempt.) + /// 3. Uses render to generate a 'view from the persisted final state + member _.Transact(interpret: Func<'state, CancellationToken, Task<'event[]>>, render: Func<'state, 'view>, + [] ?load, [] ?attempts, [] ?ct): Task<'view> = + let inline decide struct (_token, state) ct = task { let! es = interpret.Invoke(state, ct) in return struct ((), es) } + let inline mapRes () struct (_token, state) = render.Invoke state + Stream.transact (stream, LoadPolicy.Fetch load, decide, AttemptsPolicy.Validate attempts, mapRes, defaultArg ct CancellationToken.None) + + /// 1. Invoke the supplied Async decide function with the present state, holding the 'result + /// 2. (if events yielded) Attempt to sync the yielded events to the stream. + /// (Restarts up to maxAttempts times with updated state per attempt, throwing MaxResyncsExhaustedException on failure of final attempt.) + /// 3. Yield result + member _.Transact(decide: Func<'state, CancellationToken, Task>, + [] ?load, [] ?attempts, [] ?ct): Task<'result> = + let inline decide struct (_token, state) ct = decide.Invoke(state, ct) + let inline mapRes r _ = r + Stream.transact (stream, LoadPolicy.Fetch load, decide, AttemptsPolicy.Validate attempts, mapRes, defaultArg ct CancellationToken.None) /// 1. Invoke the supplied Async decide function with the current complete context, holding the 'result /// 2. (if events yielded) Attempt to sync the yielded events to the stream.