Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix!: Remove Async from TransactAsync, TransactExAsync #422

Merged
merged 4 commits into from
Aug 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ The `Unreleased` section name is replaced by the expected version of next releas

### Added

- `Equinox`: `Decider.Transact`, `TransactAsync`, `TransactExAsync` overloads [#325](https://github.com/jet/equinox/pull/325)
- `Equinox`: `Decider.Transact`, `TransactEx` overloads [#325](https://github.com/jet/equinox/pull/325)
- `Equinox.LoadOption.RequireLeader`: support for requesting a consistent read of a stream [#341](https://github.com/jet/equinox/pull/341)
- `Equinox.LoadOption.AllowStale`: Read mode that limits reads to a maximum of one retrieval per the defined time window [#386](https://github.com/jet/equinox/pull/386)
- `Equinox.Category` base class, with `Decider` and `Stream` helper `module`s [#337](https://github.com/jet/equinox/pull/337)
Expand Down Expand Up @@ -40,6 +40,7 @@ The `Unreleased` section name is replaced by the expected version of next releas
- `Equinox.LoadOption`: Rename `AllowStale` to `AnyCachedValue` [#386](https://github.com/jet/equinox/pull/386)
- `Equinox.Decider`: Replace `'event list` with `'event[]` [#411](https://github.com/jet/equinox/pull/411)
- `Equinox.Decider`: Replace `maxAttempts` with a default policy and an optional argument on `Transact*` APIs [#337](https://github.com/jet/equinox/pull/337)
- `Equinox.Decider`: rename `Decider.TransactAsync`, `Decider.TransactExAsync` to `Transact` [#314](https://github.com/jet/equinox/pull/314)
- `Equinox.Core.AsyncBatchingGate`: renamed to `Batching.Batcher` [#390](https://github.com/jet/equinox/pull/390)
- `Equinox.Core`: Now a free-standing library that a) does not depend on `Equinox` b) is not depended on by the Stores (though `CosmosStore` inlines `AsyncCacheCell`) [#420](https://github.com/jet/equinox/pull/420)
- Stores: Change Event Body types, requiring `FsCodec` v `3.0.0`, with [`EventBody` types switching from `byte[]` to `ReadOnlyMemory<byte>` and/or `JsonElement` see FsCodec#75](https://github.com/jet/FsCodec/pull/75) [#323](https://github.com/jet/equinox/pull/323)
Expand Down
2 changes: 1 addition & 1 deletion DOCUMENTATION.md
Original file line number Diff line number Diff line change
Expand Up @@ -1318,7 +1318,7 @@ type Service internal (resolve: CartId -> Equinox.Decider<Events.Event, Fold.Sta
member _.Run(cartId, optimistic, commands: Command seq, ?prepare): Async<Fold.State> =
let decider = resolve cartId
let opt = if optimistic then Equinox.AnyCachedValue else Equinox.RequireLoad
decider.TransactAsync(fun state -> async {
decider.Transact(fun state -> async {
match prepare with None -> () | Some prep -> do! prep
return interpretMany Fold.fold (Seq.map interpret commands) state }, opt)
```
Expand Down
2 changes: 1 addition & 1 deletion samples/Store/Domain/Cart.fs
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ type Service internal (resolve: CartId -> Equinox.Decider<Events.Event, Fold.Sta
#endif
let decider = resolve cartId
let opt = if optimistic then Equinox.LoadOption.AnyCachedValue else Equinox.LoadOption.RequireLoad
decider.TransactAsync(interpret, opt)
decider.Transact(interpret, opt)

member x.ExecuteManyAsync(cartId, optimistic, commands: Command seq, ?prepare): Async<unit> =
x.Run(cartId, optimistic, commands, ?prepare = prepare) |> Async.Ignore
Expand Down
3 changes: 2 additions & 1 deletion samples/Store/Domain/Favorites.fs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,8 @@ type Service internal (resolve: ClientId -> Equinox.Decider<Events.Event, Fold.S
// NOTE not a real world example - used for an integration test; TODO get a better example where it's actually relevant
member _.UnfavoriteWithPostVersion(clientId, sku) =
let decider = resolve clientId
decider.TransactEx((fun c -> (), decideUnfavorite sku c.State), fun () c -> c.Version)
let mapResult () (c: Equinox.ISyncContext<_>) = c.Version
decider.TransactEx((fun c -> (), decideUnfavorite sku c.State), mapResult)

let create resolve =
Service(Stream.id >> resolve)
2 changes: 1 addition & 1 deletion samples/Store/Domain/SavedForLater.fs
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ type Service internal (resolve: ClientId -> Equinox.Decider<Events.Event, Fold.S

let remove clientId (resolveCommand: (SkuId->bool) -> Async<Command>): Async<unit> =
let decider = resolve clientId
decider.TransactAsync(fun (state: Fold.State) -> async {
decider.Transact(fun (state: Fold.State) -> async {
let contents = seq { for item in state -> item.skuId } |> set
let! cmd = resolveCommand contents.Contains
let _, events = decide maxSavedItems cmd state
Expand Down
61 changes: 33 additions & 28 deletions src/Equinox/Decider.fs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ type Decider<'event, 'state>(inner: DeciderCore<'event, 'state>) =
member _.Transact(decide: 'state -> 'result * 'event[], mapResult: 'result -> 'state -> 'view, ?load, ?attempts): Async<'view> = Async.call <| fun ct ->
inner.Transact(decide >> ValueTuple.Create, mapResult, ?load = load, ?attempts = attempts, ct = ct)

/// Project from the folded <c>'state</c>, but without executing a decision flow as <c>Transact</c> does
member _.Query(render: 'state -> 'view, ?load): Async<'view> = Async.call <| fun ct ->
inner.Query(render, ?load = load, ct = ct)

/// 1. Invoke the supplied <c>decide</c> function with the current complete context, holding the <c>'result</c>
/// 2. (if events yielded) Attempt to sync the yielded events to the stream.
/// (Restarts up to <c>maxAttempts</c> times with updated state per attempt, throwing <c>MaxResyncsExhaustedException</c> on failure of final attempt.)
Expand All @@ -55,45 +59,43 @@ type Decider<'event, 'state>(inner: DeciderCore<'event, 'state>) =
?load, ?attempts): Async<'view> = Async.call <| fun ct ->
inner.TransactEx(decide >> ValueTuple.Create, mapResult, ?load = load, ?attempts = attempts, ct = ct)

/// Project from the folded <c>'state</c>, but without executing a decision flow as <c>Transact</c> does
member _.Query(render: 'state -> 'view, ?load): Async<'view> = Async.call <| fun ct ->
inner.Query(render, ?load = load, ct = ct)

/// Project from the stream's complete context, but without executing a decision flow as <c>TransactEx<c> does
member _.QueryEx(render: ISyncContext<'state> -> 'view, ?load): Async<'view> = Async.call <| fun ct ->
inner.QueryEx(render, ?load = load, ct = ct)

(* Async variants *)

/// 1. Invoke the supplied <c>Async</c> <c>interpret</c> function with the present state
/// 2. (if events yielded) Attempt to sync the yielded events to the stream.
/// (Restarts up to <c>maxAttempts</c> times with updated state per attempt, throwing <c>MaxResyncsExhaustedException</c> on failure of final attempt.)
/// 3. Uses <c>render</c> to generate a 'view from the persisted final state
member _.TransactAsync(interpret: 'state -> Async<'event[]>, render: 'state -> 'view, ?load, ?attempts): Async<'view> = Async.call <| fun ct ->
inner.TransactAsync((fun s ct -> Async.StartImmediateAsTask(interpret s, ct)), render, ?load = load, ?attempts = attempts, ct = ct)
member _.Transact(interpret: 'state -> Async<'event[]>, render: 'state -> 'view, ?load, ?attempts): Async<'view> = Async.call <| fun ct ->
inner.Transact((fun s ct -> Async.StartImmediateAsTask(interpret s, ct)), render, ?load = load, ?attempts = attempts, ct = ct)

/// 1. Invoke the supplied <c>Async</c> <c>decide</c> function with the present state, holding the <c>'result</c>
/// 2. (if events yielded) Attempt to sync the yielded events to the stream.
/// (Restarts up to <c>maxAttempts</c> times with updated state per attempt, throwing <c>MaxResyncsExhaustedException</c> on failure of final attempt.)
/// 3. Yield result
member _.TransactAsync(decide: 'state -> Async<'result * 'event[]>, ?load, ?attempts): Async<'result> = Async.call <| fun ct ->
member _.Transact(decide: 'state -> Async<'result * 'event[]>, ?load, ?attempts): Async<'result> = Async.call <| fun ct ->
let inline decide' s ct = task { let! r, es = Async.StartImmediateAsTask(decide s, ct) in return struct (r, es) }
inner.TransactAsync(decide', ?load = load, ?attempts = attempts, ct = ct)
inner.Transact(decide', ?load = load, ?attempts = attempts, ct = ct)

/// 1. Invoke the supplied <c>Async</c> <c>decide</c> function with the current complete context, holding the <c>'result</c>
/// 2. (if events yielded) Attempt to sync the yielded events to the stream.
/// (Restarts up to <c>maxAttempts</c> times with updated state per attempt, throwing <c>MaxResyncsExhaustedException</c> on failure of final attempt.)
/// 3. Yield result
member _.TransactExAsync(decide: ISyncContext<'state> -> Async<'result * 'event[]>, ?load, ?attempts): Async<'result> = Async.call <| fun ct ->
member _.TransactEx(decide: ISyncContext<'state> -> Async<'result * 'event[]>, ?load, ?attempts): Async<'result> = Async.call <| fun ct ->
let decide' c ct = task { let! r, es = Async.StartImmediateAsTask(decide c, ct) in return struct (r, es) }
inner.TransactExAsync(decide', ?load = load, ?attempts = attempts, ct = ct)
inner.TransactEx(decide', ?load = load, ?attempts = attempts, ct = ct)

/// 1. Invoke the supplied <c>Async</c> <c>decide</c> function with the current complete context, holding the <c>'result</c>
/// 2. (if events yielded) Attempt to sync the yielded events to the stream.
/// (Restarts up to <c>maxAttempts</c> times with updated state per attempt, throwing <c>MaxResyncsExhaustedException</c> on failure of final attempt.)
/// 3. Yields a final 'view produced by <c>mapResult</c> from the <c>'result</c> and/or the final persisted <c>ISyncContext</c>
member _.TransactExAsync(decide: ISyncContext<'state> -> Async<'result * 'event[]>, mapResult: 'result -> ISyncContext<'state> -> 'view,
?load, ?attempts): Async<'view> = Async.call <| fun ct ->
member _.TransactEx(decide: ISyncContext<'state> -> Async<'result * 'event[]>, mapResult: 'result -> ISyncContext<'state> -> 'view,
?load, ?attempts): Async<'view> = Async.call <| fun ct ->
let inline decide' c ct = task { let! r, es = Async.StartImmediateAsTask(decide c, ct) in return struct (r, es) }
inner.TransactExAsync(decide', mapResult, ?load = load, ?attempts = attempts, ct = ct)
inner.TransactEx(decide', mapResult, ?load = load, ?attempts = attempts, ct = ct)

/// Central Application-facing API. Wraps the handling of decision or query flows in a manner that is store agnostic
/// For F#, the async and FSharpFunc signatures in Decider tend to work better, but the API set is equivalent
Expand All @@ -106,7 +108,7 @@ and [<NoComparison; NoEquality>]
/// 2. (if events yielded) Attempt to sync the yielded events to the stream.
/// (Restarts up to <c>maxAttempts</c> times with updated state per attempt, throwing <c>MaxResyncsExhaustedException</c> on failure of final attempt.)
member _.Transact(interpret: Func<'state, 'event[]>,
[<O; D null>] ?load, [<O; D null>] ?attempts, [<O; D(CancellationToken())>]?ct): Task<unit> =
[<O; D null>] ?load, [<O; D null>] ?attempts, [<O; D null>]?ct): Task<unit> =
let inline decide struct (_t: StreamToken, state) _ct = Task.FromResult struct ((), interpret.Invoke state)
let inline mapRes () struct (_t: StreamToken, _s: 'state) = ()
Stream.transact (stream, LoadPolicy.Fetch load, decide, AttemptsPolicy.Validate attempts, mapRes, defaultArg ct CancellationToken.None)
Expand All @@ -116,7 +118,7 @@ and [<NoComparison; NoEquality>]
/// (Restarts up to <c>maxAttempts</c> times with updated state per attempt, throwing <c>MaxResyncsExhaustedException</c> on failure of final attempt.)
/// 3. Uses <c>render</c> to generate a 'view from the persisted final state
member _.Transact(interpret: Func<'state, 'event[]>, render: Func<'state, 'view>,
[<O; D null>] ?load, [<O; D null>] ?attempts, [<O; D(CancellationToken())>]?ct): Task<'view> =
[<O; D null>] ?load, [<O; D null>] ?attempts, [<O; D null>]?ct): Task<'view> =
let inline decide struct (_token, state) _ct = Task.FromResult struct ((), interpret.Invoke state)
let inline mapRes () struct (_token, state) = render.Invoke state
Stream.transact (stream, LoadPolicy.Fetch load, decide, AttemptsPolicy.Validate attempts, mapRes, defaultArg ct CancellationToken.None)
Expand All @@ -141,6 +143,12 @@ and [<NoComparison; NoEquality>]
let inline mapRes r struct (_, s) = mapResult.Invoke(r, s)
Stream.transact (stream, LoadPolicy.Fetch load, decide, AttemptsPolicy.Validate attempts, mapRes, defaultArg ct CancellationToken.None)

/// Project from the folded <c>'state</c>, but without executing a decision flow as <c>Transact</c> does
member _.Query(render: Func<'state, 'view>,
[<O; D null>] ?load, [<O; D null>] ?ct): Task<'view> =
let render struct (_token, state) = render.Invoke(state)
Stream.query (stream, LoadPolicy.Fetch load, render, defaultArg ct CancellationToken.None)

/// 1. Invoke the supplied <c>decide</c> function with the current complete context, holding the <c>'result</c>
/// 2. (if events yielded) Attempt to sync the yielded events to the stream.
/// (Restarts up to <c>maxAttempts</c> times with updated state per attempt, throwing <c>MaxResyncsExhaustedException</c> on failure of final attempt.)
Expand All @@ -161,22 +169,19 @@ and [<NoComparison; NoEquality>]
let inline mapRes r (Context c) = mapResult.Invoke(r, c)
Stream.transact (stream, LoadPolicy.Fetch load, decide, AttemptsPolicy.Validate attempts, mapRes, defaultArg ct CancellationToken.None)

/// Project from the folded <c>'state</c>, but without executing a decision flow as <c>Transact</c> does
member _.Query(render: Func<'state, 'view>, [<O; D null>] ?load, [<O; D null>] ?ct): Task<'view> =
let render struct (_token, state) = render.Invoke(state)
Stream.query (stream, LoadPolicy.Fetch load, render, defaultArg ct CancellationToken.None)

/// Project from the stream's complete context, but without executing a decision flow as <c>TransactEx<c> does
member _.QueryEx(render: Func<ISyncContext<'state>, 'view>, [<O; D null>] ?load, [<O; D null>] ?ct): Task<'view> =
let render (Context c) = render.Invoke(c)
Stream.query (stream, LoadPolicy.Fetch load, render, defaultArg ct CancellationToken.None)

(* Async variants *)

/// 1. Invoke the supplied <c>Async</c> <c>interpret</c> function with the present state
/// 2. (if events yielded) Attempt to sync the yielded events to the stream.
/// (Restarts up to <c>maxAttempts</c> times with updated state per attempt, throwing <c>MaxResyncsExhaustedException</c> on failure of final attempt.)
/// 3. Uses <c>render</c> to generate a 'view from the persisted final state
member _.TransactAsync(interpret: Func<'state, CancellationToken, Task<'event[]>>, render: Func<'state, 'view>,
[<O; D null>] ?load, [<O; D null>] ?attempts, [<O; D null>] ?ct): Task<'view> =
member _.Transact(interpret: Func<'state, CancellationToken, Task<'event[]>>, render: Func<'state, 'view>,
[<O; D null>] ?load, [<O; D null>] ?attempts, [<O; D null>] ?ct): Task<'view> =
let inline decide struct (_token, state) ct = task { let! es = interpret.Invoke(state, ct) in return struct ((), es) }
let inline mapRes () struct (_token, state) = render.Invoke state
Stream.transact (stream, LoadPolicy.Fetch load, decide, AttemptsPolicy.Validate attempts, mapRes, defaultArg ct CancellationToken.None)
Expand All @@ -185,8 +190,8 @@ and [<NoComparison; NoEquality>]
/// 2. (if events yielded) Attempt to sync the yielded events to the stream.
/// (Restarts up to <c>maxAttempts</c> times with updated state per attempt, throwing <c>MaxResyncsExhaustedException</c> on failure of final attempt.)
/// 3. Yield result
member _.TransactAsync(decide: Func<'state, CancellationToken, Task<struct ('result * 'event[])>>,
[<O; D null>] ?load, [<O; D null>] ?attempts, [<O; D null>] ?ct): Task<'result> =
member _.Transact(decide: Func<'state, CancellationToken, Task<struct ('result * 'event[])>>,
[<O; D null>] ?load, [<O; D null>] ?attempts, [<O; D null>] ?ct): Task<'result> =
let inline decide struct (_token, state) ct = decide.Invoke(state, ct)
let inline mapRes r _ = r
Stream.transact (stream, LoadPolicy.Fetch load, decide, AttemptsPolicy.Validate attempts, mapRes, defaultArg ct CancellationToken.None)
Expand All @@ -195,8 +200,8 @@ and [<NoComparison; NoEquality>]
/// 2. (if events yielded) Attempt to sync the yielded events to the stream.
/// (Restarts up to <c>maxAttempts</c> times with updated state per attempt, throwing <c>MaxResyncsExhaustedException</c> on failure of final attempt.)
/// 3. Yield result
member _.TransactExAsync(decide: Func<ISyncContext<'state>, CancellationToken, Task<struct ('result * 'event[])>>,
[<O; D null>] ?load, [<O; D null>] ?attempts, [<O; D null>] ?ct): Task<'result> =
member _.TransactEx(decide: Func<ISyncContext<'state>, CancellationToken, Task<struct ('result * 'event[])>>,
[<O; D null>] ?load, [<O; D null>] ?attempts, [<O; D null>] ?ct): Task<'result> =
let inline decide (Context c) ct = decide.Invoke(c, ct)
let inline mapRes r _ = r
Stream.transact (stream, LoadPolicy.Fetch load, decide, AttemptsPolicy.Validate attempts, mapRes, defaultArg ct CancellationToken.None)
Expand All @@ -205,8 +210,8 @@ and [<NoComparison; NoEquality>]
/// 2. (if events yielded) Attempt to sync the yielded events to the stream.
/// (Restarts up to <c>maxAttempts</c> times with updated state per attempt, throwing <c>MaxResyncsExhaustedException</c> on failure of final attempt.)
/// 3. Yields a final 'view produced by <c>mapResult</c> from the <c>'result</c> and/or the final persisted <c>ISyncContext</c>
member _.TransactExAsync(decide: Func<ISyncContext<'state>, CancellationToken, Task<struct ('result * 'event[])>>, mapResult: Func<'result, ISyncContext<'state>, 'view>,
[<O; D null>] ?load, [<O; D null>] ?attempts, [<O; D null>] ?ct): Task<'view> =
member _.TransactEx(decide: Func<ISyncContext<'state>, CancellationToken, Task<struct ('result * 'event[])>>, mapResult: Func<'result, ISyncContext<'state>, 'view>,
[<O; D null>] ?load, [<O; D null>] ?attempts, [<O; D null>] ?ct): Task<'view> =
let inline decide (Context c) ct = decide.Invoke(c, ct)
let inline mapRes r (Context c) = mapResult.Invoke(r, c)
Stream.transact (stream, LoadPolicy.Fetch load, decide, AttemptsPolicy.Validate attempts, mapRes, defaultArg ct CancellationToken.None)
Expand Down
6 changes: 3 additions & 3 deletions tests/Equinox.EventStoreDb.Integration/StoreIntegration.fs
Original file line number Diff line number Diff line change
Expand Up @@ -424,9 +424,9 @@ type GeneralTests(testOutputHelper) =
let id = Guid.NewGuid()
let decider = SimplestThing.decider log context id

let! before, after = decider.TransactEx(
(fun state -> state.Version, [| SimplestThing.StuffHappened |]),
mapResult = (fun result ctx-> result, ctx.Version))
let! before, after =
let mapResult result (ctx: Equinox.ISyncContext<_>) = result, ctx.Version
decider.TransactEx((fun state -> state.Version, [| SimplestThing.StuffHappened |]), mapResult)
test <@ [before; after] = [0L; 1L] @> }

#if STORE_MESSAGEDB
Expand Down