From a0e4bb29f0b73125624d3e26b79ed4e5d707e75c Mon Sep 17 00:00:00 2001
From: Ruben Bartelink
Date: Fri, 7 Jun 2024 16:17:02 +0100
Subject: [PATCH] Sorting, C+C, D+M, unsorted

---
 .../CosmosStoreSerialization.fs |  13 +-
 tools/Equinox.Tool/Program.fs   | 143 +++++++++++-------
 2 files changed, 101 insertions(+), 55 deletions(-)

diff --git a/src/Equinox.CosmosStore/CosmosStoreSerialization.fs b/src/Equinox.CosmosStore/CosmosStoreSerialization.fs
index 1dca4ecdd..225f12fc9 100644
--- a/src/Equinox.CosmosStore/CosmosStoreSerialization.fs
+++ b/src/Equinox.CosmosStore/CosmosStoreSerialization.fs
@@ -12,11 +12,13 @@ module private Deflate =
         compressor.Flush() // Could `Close`, but not required
         output.ToArray()
-    let inflate (compressedBytes: byte[]) =
+    let inflateTo output (compressedBytes: byte[]) =
         let input = new MemoryStream(compressedBytes)
         let decompressor = new System.IO.Compression.DeflateStream(input, System.IO.Compression.CompressionMode.Decompress, leaveOpen = true)
-        let output = new MemoryStream()
         decompressor.CopyTo(output)
+    let inflate compressedBytes =
+        let output = new MemoryStream()
+        compressedBytes |> inflateTo output
         output.ToArray()

 module JsonElement =
@@ -26,10 +28,13 @@ module JsonElement =
     // Avoid introduction of HTML escaping for things like quotes etc (Options.Default uses Options.Create(), which defaults to unsafeRelaxedJsonEscaping=true)
     let private optionsNoEscaping = JsonSerializerOptions(Encoder = System.Text.Encodings.Web.JavaScriptEncoder.UnsafeRelaxedJsonEscaping)
-    let private toUtf8Bytes (value : JsonElement) = JsonSerializer.SerializeToUtf8Bytes(value, options = optionsNoEscaping)
-    let deflate (value : JsonElement) : JsonElement =
+    let private toUtf8Bytes (value: JsonElement) = JsonSerializer.SerializeToUtf8Bytes(value, options = optionsNoEscaping)
+    let deflate (value: JsonElement): JsonElement =
         if value.ValueKind = JsonValueKind.Null then value
         else value |> toUtf8Bytes |> Deflate.compress |> JsonSerializer.SerializeToElement
+    let tryInflateTo ms (x: JsonElement) =
+        if x.ValueKind <> JsonValueKind.String then false
+        else x.GetBytesFromBase64() |> Deflate.inflateTo ms; true

 type CosmosJsonSerializer(options : JsonSerializerOptions) =
     inherit Microsoft.Azure.Cosmos.CosmosSerializer()
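Aside: a minimal sketch of the round-trip the above enables, assuming only the BCL `System.IO.Compression` APIs the diff itself uses (the `roundTrip` driver is hypothetical, not part of the library):

    open System.IO
    open System.IO.Compression

    // mirrors Deflate.compress above: write through a DeflateStream over a MemoryStream
    let compress (uncompressed: byte[]) =
        let output = new MemoryStream()
        let compressor = new DeflateStream(output, CompressionLevel.Optimal, leaveOpen = true)
        compressor.Write(uncompressed, 0, uncompressed.Length)
        compressor.Flush() // per the comment above, Flush (not Close) suffices to emit the data
        output.ToArray()

    // decompresses into a caller-supplied Stream; that signature is what lets callers reuse one scratch buffer
    let inflateTo (output: Stream) (compressedBytes: byte[]) =
        let input = new MemoryStream(compressedBytes)
        let decompressor = new DeflateStream(input, CompressionMode.Decompress)
        decompressor.CopyTo output

    let roundTrip () =
        let payload = System.Text.Encoding.UTF8.GetBytes """{"answer":42}"""
        let output = new MemoryStream()
        compress payload |> inflateTo output
        printfn "round-tripped: %b" (output.ToArray() = payload)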
diff --git a/tools/Equinox.Tool/Program.fs b/tools/Equinox.Tool/Program.fs
index d88bf2508..c56f1f238 100644
--- a/tools/Equinox.Tool/Program.fs
+++ b/tools/Equinox.Tool/Program.fs
@@ -167,7 +167,9 @@ and [] TopParameters =
     | [] CategoryName of string
     | [] CategoryLike of string
     | [] Streams
-    | [] Limit of int
+    | [] TsOrder
+    | [] Limit of int
+    | [] Sort of Order
     | [] Cosmos of ParseResults<Store.Cosmos.Parameters>
     interface IArgParserTemplate with
         member a.Usage = a |> function
@@ -175,8 +177,11 @@ and [] TopParameters =
             | StreamName _ ->       "Specify stream name to match against `p`."
             | CategoryName _ ->     "Specify category name to match against `p`, e.g. `$UserServices`."
             | CategoryLike _ ->     "Specify category name to match against `p` as a Cosmos LIKE expression (with `%` as wildcard, e.g. `$UserServices-%`)."
             | Streams ->            "Stream level stats"
-            | Limit _ ->            "Number of items to limit output to"
+            | TsOrder ->            "Retrieve data in `_ts` ORDER (generally has significant RU impact). Default: Use continuation tokens"
+            | Sort _ ->             "Sort order for results"
+            | Limit _ ->            "Number of categories to limit output to (Streams limit is 10x the category limit). Default: 100"
             | Cosmos _ ->           "Parameters for CosmosDB."
+and Order = Name | Items | Events | Unfolds | Size | EventSize | UnfoldSize | InflateSize | CorrCauseSize
 and TopArguments(p: ParseResults<TopParameters>) =
     member val Criteria = match p.TryGetResult StreamName, p.TryGetResult CategoryName, p.TryGetResult CategoryLike with
@@ -188,8 +193,11 @@ and TopArguments(p: ParseResults<TopParameters>) =
         | None, None, None -> Criteria.Unfiltered
         | None, Some _, Some _ -> p.Raise "CategoryLike and CategoryName are mutually exclusive"
     member val CosmosArgs = p.GetResult TopParameters.Cosmos |> Store.Cosmos.Arguments
-    member val StreamLevel = p.Contains TopParameters.Streams
-    member val Count = p.GetResult(TopParameters.Limit, 100)
+    member val StreamLevel = p.Contains Streams
+    member val Count = p.GetResult(Limit, 100)
+    member val TsOrder = p.Contains TsOrder
+    member val Order = p.GetResult(Sort, Order.Size)
+    member x.StreamCount = p.GetResult(Limit, x.Count * 10)
     member x.Connect() = match Store.Cosmos.config Log.Logger (None, true) x.CosmosArgs with
                          | Store.Config.Cosmos (cc, _, _) -> cc.Container
                          | _ -> failwith "Top requires Cosmos"
@@ -406,7 +414,7 @@ module CosmosQuery =
         | [||] -> "1=1"
         | [| x |] -> x |> exists
         | xs -> String.Join(" AND ", xs) |> exists
-    $"SELECT {selectedFields} FROM c WHERE {partitionKeyCriteria} AND {unfoldFilter} ORDER BY c.i"
+    $"SELECT {selectedFields} FROM c WHERE {partitionKeyCriteria} AND {unfoldFilter}"
     let private queryDef (a: QueryArguments) =
         let sql = composeSql a
        Log.Information("Querying {mode}: {q}", a.Mode, sql)
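Aside: the `TsOrder` switch pairs with the `OrderByTs` suffix added further down to compose one of two query shapes; a sketch with a hypothetical criteria value (not part of the diff):

    let partitionKeyCriteria = "c.p LIKE \"$UserServices-%\"" // hypothetical criteria
    // default: pages arrive in whatever order is cheapest, driven by continuation tokens
    let byContinuation = $"SELECT * FROM c WHERE {partitionKeyCriteria}"
    // TsOrder: stable `_ts` ordering, at a generally significant RU premium
    let byTsOrder = $"SELECT * FROM c WHERE {partitionKeyCriteria} ORDER BY c._ts"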
@@ -451,17 +459,52 @@ module CosmosTop =
     open Equinox.CosmosStore.Linq.Internal
     open FSharp.Control
-
-    let cosmosTimeStamp (x: System.Text.Json.JsonElement) = x.GetProperty("_ts").GetDouble() |> DateTime.UnixEpoch.AddSeconds
-    let tryParseEquinoxBatch (x: System.Text.Json.JsonElement) =
-        let tryProp (id: string): ValueOption<System.Text.Json.JsonElement> =
-            let mutable p = Unchecked.defaultof<_>
-            if x.TryGetProperty(id, &p) then ValueSome p else ValueNone
-        match tryProp "p" with
-        | ValueSome (je: System.Text.Json.JsonElement) when je.ValueKind = System.Text.Json.JsonValueKind.String ->
-            ValueSome struct (je.GetString() |> FsCodec.StreamName.parse, tryProp "e", tryProp "u")
-        | _ -> ValueNone
-
+    open System.Text.Json
+    module private Parser =
+        let scratch = new System.IO.MemoryStream()
+        let inline utf8Size (x: JsonElement) =
+            scratch.Position <- 0L
+            JsonSerializer.Serialize(scratch, x)
+            scratch.Position
+        let inline inflatedUtf8Size x =
+            scratch.Position <- 0L
+            if Equinox.CosmosStore.Core.JsonElement.tryInflateTo scratch x then scratch.Position
+            else utf8Size x
+        let inline tryProp (x: JsonElement) (id: string): ValueOption<JsonElement> =
+            let mutable p = Unchecked.defaultof<_>
+            if x.TryGetProperty(id, &p) then ValueSome p else ValueNone
+        // using the length as a decent proxy for UTF-8 length of corr/causation; if you have messy data in there, you'll have bigger problems to worry about
+        let inline stringLen x = match x with ValueSome (x: JsonElement) when x.ValueKind <> JsonValueKind.Null -> x.GetString().Length | _ -> 0
+        let _e = Unchecked.defaultof<Equinox.CosmosStore.Core.Event> // Or Unfold - both share field names
+        let inline ciSize (x: JsonElement) =
+            (struct (0, 0L), x.EnumerateArray())
+            ||> Seq.fold (fun struct (c, i) x ->
+                let inline infSize x = match x with ValueSome x -> inflatedUtf8Size x | ValueNone -> 0
+                struct (c + (tryProp x (nameof _e.correlationId) |> stringLen) + (tryProp x (nameof _e.causationId) |> stringLen),
+                        i + (tryProp x (nameof _e.d) |> infSize) + (tryProp x (nameof _e.m) |> infSize)))
+        let _t = Unchecked.defaultof<Equinox.CosmosStore.Core.Tip>
+        let inline tryEquinoxStreamName x =
+            match tryProp x (nameof _t.p) with
+            | ValueSome (je: JsonElement) when je.ValueKind = JsonValueKind.String ->
+                je.GetString() |> FsCodec.StreamName.parse |> FsCodec.StreamName.toString |> ValueSome
+            | _ -> ValueNone
+        let private tryParseEventOrUnfold = function
+            | ValueNone -> struct (0, 0L, struct (0, 0L))
+            | ValueSome (x: JsonElement) -> x.GetArrayLength(), utf8Size x, ciSize x
+        [<CustomEquality; NoComparison>]
+        type Stat =
+            { key: string; count: int; events: int; unfolds: int; bytes: int64; eBytes: int64; uBytes: int64; cBytes: int64; iBytes: int64 }
+            member x.Merge y =
+                { key = x.key; count = x.count + y.count; events = x.events + y.events; unfolds = x.unfolds + y.unfolds; bytes = x.bytes + y.bytes
+                  eBytes = x.eBytes + y.eBytes; uBytes = x.uBytes + y.uBytes; cBytes = x.cBytes + y.cBytes; iBytes = x.iBytes + y.iBytes }
+            override x.GetHashCode() = StringComparer.Ordinal.GetHashCode x.key
+            override x.Equals y = match y with :? Stat as y -> StringComparer.Ordinal.Equals(x.key, y.key) | _ -> false
+            static member Create(key, x: JsonElement) =
+                let struct (e, eb, struct (ec, ei)) = tryProp x (nameof _t.e) |> tryParseEventOrUnfold
+                let struct (u, ub, struct (uc, ui)) = tryProp x (nameof _t.u) |> tryParseEventOrUnfold
+                { key = key; count = 1; events = e; unfolds = u
+                  bytes = utf8Size x; eBytes = eb; uBytes = ub; cBytes = int64 (ec + uc); iBytes = ei + ui }
+    let [<Literal>] OrderByTs = " ORDER BY c._ts"
     let private composeSql (a: TopArguments) =
         let partitionKeyCriteria =
             match a.Criteria with
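Aside: `Parser.utf8Size` sizes a `JsonElement` by serializing it into a single reusable `MemoryStream` and reading `Position` (reset to 0 per call, so the write head, not `Length`, reflects the current document). A standalone sketch of the same trick (the `demo` function is hypothetical):

    open System.Text.Json

    let scratch = new System.IO.MemoryStream() // one buffer for all measurements; grows to the largest document seen
    let utf8Size (x: JsonElement) =
        scratch.Position <- 0L
        JsonSerializer.Serialize(scratch, x)
        scratch.Position // bytes written for this document = its UTF-8 size

    let demo () =
        use doc = JsonDocument.Parse """{"d":{"answer":42},"m":null}"""
        utf8Size doc.RootElement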
@@ -469,60 +512,58 @@ module CosmosTop =
             | Criteria.CatName n -> $"c.p LIKE \"{n}-%%\""
             | Criteria.CatLike pat -> $"c.p LIKE \"{pat}\""
             | Criteria.Unfiltered -> "1=1"
-        $"SELECT * FROM c WHERE {partitionKeyCriteria}"
-    let arrayLen = function ValueNone -> 0 | ValueSome (x: System.Text.Json.JsonElement) -> x.GetArrayLength()
-    let scratch = new System.IO.MemoryStream()
-    let utf8Size (x: System.Text.Json.JsonElement) =
-        scratch.Position <- 0L
-        System.Text.Json.JsonSerializer.Serialize(scratch, x)
-        scratch.Position
-    [<CustomEquality; NoComparison>]
-    type Stat =
-        { key: string; count: int; events: int; unfolds: int; bytes: int64; eBytes: int64; uBytes: int64 }
-        static member Create(key, d: System.Text.Json.JsonElement, e: System.Text.Json.JsonElement voption, u: System.Text.Json.JsonElement voption) =
-            let eb = match e with ValueSome x -> utf8Size x | ValueNone -> 0
-            let ub = match u with ValueSome x -> utf8Size x | ValueNone -> 0
-            { key = key; count = 1; events = arrayLen e; unfolds = arrayLen u
-              bytes = utf8Size d; eBytes = eb; uBytes = ub }
-        member x.Merge y =
-            { key = x.key; count = x.count + y.count; events = x.events + y.events; unfolds = x.unfolds + y.unfolds
-              bytes = x.bytes + y.bytes; eBytes = x.eBytes + y.eBytes; uBytes = x.uBytes + y.uBytes }
-        override x.GetHashCode() = StringComparer.Ordinal.GetHashCode x.key
-        override x.Equals y = match y with :? Stat as y -> StringComparer.Ordinal.Equals(x.key, y.key) | _ -> false
+        $"SELECT * FROM c WHERE {partitionKeyCriteria}{if a.TsOrder then OrderByTs else null}"
+    let inline cosmosTimeStamp (x: JsonElement) = x.GetProperty("_ts").GetDouble() |> DateTime.UnixEpoch.AddSeconds
     let run (a: TopArguments) = task {
         let sw = System.Diagnostics.Stopwatch.StartNew()
         let pageStreams, accStreams = System.Collections.Generic.HashSet(), System.Collections.Generic.HashSet()
         let mutable accI, accE, accU, accRus, accRds, accOds, accBytes = 0L, 0L, 0L, 0., 0L, 0L, 0L
         let s = System.Collections.Generic.HashSet()
-        let categoryName = FsCodec.StreamName.split >> fun struct (cn, _sid) -> cn
-        let g = if a.StreamLevel then FsCodec.StreamName.toString else categoryName
+        let categoryName = FsCodec.StreamName.Internal.trust >> FsCodec.StreamName.split >> fun struct (cn, _sid) -> cn
+        let group = if a.StreamLevel then id else categoryName
         try for rtt, rc, items, rdc, rds, ods in a.Execute(composeSql a) |> Query.enum__ do
-                let mutable pageI, pageE, pageU, pageB, newestTs = 0, 0, 0, 0L, DateTime.MinValue
+                let mutable pageI, pageE, pageU, pageB, pageCc, pageDm, newestTs, sw = 0, 0, 0, 0L, 0L, 0L, DateTime.MinValue, System.Diagnostics.Stopwatch.StartNew()
                 for x in items do
                     newestTs <- max newestTs (cosmosTimeStamp x)
-                    match tryParseEquinoxBatch x with
+                    match Parser.tryEquinoxStreamName x with
                     | ValueNone -> failwith $"Could not parse document:\n{prettySerdes.Value.Serialize x}"
-                    | ValueSome (sn, e, u) ->
-                        if pageStreams.Add sn then accStreams.Add sn |> ignore
-                        let x = Stat.Create(g sn, x, e, u)
+                    | ValueSome sn ->
+                        if pageStreams.Add sn && not a.StreamLevel then accStreams.Add sn |> ignore
+                        let x = Parser.Stat.Create(group sn, x)
                         let mutable v = Unchecked.defaultof<_>
                         if s.TryGetValue(x, &v) then s.Remove x |> ignore; s.Add(v.Merge x) |> ignore
                         else s.Add x |> ignore
-                        pageI <- pageI + 1; pageE <- pageE + x.events; pageU <- pageU + x.unfolds; pageB <- pageB + x.bytes
-                Log.Information("Page{rdc,5}>{count,4}i{streams,5}s{es,5}e{us,5}u{rds,5:f2}>{ods,4:f2}>{jds,4:f2}MiB{rc,7:f2}RU{s,5:N1}s age {age:dddd\.hh\:mm\:ss}",
-                                rdc, pageI, pageStreams.Count, pageE, pageU, miB rds, miB ods, miB pageB, rc, rtt.TotalSeconds, newestTs - DateTime.UtcNow)
+                        pageI <- pageI + 1; pageE <- pageE + x.events; pageU <- pageU + x.unfolds; pageB <- pageB + x.bytes; pageCc <- pageCc + x.cBytes; pageDm <- pageDm + x.iBytes
+                Log.Information("Page{rdc,5}>{count,4}i{streams,5}s{es,5}e{us,5}u{rds,5:f2}>{ods,4:f2}>{jds,4:f2}MiB{rc,7:f2}RU{s,5:N1}s D+M{im,4:f1} C+C{cm,5:f2} {ms,3}ms age {age:dddd\.hh\:mm\:ss}",
+                                rdc, pageI, pageStreams.Count, pageE, pageU, miB rds, miB ods, miB pageB, rc, rtt.TotalSeconds, miB pageDm, miB pageCc, sw.ElapsedMilliseconds, newestTs - DateTime.UtcNow)
                 pageStreams.Clear()
                 accI <- accI + int64 pageI; accE <- accE + int64 pageE; accU <- accU + int64 pageU
                 accRus <- accRus + rc; accRds <- accRds + int64 rds; accOds <- accOds + int64 ods; accBytes <- accBytes + pageB
         finally
-            let accCats = System.Collections.Generic.HashSet(accStreams |> Seq.map categoryName).Count
-            Log.Information("TOTALS {count:N0}i {cats}c {streams:N0}s {es:N0}e {us:N0}u {tmib:N1}MiB Read {rmib:N1}>{omib:N1} {ru:N2}RU {s:N1}s",
-                            accI, accCats, accStreams.Count, accE, accU, miB accBytes, miB accRds, miB accOds, accRus, sw.Elapsed.TotalSeconds)
-            for x in s |> Seq.sortByDescending _.bytes |> Seq.truncate a.Count do
-                Log.Information("{key,-20}:{count,7}i {mib,6:N1}MiB E{events,7} {emib,7:N1} U{unfolds,7} {umib,6:N1}",
-                                x.key, x.count, miB x.bytes, x.events, miB x.eBytes, x.unfolds, miB x.uBytes) }
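Aside: because `Stat` equality is custom (keyed on `key` alone), the loop above can use a `HashSet` as a merge-map: `TryGetValue` fetches the current aggregate for the incoming item's key, and `Remove`/`Add` swaps in the merged value. A self-contained sketch with a simplified `Stat` (assumed shape, not the diff's record):

    open System
    open System.Collections.Generic

    [<CustomEquality; NoComparison>]
    type Stat =
        { key: string; count: int }
        member x.Merge y = { x with count = x.count + y.count }
        override x.GetHashCode() = StringComparer.Ordinal.GetHashCode x.key
        override x.Equals y = match y with :? Stat as y -> StringComparer.Ordinal.Equals(x.key, y.key) | _ -> false

    let s = HashSet<Stat>()
    let upsert (x: Stat) =
        let mutable v = Unchecked.defaultof<_>
        if s.TryGetValue(x, &v) then s.Remove x |> ignore; s.Add(v.Merge x) |> ignore
        else s.Add x |> ignore

Keeping the aggregate inside the set avoids maintaining a parallel Dictionary keyed by the same string, since the key already lives in the record.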
+            let accCats = (if a.StreamLevel then s |> Seq.map _.key else accStreams) |> Seq.map categoryName |> System.Collections.Generic.HashSet |> _.Count
+            let accStreams = if a.StreamLevel then s.Count else accStreams.Count
+            let iBytes, cBytes = s |> Seq.sumBy _.iBytes, s |> Seq.sumBy _.cBytes
+            let giB x = miB x / 1024.
+            Log.Information("TOTALS {count:N0}i {cats}c {streams:N0}s {es:N0}e {us:N0}u read {rg:f1}GiB output {og:f1}GiB JSON {tg:f1}GiB D+M(inflated) {ig:f1}GiB C+C {cm:f2}MiB {ru:N2}RU {s:N1}s",
+                            accI, accCats, accStreams, accE, accU, giB accRds, giB accOds, giB accBytes, giB iBytes, miB cBytes, accRus, sw.Elapsed.TotalSeconds)
+            let sort: Parser.Stat seq -> Parser.Stat seq = a.Order |> function
+                | Order.Name -> Seq.sortBy _.key
+                | Order.Size -> Seq.sortByDescending _.bytes
+                | Order.Items -> Seq.sortByDescending _.count
+                | Order.Events -> Seq.sortByDescending _.events
+                | Order.Unfolds -> Seq.sortByDescending _.unfolds
+                | Order.EventSize -> Seq.sortByDescending _.eBytes
+                | Order.UnfoldSize -> Seq.sortByDescending _.uBytes
+                | Order.InflateSize -> Seq.sortByDescending _.iBytes
+                | Order.CorrCauseSize -> Seq.sortByDescending _.cBytes
+            let render (x: Parser.Stat) =
+                Log.Information("{count,7}i {tm,6:N2}MiB E{events,7} {em,7:N1} U{unfolds,7} {um,6:N1} D+M{dm,6:N1} C+C{cm,5:N1} {key}",
+                                x.count, miB x.bytes, x.events, miB x.eBytes, x.unfolds, miB x.uBytes, miB x.iBytes, miB x.cBytes, x.key)
+            if a.StreamLevel then
+                let collapsed = s |> Seq.groupBy (_.key >> categoryName) |> Seq.map (fun (cat, xs) -> { (xs |> Seq.reduce _.Merge) with key = cat })
+                sort collapsed |> Seq.truncate a.Count |> Seq.iter render
+            sort s |> Seq.truncate (if a.StreamLevel then a.StreamCount else a.Count) |> Seq.iter render }

 module DynamoInit =
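Aside: in `Streams` mode, per-category rows are recovered from the stream-level `Stat`s by grouping keys by category and folding with `Merge`, as in the `collapsed` binding above. A sketch reusing the simplified `Stat` from the previous note (`categoryName` is simplified here; the diff uses `FsCodec.StreamName.split`):

    let categoryName (streamName: string) = streamName.Split('-').[0] // category = text before the first '-'
    let collapse (stats: Stat seq) =
        stats
        |> Seq.groupBy (fun x -> categoryName x.key)
        |> Seq.map (fun (cat, xs) -> { (xs |> Seq.reduce (fun a b -> a.Merge b)) with key = cat })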