diff --git a/CHANGELOG.md b/CHANGELOG.md index ecaabad3f..3c16bb5f8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,9 @@ The `Unreleased` section name is replaced by the expected version of next releas - `CosmosStore`: Require `Microsoft.Azure.Cosmos` v `3.0.25` [#310](https://github.com/jet/equinox/pull/310) - `SqlStreamStore`.*: Target `SqlStreamStore` v `1.2.0` (`Postgres` remains at `1.2.0-beta.8` as that's the last released version) [#227](https://github.com/jet/equinox/pull/227) :pray: [@rajivhost](https://github.com/rajivhost) - Update all non-Client dependencies except `FSharp.Core`, `FSharp.Control.AsyncSeq` [#310](https://github.com/jet/equinox/pull/310) +- `CosmosStore`: Switch to natively using `JsonElement` event bodies [#305](https://github.com/jet/equinox/pull/305) :pray: [@ylibrach](https://github.com/ylibrach) +- `CosmosStore`: Switch to natively using `System.Text.Json` for serialization of all `Microsoft.Azure.Cosmos` round-trips [#305](https://github.com/jet/equinox/pull/305) :pray: [@ylibrach](https://github.com/ylibrach) +- `CosmosStore`: Only log `bytes` when log level is `Debug` [#305](https://github.com/jet/equinox/pull/305) ### Removed diff --git a/src/Equinox.CosmosStore/CosmosStore.fs b/src/Equinox.CosmosStore/CosmosStore.fs index 8163fe7ea..a6623635a 100644 --- a/src/Equinox.CosmosStore/CosmosStore.fs +++ b/src/Equinox.CosmosStore/CosmosStore.fs @@ -230,10 +230,10 @@ module Log = let internal event (value : Metric) (log : ILogger) = let enrich (e : Serilog.Events.LogEvent) = e.AddPropertyIfAbsent(Serilog.Events.LogEventProperty(PropertyTag, Serilog.Events.ScalarValue(value))) - log.ForContext({ new Serilog.Core.ILogEventEnricher with member _.Enrich(evt, _) = enrich evt }) + log.ForContext({ new Serilog.Core.ILogEventEnricher with member _.Enrich(evt,_) = enrich evt }) let internal (|BlobLen|) (x : EventBody) = if x.ValueKind = JsonValueKind.Null then 0 else x.GetRawText().Length - let internal (|EventLen|) (x : #IEventData<_>) = let BlobLen bytes, BlobLen metaBytes = x.Data, x.Meta in bytes + metaBytes + 80 - let internal (|BatchLen|) = Seq.sumBy (|EventLen|) + let internal eventLen (x: #IEventData<_>) = let BlobLen bytes, BlobLen metaBytes = x.Data, x.Meta in bytes + metaBytes + 80 + let internal batchLen = Seq.sumBy eventLen let internal (|SerilogScalar|_|) : Serilog.Events.LogEventPropertyValue -> obj option = function | :? Serilog.Events.ScalarValue as x -> Some x.Value | _ -> None @@ -479,12 +479,12 @@ module internal Sync = let private logged (container, stream) (maxEventsInTip, maxStringifyLen) (exp : SyncExp, req : Tip) (log : ILogger) : Async = async { let! t, (ru, result) = run (container, stream) (maxEventsInTip, maxStringifyLen) (exp, req) |> Stopwatch.Time - let Log.BatchLen bytes, count = Enum.Events req, req.e.Length + let verbose = log.IsEnabled Serilog.Events.LogEventLevel.Debug + let count, bytes = req.e.Length, if verbose then Enum.Events req |> Log.batchLen else 0 let log = let inline mkMetric ru : Log.Measurement = { database = container.Database.Id; container = container.Id; stream = stream; interval = t; bytes = bytes; count = count; ru = ru } let inline propConflict log = log |> Log.prop "conflict" true |> Log.prop "eventTypes" (Seq.truncate 5 (seq { for x in req.e -> x.c })) - let verbose = log.IsEnabled Events.LogEventLevel.Debug if verbose then log |> Log.propEvents (Enum.Events req) |> Log.propDataUnfolds req.u else log |> match exp with | SyncExp.Etag et -> Log.prop "expectedEtag" et @@ -613,7 +613,7 @@ module internal Tip = (log 0 0 Log.Metric.TipNotFound).Information("EqxCosmos {action:l} {stream} {res} {ms}ms rc={ru}", "Tip", stream, 404, (let e = t.Elapsed in e.TotalMilliseconds), ru) | ReadResult.Found tip -> let log = - let Log.BatchLen bytes, count = Enum.Unfolds tip.u, tip.u.Length + let count, bytes = tip.u.Length, if verbose then Enum.Unfolds tip.u |> Log.batchLen else 0 log bytes count Log.Metric.Tip let log = if verbose then log |> Log.propDataUnfolds tip.u else log let log = match maybePos with Some p -> log |> Log.propStartPos p |> Log.propStartEtag p | None -> log @@ -642,7 +642,7 @@ module internal Query = if query.HasMoreResults then yield! loop (i + 1) } // earlier versions, such as 3.9.0, do not implement IDisposable; see linked issue for detail on when SDK team added it - use __ = query // see https://github.com/jet/equinox/issues/225 - in the Cosmos V4 SDK, all this is managed IAsyncEnumerable + use _ = query // see https://github.com/jet/equinox/issues/225 - in the Cosmos V4 SDK, all this is managed IAsyncEnumerable loop 0 let private mkQuery (log : ILogger) (container : Container, stream : string) includeTip maxItems (direction : Direction, minIndex, maxIndex) : FeedIterator = let order = if direction = Direction.Forward then "ASC" else "DESC" @@ -676,10 +676,10 @@ module internal Query = Enum.Events(b, ?minIndex = minIndex, ?maxIndex = maxIndex) |> if direction = Direction.Backward then System.Linq.Enumerable.Reverse else id let events = batches |> Seq.collect unwrapBatch |> Array.ofSeq - let Log.BatchLen bytes, count = events, events.Length + let verbose = log.IsEnabled Events.LogEventLevel.Debug + let count, bytes = events.Length, if verbose then events |> Log.batchLen else 0 let reqMetric : Log.Measurement = { database = container.Database.Id; container = container.Id; stream = streamName; interval = t; bytes = bytes; count = count; ru = ru } let log = let evt = Log.Metric.QueryResponse (direction, reqMetric) in log |> Log.event evt - let verbose = log.IsEnabled Events.LogEventLevel.Debug let log = if verbose then log |> Log.propEvents events else log let index = if count = 0 then Nullable () else Nullable <| Seq.min (seq { for x in batches -> x.i }) (log|> Log.prop "bytes" bytes @@ -691,7 +691,8 @@ module internal Query = events, maybePosition, ru let private logQuery direction queryMaxItems (container : Container, streamName) interval (responsesCount, events : ITimelineEvent[]) n (ru : float) (log : ILogger) = - let Log.BatchLen bytes, count = events, events.Length + let verbose = log.IsEnabled Events.LogEventLevel.Debug + let count, bytes = events.Length, if verbose then events |> Log.batchLen else 0 let reqMetric : Log.Measurement = { database = container.Database.Id; container = container.Id; stream = streamName; interval = interval; bytes = bytes; count = count; ru = ru } let evt = Log.Metric.Query (direction, responsesCount, reqMetric) let action = match direction with Direction.Forward -> "QueryF" | Direction.Backward -> "QueryB" @@ -703,7 +704,7 @@ module internal Query = let mutable used, dropped = 0, 0 let mutable found = false for x in xs do - let (Log.EventLen bytes) = x + let bytes = Log.eventLen x if found then dropped <- dropped + bytes else used <- used + bytes if x.Index = stopIndex then found <- true