diff --git a/CHANGELOG.md b/CHANGELOG.md
index b0cbe982f..2950e2550 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -8,6 +8,14 @@ The `Unreleased` section name is replaced by the expected version of next releas
## [Unreleased]
+### Added
+### Changed
+### Removed
+### Fixed
+
+
+## [4.1.0] - 2025
+
### Added
- `Equinox.CosmosStore`: Roundtrip `D` and `M` encoding values as per `DynamoStore`, enabling more extensive control of compression [#472](https://github.com/jet/equinox/pull/472)
@@ -17,11 +25,18 @@ The `Unreleased` section name is replaced by the expected version of next releas
- `Equinox.CosmosStore`: Support Ingesting unfolds [#460](https://github.com/jet/equinox/pull/460)
- `Equinox.CosmosStore.EventsContext.Sync`: Support syncing of unfolds [#460](https://github.com/jet/equinox/pull/460)
- `eqx stats`: `-O`, `-N` flags extract oldest and newest `_ts` within a store [#459](https://github.com/jet/equinox/pull/459)
+- `eqx stats`: `-U` flag to count streams with unfolds and total number thereof [#464](https://github.com/jet/equinox/pull/464)
+- `eqx stats`: `-I` flag: relabels Documents as Items, retaining the existing `-D` flag [#464](https://github.com/jet/equinox/pull/464)
- `eqx`: `-Q` flag omits timestamps from console output logging [#459](https://github.com/jet/equinox/pull/459)
+- `Equinox.CosmosStore.Linq`: Add LINQ querying support for Indexed `u`nfolds (`AccessStrategy.Custom`+`CosmosStoreCategory.shouldCompress`) [#450](https://github.com/jet/equinox/pull/450)
+- `eqx dump`, `eqx query`: `-sl` flag: support for specifying streams via a [CosmosDB `LIKE` expression](https://learn.microsoft.com/en-us/azure/cosmos-db/nosql/query/keywords#like) [#450](https://github.com/jet/equinox/pull/450)
+- `eqx dump`: `-Q` strips intervals, regularizes snapshots, logs stream names [#450](https://github.com/jet/equinox/pull/450)
+- `eqx top`: Support for analyzing space usage for event and view containers by category and/or stream [#450](https://github.com/jet/equinox/pull/450)
+- `eqx destroy`: Support for deleting the items (documents) underlying a category/stream/arbitrary `WHERE` clause [#450](https://github.com/jet/equinox/pull/450)
### Changed
-- `Equinox.*Store`,`Equinox.*Store.Prometheus`: Pin `Equinox` dependencies to `[4.0.0, 5.0.0)`] [#448](https://github.com/jet/equinox/pull/448)
+- `Equinox.*Store`,`Equinox.*Store.Prometheus`: Pin `Equinox` dependencies to `[4.0.3, 5.0.0)` [#450](https://github.com/jet/equinox/pull/450)
- `Equinox.CosmosStore`: Update `System.Text.Json` dep to `6.0.10` per [CVE-2024-43485](https://github.com/advisories/GHSA-8g4q-xg66-9fp4) [#470](https://github.com/jet/equinox/pull/470)
- `Equinox.CosmosStore`: Minimum `Microsoft.Azure.Cosmos` requirement updated to `3.43.0` to avail of integrated `System.Text.Json` support [#467](https://github.com/jet/equinox/pull/467)
- `Equinox.CosmosStore.CosmosStoreConnector`: Removed mandatory `requestTimeout` argument [#467](https://github.com/jet/equinox/pull/467)
@@ -790,7 +805,8 @@ The `Unreleased` section name is replaced by the expected version of next releas
(For information pertaining to earlier releases, see release notes in https://github.com/jet/equinox/releases and/or can someone please add it!)
-[Unreleased]: https://github.com/jet/equinox/compare/4.0.4...HEAD
+[Unreleased]: https://github.com/jet/equinox/compare/4.1.0...HEAD
+[4.1.0]: https://github.com/jet/equinox/compare/4.0.4...4.1.0
[4.0.4]: https://github.com/jet/equinox/compare/4.0.3...4.0.4
[4.0.3]: https://github.com/jet/equinox/compare/4.0.2...4.0.3
[4.0.2]: https://github.com/jet/equinox/compare/4.0.0...4.0.2
diff --git a/README.md b/README.md
index 0a70f4e82..c75e7ff13 100644
--- a/README.md
+++ b/README.md
@@ -170,7 +170,7 @@ The components within this repository are delivered as multi-targeted Nuget pack
- `Equinox.Core` [![NuGet](https://img.shields.io/nuget/v/Equinox.Core.svg)](https://www.nuget.org/packages/Equinox.Core/): Hosts generic utility types frequently useful alongside Equinox: [`TaskCell`](https://github.com/jet/equinox/blob/master/src/Equinox.Core/TaskCell.fs#L36), [`Batcher`, `BatcherCache`, `BatcherDictionary`](https://github.com/jet/equinox/blob/master/src/Equinox.Core/Batching.fs#L44). ([depends](https://www.fuget.org/packages/Equinox.Core) on `System.Runtime.Caching`)
- `Equinox.MemoryStore` [![MemoryStore NuGet](https://img.shields.io/nuget/v/Equinox.MemoryStore.svg)](https://www.nuget.org/packages/Equinox.MemoryStore/): In-memory store for integration testing/performance base-lining/providing out-of-the-box zero dependency storage for examples. ([depends](https://www.fuget.org/packages/Equinox.MemoryStore) on `Equinox`)
-- `Equinox.CosmosStore` [![CosmosStore NuGet](https://img.shields.io/nuget/v/Equinox.CosmosStore.svg)](https://www.nuget.org/packages/Equinox.CosmosStore/): Azure CosmosDB Adapter with integrated 'unfolds' feature, facilitating optimal read performance in terms of latency and RU costs, instrumented to meet Jet's production monitoring requirements. ([depends](https://www.fuget.org/packages/Equinox.CosmosStore) on `Equinox`, `Equinox`, `Microsoft.Azure.Cosmos` >= `3.43.1`, `System.Text.Json`, `FSharp.Control.TaskSeq`)
+- `Equinox.CosmosStore` [![CosmosStore NuGet](https://img.shields.io/nuget/v/Equinox.CosmosStore.svg)](https://www.nuget.org/packages/Equinox.CosmosStore/): Azure CosmosDB Adapter with integrated 'unfolds' feature, facilitating optimal read performance in terms of latency and RU costs, instrumented to meet Jet's production monitoring requirements. ([depends](https://www.fuget.org/packages/Equinox.CosmosStore) on `Equinox` v `4.0.3`, `Microsoft.Azure.Cosmos` >= `3.43.1`, `System.Text.Json`, `FSharp.Control.TaskSeq`)
- `Equinox.CosmosStore.Prometheus` [![CosmosStore.Prometheus NuGet](https://img.shields.io/nuget/v/Equinox.CosmosStore.Prometheus.svg)](https://www.nuget.org/packages/Equinox.CosmosStore.Prometheus/): Integration package providing a `Serilog.Core.ILogEventSink` that extracts detailed metrics information attached to the `LogEvent`s and feeds them to the `prometheus-net`'s `Prometheus.Metrics` static instance. ([depends](https://www.fuget.org/packages/Equinox.CosmosStore.Prometheus) on `Equinox.CosmosStore`, `prometheus-net >= 3.6.0`)
- `Equinox.DynamoStore` [![DynamoStore NuGet](https://img.shields.io/nuget/v/Equinox.DynamoStore.svg)](https://www.nuget.org/packages/Equinox.DynamoStore/): Amazon DynamoDB Adapter with integrated 'unfolds' feature, facilitating optimal read performance in terms of latency and RC costs, patterned after `Equinox.CosmosStore`. ([depends](https://www.fuget.org/packages/Equinox.DynamoStore) on `Equinox`, `FSharp.AWS.DynamoDB` >= `0.12.0-beta`, `FSharp.Control.TaskSeq`)
- `Equinox.DynamoStore.Prometheus` [![DynamoStore.Prometheus NuGet](https://img.shields.io/nuget/v/Equinox.DynamoStore.Prometheus.svg)](https://www.nuget.org/packages/Equinox.DynamoStore.Prometheus/): Integration package providing a `Serilog.Core.ILogEventSink` that extracts detailed metrics information attached to the `LogEvent`s and feeds them to the `prometheus-net`'s `Prometheus.Metrics` static instance. ([depends](https://www.fuget.org/packages/Equinox.CosmosStore.Prometheus) on `Equinox.DynamoStore`, `prometheus-net >= 3.6.0`)
@@ -390,7 +390,7 @@ While Equinox is implemented in F#, and F# is a great fit for writing event-sour
# use a wild card (LIKE) for the stream name
eqx query -cl '$Us%' -un Snapshotted cosmos -d db -c $EQUINOX_COSMOS_VIEWS -b 100000
- # > Querying Default: SELECT c.u, c.p, c._etag FROM c WHERE c.p LIKE "$Us%" AND EXISTS (SELECT VALUE u FROM u IN c.u WHERE u.c = "Snapshotted") {}
+ # > Querying Default: SELECT c.p, c._etag, c.u[0].d FROM c WHERE c.p LIKE "$Us%" AND EXISTS (SELECT VALUE u FROM u IN c.u WHERE u.c = "Snapshotted") {}
# > Page 7166s, 7166u, 0e 320.58RU 3.9s {}
# > Page 1608s, 1608u, 0e 68.59RU 0.9s {}
# > TOTALS 1c, 8774s, 389.17RU 4.7s {}
@@ -403,7 +403,7 @@ While Equinox is implemented in F#, and F# is a great fit for writing event-sour
# add criteria filtering based on an Uncompressed Unfold
eqx query -cn '$User' -un EmailIndex -uc 'u.d.email = "a@b.com"' cosmos -d db -c $EQUINOX_COSMOS_VIEWS -b 100000
- # > Querying Default: SELECT c.u, c.p, c._etag FROM c WHERE c.p LIKE "$User-%" AND EXISTS (SELECT VALUE u FROM u IN c.u WHERE u.c = "EmailIndex" AND u.d.email = "a@b.com") {}
+ # > Querying Default: SELECT c.p, c._etag, c.u[0].d FROM c WHERE c.p LIKE "$User-%" AND EXISTS (SELECT VALUE u FROM u IN c.u WHERE u.c = "EmailIndex" AND u.d.email = "a@b.com") {}
# > Page 0s, 0u, 0e 2.8RU 0.7s {}
# > TOTALS 0c, 0s, 2.80RU 0.7s {} # 👈 only 2.8RU if nothing is returned
@@ -430,6 +430,78 @@ While Equinox is implemented in F#, and F# is a great fit for writing event-sour
# > Page 2903s, 601u, 3188e 107.53RU 3.1s 4.0MiB age 0004.05:24:51 {}
# > Page 2638s, 316u, 3019e 93.09RU 2.5s 3.4MiB age 0000.05:08:38 {}
# > TOTALS 11c, 206,356s, 7,886.75RU R/W 290.4/290.4MiB 225.3s {}
+
+ # Prepare a breakdown of which categories are using the most capacity within the store
+ eqx -Q top cosmos -d db -c $EQUINOX_COSMOS_CONTAINER
+ # Page 3276>3276i 3276s 0e 3991u 4.00>4.00<4.22MiB 103.74RU 3.5s D+M 5.1 C+C 0.00 201ms age 0000.00:33:13 {}
+ # Page 3177>3177i 3177s 0e 4593u 4.00>4.01<4.20MiB 105.22RU 3.2s D+M 4.7 C+C 0.00 146ms age 0000.02:23:48 {}
+ # Page 2708>2708i 2708s 0e 5044u 4.00>4.00<4.19MiB 105.76RU 3.4s D+M 4.5 C+C 0.00 84ms age 0002.23:10:55 {}
+ ...
+ # Page 4334>4334i 4334s 0e 5038u 4.00>4.00<4.19MiB 112.59RU 2.9s D+M 4.2 C+C 0.00 109ms age 0000.00:00:59 {}
+ # Page 1637>1637i 1637s 0e 2939u 2.40>2.41<2.52MiB 64.12RU 1.7s D+M 2.5 C+C 0.00 39ms age 0000.00:18:03 {}
+ # TOTALS 47,200i 9c 47,200s 0e 79,262u read 0.1GiB output 0.1GiB JSON 0.1GiB D+M(inflated) 0.1GiB C+C 0.00MiB Parse 1.516s Total 1,750.73RU 54.2s {}
+ # 24064i 40.75MiB E 0 0.0 U 48128 33.6 D+M 35.0 C+C 0.0 $Friend {}
+ # 6372i 13.18MiB E 0 0.0 U 12744 11.4 D+M 23.9 C+C 0.0 $Tenant {}
+ # 6374i 5.41MiB E 0 0.0 U 6374 3.6 D+M 5.4 C+C 0.0 $Role0 {}
+ # 5992i 5.09MiB E 0 0.0 U 5992 3.4 D+M 5.1 C+C 0.0 $Role {}
+ # 1574i 1.95MiB E 0 0.0 U 1574 1.5 D+M 2.0 C+C 0.0 $Permission {}
+ # 1575i 1.79MiB E 0 0.0 U 3150 1.3 D+M 1.2 C+C 0.0 $User {}
+ # 445i 0.51MiB E 0 0.0 U 483 0.4 D+M 0.8 C+C 0.0 $Invoice3 {}
+ # 410i 0.46MiB E 0 0.0 U 423 0.3 D+M 0.8 C+C 0.0 $Invoice2 {}
+ # 394i 0.44MiB E 0 0.0 U 394 0.3 D+M 0.7 C+C 0.0 $Invoice {}
+
+  # Drill into the Friend data (different test data from the preceding example)
+ eqx top -cn '$Friend' cosmos -d db -c $EQUINOX_COSMOS_CONTAINER
+ # Page 4787>4787i 4787s 0e 4787u 4.00>4.00<4.19MiB 218.54RU 3.6s D+M 4.5 C+C 0.00 259ms age 0013.22:52:15 {}
+ # Page 4955>4955i 4955s 0e 4955u 4.00>4.00<4.19MiB 200.20RU 3.2s D+M 4.1 C+C 0.00 202ms age 0013.22:52:18 {}
+ # Page 4715>4715i 4715s 0e 4715u 4.00>4.00<4.21MiB 201.26RU 3.2s D+M 4.4 C+C 0.00 145ms age 0013.22:52:22 {}
+ # Page 4884>4884i 4884s 0e 4884u 4.00>4.00<4.20MiB 198.97RU 3.2s D+M 4.1 C+C 0.00 95ms age 0013.22:52:31 {}
+ # Page 4620>4620i 4620s 0e 4620u 4.00>4.00<4.20MiB 194.76RU 3.0s D+M 4.7 C+C 0.00 140ms age 0013.22:52:28 {}
+ # Page 4840>4840i 4840s 0e 4840u 4.00>4.00<4.19MiB 198.43RU 3.2s D+M 4.2 C+C 0.00 136ms age 0013.22:52:34 {}
+ # Page 4791>4791i 4791s 0e 4791u 4.00>4.00<4.21MiB 200.20RU 3.0s D+M 4.2 C+C 0.00 137ms age 0014.02:23:24 {}
+ # Page 3906>3906i 3906s 0e 3906u 3.01>3.02<3.15MiB 158.28RU 2.6s D+M 2.9 C+C 0.00 142ms age 0013.23:13:51 {}
+ # TOTALS 37,498i 1c 37,498s 0e 37,498u read 0.0GiB output 0.0GiB JSON 0.0GiB D+M(inflated) 0.0GiB C+C 0.00MiB Parse 1.264s Total 1,570.64RU 30.0s {}
+ # 37498i 32.55MiB E 0 0.1 U 37498 21.7 D+M 33.2 C+C 0.0 $Friend {}
+
+ # DRY RUN of deleting (note no `-f` supplied)
+ eqx destroy -cn '$Friend' cosmos -d db -c $EQUINOX_COSMOS_CONTAINER
+  # W Dry-run of deleting items based on SELECT c.p, c.id, ARRAY_LENGTH(c.e) AS es, ARRAY_LENGTH(c.u) AS us FROM c WHERE c.p LIKE "$Friend%" {}
+ # I Page 9999> 9999i 9999s 0e 9999u 8.21>0.76 415.07RRU 1.4s 0.00WRU/s 0.0s {}
+ # I Page 9999> 9999i 9999s 0e 9999u 8.48>0.76 404.70RRU 0.8s 0.00WRU/s 0.0s {}
+ # I Page 9999> 9999i 9999s 0e 9999u 8.32>0.76 395.36RRU 1.1s 0.00WRU/s 0.0s {}
+ # I Page 7501> 7501i 7501s 0e 7501u 6.01>0.57 299.60RRU 1.0s 0.00WRU/s 0.0s {}
+ # I TOTALS 37,498i 1c 37,498s 0e 37,498u read 31.0MiB output 2.9MiB 1,514.73RRU Avg 0.00WRU/s Delete 0.00WRU Total 7.8s {}
+
+ # Whack them (note the `--force` supplied)
+ eqx destroy -cn '$Friend' --force cosmos -d db -c $EQUINOX_COSMOS_CONTAINER
+  # W DESTROYING all Items WHERE c.p LIKE "$Friend%" {}
+ # I .. Deleted 6347i 6347s 0e 6347u 1,671.52WRU/s 30.0s {}
+ # I Page 9999> 9999i 9999s 0e 9999u 8.21>0.76 415.17RRU 1.2s 1,678.54WRU/s 47.2s {}
+ # I .. Deleted 6363i 6363s 0e 6363u 1,703.29WRU/s 30.0s {}
+ # I Page 9999> 9999i 9999s 0e 9999u 8.48>0.76 404.70RRU 1.1s 1,685.49WRU/s 47.8s {}
+ # I .. Deleted 6001i 6001s 0e 6001u 1,571.94WRU/s 30.0s {}
+ # I Page 9999> 9999i 9999s 0e 9999u 8.32>0.76 395.36RRU 1.0s 1,582.18WRU/s 50.1s {}
+ ^C
+
+ # Get impatient; up the concurrency (-w 192) from the default 32 (note the `--force` supplied)
+ eqx destroy -cn '$Friend' --force -w 192 cosmos -d db -c $EQUINOX_COSMOS_CONTAINER
+  # W DESTROYING all Items WHERE c.p LIKE "$Friend%" {}
+ # I Page 3946> 3946i 3946s 0e 3946u 3.05>0.30 176.23RRU 0.8s 5,107.71WRU/s 6.1s {}
+ # I TOTALS 3,946i 1c 3,946s 0e 3,946u read 3.0MiB output 0.3MiB 176.23RRU Avg 3,058.48WRU/s Delete 31,360.10WRU Total 10.3s {}
+
+ # Analyze the largest streams in the '$Permission' category
+ eqx top -S -cl '$Perm%' cosmos -d db -c $EQUINOX_COSMOS_CONTAINER
+ # I Page 254> 254i 254s 0e 254u 4.33>4.33<4.65MiB 349.76RU 3.9s D+M 8.2 C+C 0.00 105ms age 0013.23:34:02 {}
+ # I Page 1671>1671i 1671s 0e 1671u 2.39>2.40<2.54MiB 91.57RU 2.1s D+M 2.9 C+C 0.00 99ms age 0013.23:34:07 {}
+ # I TOTALS 1,925i 1,925c 1,925s 0e 1,925u read 0.0GiB output 0.0GiB JSON 0.0GiB D+M(inflated) 0.0GiB C+C 0.00MiB Parse 0.207s Total 441.33RU 9.4s {}
+ # I 1925i 7.19MiB E 0 0.0 U 1925 6.6 D+M 11.1 C+C 0.0 $Permission {}
+ # I 1i 1.75MiB E 0 0.0 U 1 1.8 D+M 3.1 C+C 0.0 $Permission-5292b7cd524d509bb969bd82abf39461 {}
+ # I 1i 1.38MiB E 0 0.0 U 1 1.4 D+M 2.5 C+C 0.0 $Permission-244b72fb0238595494b5cb3f9bd1abf7 {}
+ # I 1i 0.79MiB E 0 0.0 U 1 0.8 D+M 1.5 C+C 0.0 $Permission-68a13e8398b352c5b8e22ec18ab2bbb6 {}
+ # I 1i 0.57MiB E 0 0.0 U 1 0.6 D+M 1.1 C+C 0.0 $Permission-ea4d1f46014a5bf6bbd97d3ec5723266 {}
+ # I 1i 0.13MiB E 0 0.0 U 1 0.1 D+M 0.2 C+C 0.0 $Permission-65b58d132ff857bb81b08a5bb69732d2 {}
+ # I 1i 0.02MiB E 0 0.0 U 1 0.0 D+M 0.0 C+C 0.0 $Permission-a7bcc3370ad15ae68041745ca55166cf {}
+ # I 1i 0.02MiB E 0 0.0 U 1 0.0 D+M 0.0 C+C 0.0 $Permission-03032ccf597857d9aa9c64b10288af8c {}
```
6. Use `propulsion sync` tool to run a CosmosDB ChangeFeedProcessor
diff --git a/samples/Infrastructure/Services.fs b/samples/Infrastructure/Services.fs
index 774c55e4b..9b358abba 100644
--- a/samples/Infrastructure/Services.fs
+++ b/samples/Infrastructure/Services.fs
@@ -17,10 +17,10 @@ type Store(store) =
MemoryStore.MemoryStoreCategory(store, name, codec, fold, initial)
| Store.Config.Cosmos (store, caching, unfolds) ->
let accessStrategy = if unfolds then CosmosStore.AccessStrategy.Snapshot snapshot else CosmosStore.AccessStrategy.Unoptimized
- CosmosStore.CosmosStoreCategory<'event,'state,_>(store, name, FsCodec.SystemTextJson.Encoding.EncodeTryCompressUtf8 codec, fold, initial, accessStrategy, caching)
+ CosmosStore.CosmosStoreCategory<'event,'state,_>(store, name, FsCodec.SystemTextJson.Encoder.CompressedUtf8 codec, fold, initial, accessStrategy, caching)
| Store.Config.Dynamo (store, caching, unfolds) ->
let accessStrategy = if unfolds then DynamoStore.AccessStrategy.Snapshot snapshot else DynamoStore.AccessStrategy.Unoptimized
- DynamoStore.DynamoStoreCategory<'event,'state,_>(store, name, FsCodec.Compression.EncodeTryCompress codec, fold, initial, accessStrategy, caching)
+ DynamoStore.DynamoStoreCategory<'event,'state,_>(store, name, FsCodec.Encoder.Compressed codec, fold, initial, accessStrategy, caching)
| Store.Config.Es (context, caching, unfolds) ->
let accessStrategy = if unfolds then EventStoreDb.AccessStrategy.RollingSnapshots snapshot else EventStoreDb.AccessStrategy.Unoptimized
EventStoreDb.EventStoreCategory<'event,'state,_>(context, name, codec, fold, initial, accessStrategy, caching)
diff --git a/samples/Store/Domain/Domain.fsproj b/samples/Store/Domain/Domain.fsproj
index 53f6cbc39..cd2687392 100644
--- a/samples/Store/Domain/Domain.fsproj
+++ b/samples/Store/Domain/Domain.fsproj
@@ -19,7 +19,7 @@
-
+
diff --git a/samples/Store/Domain/Infrastructure.fs b/samples/Store/Domain/Infrastructure.fs
index 02eb0296e..940b30ec6 100644
--- a/samples/Store/Domain/Infrastructure.fs
+++ b/samples/Store/Domain/Infrastructure.fs
@@ -92,7 +92,7 @@ module EventCodec =
/// For CosmosStore - we encode to JsonElement as that's what the store talks
let genJsonElement<'t when 't :> TypeShape.UnionContract.IUnionContract> =
- FsCodec.SystemTextJson.CodecJsonElement.Create<'t>() |> FsCodec.SystemTextJson.Encoding.EncodeUncompressed
+ FsCodec.SystemTextJson.CodecJsonElement.Create<'t>() |> FsCodec.SystemTextJson.Encoder.Uncompressed
/// For stores other than CosmosStore, we encode to UTF-8 and have the store do the right thing
let gen<'t when 't :> TypeShape.UnionContract.IUnionContract> =
diff --git a/samples/Store/Integration/LogIntegration.fs b/samples/Store/Integration/LogIntegration.fs
index 4adbab382..a791db975 100644
--- a/samples/Store/Integration/LogIntegration.fs
+++ b/samples/Store/Integration/LogIntegration.fs
@@ -38,6 +38,7 @@ module EquinoxCosmosInterop =
| Log.Metric.Query (Direction.Backward,c,m) -> "CosmosQueryB", m, Some c, m.ru
| Log.Metric.QueryResponse (Direction.Forward,m) -> "CosmosResponseF", m, None, m.ru
| Log.Metric.QueryResponse (Direction.Backward,m) -> "CosmosResponseB", m, None, m.ru
+ | Log.Metric.Index m -> "CosmosLinq", m, None, m.ru
| Log.Metric.SyncSuccess m -> "CosmosSync200", m, None, m.ru
| Log.Metric.SyncConflict m -> "CosmosSync409", m, None, m.ru
| Log.Metric.SyncResync m -> "CosmosSyncResync", m, None, m.ru
diff --git a/samples/Tutorial/Favorites.fsx b/samples/Tutorial/Favorites.fsx
index 3702b6bf9..83315e1e0 100644
--- a/samples/Tutorial/Favorites.fsx
+++ b/samples/Tutorial/Favorites.fsx
@@ -74,7 +74,7 @@ let favesCa = fold favesCba removeBEffect
let _removeBAgainEffect = Decisions.remove "b" favesCa
//val _removeBAgainEffect : Event list = []
-// related streams are termed a Category; Each client will have it's own Stream.
+// related streams are termed a Category; Each client will have its own Stream.
let [] private CategoryName = "Favorites"
let clientAFavoritesStreamId = FsCodec.StreamId.gen id "ClientA"
diff --git a/samples/Tutorial/Infrastructure.fs b/samples/Tutorial/Infrastructure.fs
index ee99c8a9d..09fe36afa 100644
--- a/samples/Tutorial/Infrastructure.fs
+++ b/samples/Tutorial/Infrastructure.fs
@@ -23,7 +23,7 @@ module EventCodec =
/// For CosmosStore - we encode to JsonElement as that's what the store talks
let genJsonElement<'t when 't :> TypeShape.UnionContract.IUnionContract> =
- FsCodec.SystemTextJson.CodecJsonElement.Create<'t>() |> FsCodec.SystemTextJson.Encoding.EncodeUncompressed
+ FsCodec.SystemTextJson.CodecJsonElement.Create<'t>() |> FsCodec.SystemTextJson.Encoder.Uncompressed
/// For stores other than CosmosStore, we encode to UTF-8 and have the store do the right thing
let gen<'t when 't :> TypeShape.UnionContract.IUnionContract> =
diff --git a/samples/Tutorial/Tutorial.fsproj b/samples/Tutorial/Tutorial.fsproj
index a6d7334a3..2f5b35d35 100644
--- a/samples/Tutorial/Tutorial.fsproj
+++ b/samples/Tutorial/Tutorial.fsproj
@@ -28,7 +28,7 @@
-
+
diff --git a/src/Equinox.CosmosStore.Prometheus/CosmosStorePrometheus.fs b/src/Equinox.CosmosStore.Prometheus/CosmosStorePrometheus.fs
index 2808bbde3..7eb31da95 100644
--- a/src/Equinox.CosmosStore.Prometheus/CosmosStorePrometheus.fs
+++ b/src/Equinox.CosmosStore.Prometheus/CosmosStorePrometheus.fs
@@ -112,6 +112,7 @@ type LogSink(customTags: seq<string * string>) =
| Op (Operation.Tip304, m) -> observeTip ("R", "query", "tip", "ok", "304") m
| Op (Operation.Query, m) -> observe ("R", "query", "query", "ok") m
| QueryRes (_direction, m) -> observeRes ("R", "query", "queryPage") m
+ | Op (Operation.Index, m) -> observe ("R", "linq", "query", "ok") m
| Op (Operation.Write, m) -> observe ("W", "transact", "sync", "ok") m
| Op (Operation.Conflict, m) -> observe ("W", "transact", "conflict", "conflict") m
| Op (Operation.Resync, m) -> observe ("W", "transact", "resync", "conflict") m
diff --git a/src/Equinox.CosmosStore.Prometheus/Equinox.CosmosStore.Prometheus.fsproj b/src/Equinox.CosmosStore.Prometheus/Equinox.CosmosStore.Prometheus.fsproj
index 057fa2ab3..28eacfe32 100644
--- a/src/Equinox.CosmosStore.Prometheus/Equinox.CosmosStore.Prometheus.fsproj
+++ b/src/Equinox.CosmosStore.Prometheus/Equinox.CosmosStore.Prometheus.fsproj
@@ -9,8 +9,10 @@
-
-
+
+
+
+
diff --git a/src/Equinox.CosmosStore/CosmosStore.fs b/src/Equinox.CosmosStore/CosmosStore.fs
index abc42dd8f..05e51ec5a 100644
--- a/src/Equinox.CosmosStore/CosmosStore.fs
+++ b/src/Equinox.CosmosStore/CosmosStore.fs
@@ -14,13 +14,14 @@ type EncodedBody = (struct (int * JsonElement))
/// Interpretation of EncodedBody data is an external concern from the perspective of the Store
/// The idiomatic implementation of the encoding logic is FsCodec.SystemTextJson.Compression, in versions 3.1.0 or later
/// That implementation provides complete interop with encodings produced by Equinox.Cosmos/CosmosStore from V1 onwards, including integrated Deflate compression
-module internal EncodedBody =
+module EncodedBody =
let internal jsonRawText: EncodedBody -> string = ValueTuple.snd >> _.GetRawText()
let internal jsonUtf8Bytes = jsonRawText >> System.Text.Encoding.UTF8.GetByteCount
- let [] deflateEncoding = 1
- // prior to the addition of the `D` field in 4.1.0, the integrated compression support
- // was predicated entirely on a JSON String `d` value in the Unfold as implying it was UTF8->Deflate->Base64 encoded
- let parseUnfold = function struct (0, e: JsonElement) when e.ValueKind = JsonValueKind.String -> struct (deflateEncoding, e) | x -> x
+    let [<Literal>] private deflateEncoding = 1
+ /// prior to the addition of the `D` field in 4.1.0, the integrated compression support
+ /// was predicated entirely on a JSON String `d` value in the Unfold as implying it was UTF8 -> Deflate -> Base64 encoded
+ let ofUnfoldBody struct (enc, data: JsonElement): EncodedBody =
+ if enc = 0 && data.ValueKind = JsonValueKind.String then (deflateEncoding, data) else (enc, data)
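+    // (added illustration) e.g.:
+    //   ofUnfoldBody (0, elem) // -> struct (1, elem) when elem is a JSON String: legacy pre-4.1.0 body, Deflate inferred
+    //   ofUnfoldBody (2, elem) // -> struct (2, elem): an explicit `D` encoding value roundtrips unaltered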
/// A single Domain Event from the array held in a Batch
[]
@@ -108,7 +109,7 @@ type Unfold =
[]
M: int }
member x.ToTimelineEvent(): ITimelineEvent<EncodedBody> =
- FsCodec.Core.TimelineEvent.Create(x.i, x.c, EncodedBody.parseUnfold (x.D, x.d), (x.M, x.m), Guid.Empty, null, null, x.t, isUnfold = true)
+ FsCodec.Core.TimelineEvent.Create(x.i, x.c, EncodedBody.ofUnfoldBody (x.D, x.d), (x.M, x.m), Guid.Empty, null, null, x.t, isUnfold = true)
// Arrays are not indexed by default. 1. enable filtering by `c`ase 2. index uncompressed fields within unfolds for filtering
static member internal IndexedPaths = [| "/u/[]/c/?"; "/u/[]/d/*" |]
@@ -228,6 +229,8 @@ module Log =
| Delete of Measurement
/// Trimmed the Tip
| Trim of Measurement
+ /// Queried via the Index; count=-1 -> aggregate operation
+ | Index of Measurement
let [<return: Struct>] (|MetricEvent|_|) (logEvent: Serilog.Events.LogEvent): Metric voption =
let mutable p = Unchecked.defaultof<_>
logEvent.Properties.TryGetValue(PropertyTag, &p) |> ignore
@@ -253,10 +256,10 @@ module Log =
f log
retryPolicy.Execute withLoggingContextWrapping
- let internal eventLen (x: #IEventData<_>) = EncodedBody.jsonUtf8Bytes x.Data + EncodedBody.jsonUtf8Bytes x.Meta + 80
+    let internal eventLen (x: #IEventData<EncodedBody>) = EncodedBody.jsonUtf8Bytes x.Data + EncodedBody.jsonUtf8Bytes x.Meta + 80
let internal batchLen = Seq.sumBy eventLen
[<RequireQualifiedAccess>]
- type Operation = Tip | Tip404 | Tip304 | Query | Write | Resync | Conflict | Prune | Delete | Trim
+ type Operation = Tip | Tip404 | Tip304 | Query | Index | Write | Resync | Conflict | Prune | Delete | Trim
let (|Op|QueryRes|PruneRes|) = function
| Metric.Tip s -> Op (Operation.Tip, s)
| Metric.TipNotFound s -> Op (Operation.Tip404, s)
@@ -265,6 +268,8 @@ module Log =
| Metric.Query (_, _, s) -> Op (Operation.Query, s)
| Metric.QueryResponse (direction, s) -> QueryRes (direction, s)
+ | Metric.Index s -> Op (Operation.Index, s)
+
| Metric.SyncSuccess s -> Op (Operation.Write, s)
| Metric.SyncResync s -> Op (Operation.Resync, s)
| Metric.SyncConflict s -> Op (Operation.Conflict, s)
@@ -299,6 +304,7 @@ module Log =
let epoch = System.Diagnostics.Stopwatch.StartNew()
member val internal Tip = Counters() with get, set
member val internal Read = Counters() with get, set
+ member val internal Index = Counters() with get, set
member val internal Write = Counters() with get, set
member val internal Resync = Counters() with get, set
member val internal Conflict = Counters() with get, set
@@ -326,6 +332,7 @@ module Log =
epoch.Tip.Ingest m
| Op (Operation.Query, BucketMsRu m) -> epoch.Read.Ingest m
| QueryRes (_direction, _) -> ()
+ | Op (Operation.Index, BucketMsRu m) -> epoch.Index.Ingest m
| Op (Operation.Write, BucketMsRu m) -> epoch.Write.Ingest m
| Op (Operation.Conflict, BucketMsRu m) -> epoch.Conflict.Ingest m
| Op (Operation.Resync, BucketMsRu m) -> epoch.Resync.Ingest m
@@ -342,13 +349,14 @@ module Log =
let stats =
[|nameof res.Tip, res.Tip
nameof res.Read, res.Read
+ nameof res.Index, res.Index
nameof res.Write, res.Write
nameof res.Resync, res.Resync
nameof res.Conflict, res.Conflict
nameof res.Prune, res.Prune
nameof res.Delete, res.Delete
nameof res.Trim, res.Trim |]
- let isRead = function nameof res.Tip | nameof res.Read | nameof res.Prune -> true | _ -> false
+ let isRead = function nameof res.Tip | nameof res.Read | nameof res.Index | nameof res.Prune | nameof res.Resync -> true | _ -> false
let buckets = stats |> Seq.collect (fun (_n, stat) -> stat.Buckets) |> Seq.distinct |> Seq.sort |> Seq.toArray
if Array.isEmpty buckets then () else
diff --git a/src/Equinox.CosmosStore/CosmosStoreLinq.fs b/src/Equinox.CosmosStore/CosmosStoreLinq.fs
new file mode 100644
index 000000000..d713df497
--- /dev/null
+++ b/src/Equinox.CosmosStore/CosmosStoreLinq.fs
@@ -0,0 +1,269 @@
+namespace Equinox.CosmosStore.Linq
+
+open Equinox.Core
+open Equinox.CosmosStore.Core // Log, JsonCompressedBase64Converter
+open FSharp.Control // taskSeq
+open Serilog
+open System
+open System.ComponentModel
+open System.Linq
+open System.Linq.Expressions
+open System.Runtime.CompilerServices
+open System.Threading
+open System.Threading.Tasks
+
+/// Generic Expression Tree manipulation helpers / Cosmos SDK LINQ support incompleteness workarounds
+type [<AbstractClass; Sealed>] QueryExtensions =
+    static member Replace(find, replace) =
+        { new ExpressionVisitor() with
+            override _.Visit node =
+                if node = find then replace
+                else base.Visit node }
+    [<Extension>]
+    static member Replace(x: Expression, find, replace) = QueryExtensions.Replace(find, replace).Visit(x)
+    [<Extension>] // https://stackoverflow.com/a/8829845/11635
+    static member Compose(selector: Expression<Func<'T, 'I>>, projector: Expression<Func<'I, 'R>>): Expression<Func<'T, 'R>> =
+        let param = Expression.Parameter(typeof<'T>, "x")
+        let prop = selector.Body.Replace(selector.Parameters[0], param)
+        let body = projector.Body.Replace(projector.Parameters[0], prop)
+        Expression.Lambda<Func<'T, 'R>>(body, param)
+    [<Extension>]
+    static member OrderBy(source: IQueryable<'T>, indexSelector: Expression<Func<'T, 'I>>, keySelector: Expression<Func<'I, 'U>>, descending) =
+        QueryExtensions.OrderByLambda<'T>(source, indexSelector.Compose keySelector, descending)
+    [<Extension>] // https://stackoverflow.com/a/233505/11635
+    static member OrderByPropertyName(source: IQueryable<'T>, indexSelector: Expression<Func<'T, 'I>>, propertyName: string, descending) =
+        let indexProperty = Expression.PropertyOrField(indexSelector.Body, propertyName)
+        let delegateType = typedefof<Func<_, _>>.MakeGenericType(typeof<'T>, indexProperty.Type)
+        let keySelector = Expression.Lambda(delegateType, indexProperty, indexSelector.Parameters[0])
+        QueryExtensions.OrderByLambda(source, keySelector, descending)
+    // NOTE not an extension method as OrderByPropertyName and OrderBy represent the as-strongly-typed-as-possible top level use cases
+    // NOTE no support for a `comparison` arg is warranted as CosmosDB only handles direct scalar prop expressions, https://stackoverflow.com/a/69268191/11635
+    static member OrderByLambda<'T>(source: IQueryable<'T>, keySelector: LambdaExpression, descending) =
+        let call = Expression.Call(
+            typeof<Queryable>,
+            (if descending then "OrderByDescending" else "OrderBy"),
+            [| typeof<'T>; keySelector.ReturnType |],
+            [| source.Expression; keySelector |])
+        source.Provider.CreateQuery<'T>(call) :?> IOrderedQueryable<'T>
+
+/// Predicate manipulation helpers
+type [<AbstractClass; Sealed>] Predicate =
+    /// F# maps `fun` expressions to Expression trees, only when the target is a `member` arg
+    /// See https://learn.microsoft.com/en-us/azure/cosmos-db/nosql/query/linq-to-sql for the list of supported constructs
+    static member Create<'T> expr: Expression<Func<'T, bool>> = expr
+    [<Extension>] // https://stackoverflow.com/a/22569086/11635
+    static member And<'T>(l: Expression<Func<'T, bool>>, r: Expression<Func<'T, bool>>) =
+        let rBody = r.Body.Replace(r.Parameters[0], l.Parameters[0])
+        Expression.Lambda<Func<'T, bool>>(Expression.AndAlso(l.Body, rBody), l.Parameters)
+    [<Extension>] // https://stackoverflow.com/a/22569086/11635
+    static member Or<'T>(l: Expression<Func<'T, bool>>, r: Expression<Func<'T, bool>>) =
+        let rBody = r.Body.Replace(r.Parameters[0], l.Parameters[0])
+        Expression.Lambda<Func<'T, bool>>(Expression.OrElse(l.Body, rBody), l.Parameters)
+    [<Extension>]
+    static member Where(source: IQueryable<'T>, indexSelector: Expression<Func<'T, 'I>>, indexPredicate: Expression<Func<'I, bool>>): IQueryable<'T> =
+        source.Where(indexSelector.Compose indexPredicate)
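+// Consumption sketch (added illustration; the CartIndex type and its fields are hypothetical):
+//   let isActive = Predicate.Create<CartIndex>(fun c -> c.state = "active")
+//   let isBig = Predicate.Create<CartIndex>(fun c -> c.count > 10)
+//   Predicate.And(isActive, isBig) // -> a single Expression<Func<CartIndex, bool>> usable with IQueryable.Where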
+
+module Internal =
+ open Microsoft.Azure.Cosmos
+ open Microsoft.Azure.Cosmos.Linq
+ let inline miB x = float x / 1024. / 1024.
+ module Query =
+ /// Generates an `OFFSET skip LIMIT take` Cosmos SQL query
+ /// NOTE: such a query gets more expensive the more your Skip traverses, so use with care
+ /// NOTE: (continuation tokens are the key to more linear costs)
+ let offsetLimit (skip: int, take: int) (query: IQueryable<'T>) =
+ query.Skip(skip).Take(take)
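+        // (added note) e.g. `offsetLimit (50, 10) q` yields SQL suffixed with `OFFSET 50 LIMIT 10`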
+        let [<EditorBrowsable(EditorBrowsableState.Never)>] enum__ (iterator: FeedIterator<'T>) = taskSeq {
+ while iterator.HasMoreResults do
+ let! response = iterator.ReadNextAsync()
+ let m = response.Diagnostics.GetQueryMetrics().CumulativeMetrics
+ yield struct (response.Diagnostics.GetClientElapsedTime(), response.RequestCharge, response.Resource,
+ int m.RetrievedDocumentCount, int m.RetrievedDocumentSize, int m.OutputDocumentSize) }
+ let enum_<'T> (log: ILogger) (container: Container) (action: string) cat logLevel (iterator: FeedIterator<'T>) = taskSeq {
+ let startTicks = System.Diagnostics.Stopwatch.GetTimestamp()
+ use _ = iterator
+ let mutable responses, items, totalRtt, totalRu, totalRdc, totalRds, totalOds = 0, 0, TimeSpan.Zero, 0., 0, 0, 0
+ try for rtt, rc, response, rdc, rds, ods in enum__ iterator do
+ responses <- responses + 1
+ totalRdc <- totalRdc + rdc
+ totalRds <- totalRds + rds
+ totalOds <- totalOds + ods
+ totalRu <- totalRu + rc
+ totalRtt <- totalRtt + rtt
+ for item in response do
+ items <- items + 1
+ yield item
+ finally
+ let interval = StopwatchInterval(startTicks, System.Diagnostics.Stopwatch.GetTimestamp())
+ let log = if cat = null then log else
+ let evt = Log.Metric.Index { database = container.Database.Id; container = container.Id; stream = cat + FsCodec.StreamName.Category.SeparatorStr
+ interval = interval; bytes = totalOds; count = items; ru = totalRu } in log |> Log.event evt
+ log.Write(logLevel, "EqxCosmos {action:l} {count} ({trips}r {totalRtt:f0}ms; {rdc}i {rds:f2}>{ods:f2} MiB) {rc:f2} RU {lat:f0} ms",
+ action, items, responses, totalRtt.TotalMilliseconds, totalRdc, miB totalRds, miB totalOds, totalRu, interval.ElapsedMilliseconds) }
+ /// Runs a query that can be hydrated as 'T
+ let enum log container cat = enum_ log container "Index" cat Events.LogEventLevel.Information
+ let exec__<'R> (log: ILogger) (container: Container) cat logLevel (queryDefinition: QueryDefinition): TaskSeq<'R> =
+ if log.IsEnabled logLevel then log.Write(logLevel, "CosmosStoreQuery.run {cat} {query}", cat, queryDefinition.QueryText)
+ container.GetItemQueryIterator<'R> queryDefinition |> enum_ log container "Query" cat logLevel
+ /// Runs a query that renders 'T, Hydrating the results as 'P (can be the same types but e.g. you might want to map an object to a JsonElement etc)
+ let enumAs<'T, 'P> (log: ILogger) (container: Container) cat logLevel (query: IQueryable<'T>): TaskSeq<'P> =
+ let queryDefinition = query.ToQueryDefinition()
+ exec__<'P> log container cat logLevel queryDefinition
+ /// Execute a query, hydrating as 'R
+ let exec<'R> (log: ILogger) (container: Container) logLevel (queryDefinition: QueryDefinition): TaskSeq<'R> =
+ exec__<'R> log container "%" logLevel queryDefinition
+ module AggregateOp =
+ /// Runs one of the typical Cosmos SDK extensions, e.g. CountAsync, logging the costs
+        let [<EditorBrowsable(EditorBrowsableState.Never)>] exec (log: ILogger) (container: Container) (op: string) (cat: string) (query: IQueryable<'T>) run render: System.Threading.Tasks.Task<'R> = task {
+ let startTicks = System.Diagnostics.Stopwatch.GetTimestamp()
+ let! (rsp: Response<'R>) = run query
+ let res = rsp.Resource
+ let summary = render res
+ let m = rsp.Diagnostics.GetQueryMetrics().CumulativeMetrics
+ let interval = StopwatchInterval(startTicks, System.Diagnostics.Stopwatch.GetTimestamp())
+ let totalOds, totalRu = m.OutputDocumentSize, rsp.RequestCharge
+ let log = let evt = Log.Metric.Index { database = container.Database.Id; container = container.Id; stream = cat + FsCodec.StreamName.Category.SeparatorStr
+ interval = interval; bytes = int totalOds; count = -1; ru = totalRu } in log |> Log.event evt
+ log.Information("EqxCosmos {action:l} {cat} {count} ({rdc}i {rds:f2}>{ods:f2} MiB) {rc} RU {lat:f0} ms",
+ op, cat, summary, m.RetrievedDocumentCount, miB m.RetrievedDocumentSize, miB totalOds, totalRu, interval.ElapsedMilliseconds)
+ return res }
+ /// Runs query.CountAsync, with instrumentation equivalent to what query provides
+ let countAsync (log: ILogger) container cat logLevel (query: IQueryable<'T>) ct =
+ if log.IsEnabled logLevel then log.Write(logLevel, "CosmosStoreQuery.count {cat} {query}", cat, query.ToQueryDefinition().QueryText)
+ exec log container "count" cat query (_.CountAsync(ct)) id
+ module Scalar =
+ /// Generates a TOP 1 SQL query
+ let top1 (query: IQueryable<'T>) =
+ query.Take(1)
+ /// Handles a query that's expected to yield 0 or 1 result item
+ let tryHeadAsync<'T, 'R> (log: ILogger) (container: Container) cat logLevel (query: IQueryable<'T>) (_ct: CancellationToken): Task<'R option> =
+ let queryDefinition = (top1 query).ToQueryDefinition()
+ if log.IsEnabled logLevel then log.Write(logLevel, "CosmosStoreQuery.tryScalar {cat} {query}", cat, queryDefinition.QueryText)
+ container.GetItemQueryIterator<'R> queryDefinition |> Query.enum_ log container "Scalar" cat logLevel |> TaskSeq.tryHead
+    type Projection<'T, 'M>(query, category, container, enum: IQueryable<'T> -> TaskSeq<'M>, count: IQueryable<'T> -> CancellationToken -> Task<int>) =
+        static member Create<'P>(q, cat, c, log, hydrate: 'P -> 'M, logLevel) =
+            Projection<'T, 'M>(q, cat, c, Query.enumAs<'T, 'P> log c cat logLevel >> TaskSeq.map hydrate, AggregateOp.countAsync log c cat logLevel)
+        member _.Enum: TaskSeq<'M> = query |> enum
+        member _.EnumPage(skip, take): TaskSeq<'M> = query |> Query.offsetLimit (skip, take) |> enum
+        member _.CountAsync: CancellationToken -> Task<int> = query |> count
+        [<EditorBrowsable(EditorBrowsableState.Never)>] member val Query: IQueryable<'T> = query
+        [<EditorBrowsable(EditorBrowsableState.Never)>] member val Category: string = category
+        [<EditorBrowsable(EditorBrowsableState.Never)>] member val Container: Container = container
+
+// We want to generate a projection statement of the shape: VALUE {"sn": root["p"], "d": root["u"][0]["d"], "D": root["u"][0]["D"]}
+// However the Cosmos SDK does not support F# (or C#) records yet https://github.com/Azure/azure-cosmos-dotnet-v3/issues/3728
+// F#'s LINQ support cannot translate parameterless constructor invocations in a Lambda well;
+// the best native workaround without Expression Manipulation is/was https://stackoverflow.com/a/78206722/11635
+// In C#, you can generate an Expression that works with the Cosmos SDK via `.Select(x => new { sn = x.p, d = x.u[0].d, D = x.u[0].D })`
+// This hack is based on https://stackoverflow.com/a/73506241/11635
+type SnAndSnap() =
+    member val sn: FsCodec.StreamName = Unchecked.defaultof<_> with get, set
+    member val D: Nullable<int> = Unchecked.defaultof<_> with get, set
+    member val d: System.Text.Json.JsonElement = Unchecked.defaultof<_> with get, set
+    static member CreateItemQueryLambda<'T, 'U>(
+            snExpression: Expression -> MemberExpression,
+            uExpression: Expression<Func<'T, 'U>>,
+            formatExpression: Expression<Func<'U, Nullable<int>>>,
+            dataExpression: Expression<Func<'U, System.Text.Json.JsonElement>>) =
+        let param = Expression.Parameter(typeof<'T>, "x")
+        let targetType = typeof<SnAndSnap>
+        let snMember = targetType.GetMember(nameof Unchecked.defaultof<SnAndSnap>.sn)[0]
+        let formatMember = targetType.GetMember(nameof Unchecked.defaultof<SnAndSnap>.D)[0]
+        let dataMember = targetType.GetMember(nameof Unchecked.defaultof<SnAndSnap>.d)[0]
+        Expression.Lambda<Func<'T, SnAndSnap>>(
+            Expression.MemberInit(
+                Expression.New(targetType.GetConstructor [||]),
+                [| Expression.Bind(snMember, snExpression param) :> MemberBinding
+                   Expression.Bind(formatMember, QueryExtensions.Compose(uExpression, formatExpression).Body.Replace(uExpression.Parameters[0], param))
+                   Expression.Bind(dataMember, uExpression.Compose(dataExpression).Body.Replace(uExpression.Parameters[0], param)) |]),
+            [| param |])
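+    // (added note) i.e. the generated lambda is roughly the C# `x => new SnAndSnap { sn = x.p, D = x.u[0].D, d = x.u[0].d }`
+    // (with `sn` actually bound via the supplied snExpression), which the Cosmos SDK can translate but F# cannot express directly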
+
+/// Represents a query projecting information values from an Index and/or Snapshots with a view to rendering the items and/or a count
+type Query<'T, 'M>(inner: Internal.Projection<'T, 'M>) =
+ member _.Enum: TaskSeq<'M> = inner.Enum
+ member _.EnumPage(skip, take): TaskSeq<'M> = inner.EnumPage(skip, take)
+    member _.CountAsync(ct: CancellationToken): Task<int> = inner.CountAsync ct
+    member _.Count(): Async<int> = inner.CountAsync |> Async.call
+    [<EditorBrowsable(EditorBrowsableState.Never)>] member val Inner = inner
+
+/// Helpers for Querying Indices and Projecting Snapshot data based on well-known aspects of Equinox.CosmosStore's storage schema
+module Index =
+
+ []
+ type Item<'I> =
+ { p: string
+ _etag: string
+ u: Unfold<'I> ResizeArray } // Arrays do not bind correctly in Cosmos LINQ
+ and [] Unfold<'I> =
+ { c: string
+ d: 'I // For an index, this is the uncompressed JSON data; we're generating a LINQ query using this field's type, 'I
+          [<System.Text.Json.Serialization.JsonPropertyName "d">]
+          data: System.Text.Json.JsonElement // The raw data representing the encoded snapshot
+          [<System.Text.Json.Serialization.JsonPropertyName "D">]
+          format: Nullable<int> } // The (optional) encoding associated with that snapshot
+
+ let inline prefix categoryName = $"%s{categoryName}-"
+ /// The cheapest search basis; the categoryName is a prefix of the `p` partition field
+ /// Depending on how much more selective the caseName is, `byCaseName` may be a better choice
+    /// (but e.g. if the ratio is 1:1 then there's no point having additional criteria)
+    let byCategoryNameOnly<'I> (container: Microsoft.Azure.Cosmos.Container) categoryName: IQueryable<Item<'I>> =
+        let prefix = prefix categoryName
+        container.GetItemLinqQueryable<Item<'I>>().Where(fun d -> d.p.StartsWith(prefix))
+ // Searches based on the prefix of the `p` field, but also checking the `c` of the relevant unfold is correct
+ // A good idea if that'll be significantly cheaper due to better selectivity
+    let byCaseName<'I> (container: Microsoft.Azure.Cosmos.Container) categoryName caseName: IQueryable<Item<'I>> =
+        let prefix = prefix categoryName
+        container.GetItemLinqQueryable<Item<'I>>().Where(fun d -> d.p.StartsWith(prefix) && d.u[0].c = caseName)
+
+ /// Returns the StreamName (from the `p` field) for a 0/1 item query; only the TOP 1 item is returned
+    let tryGetStreamNameAsync log container cat logLevel (query: IQueryable<Item<'I>>) ct =
+        Internal.Scalar.tryHeadAsync<string, FsCodec.StreamName> log container cat logLevel (query.Select(fun x -> x.p)) ct
+
+ /// Query the items, returning the Stream name and the Snapshot as a JsonElement (Decompressed if applicable)
+    let projectStreamNameAndSnapshot<'I> snapshotUnfoldExpression: Expression<Func<Item<'I>, SnAndSnap>> =
+        // a very ugly workaround for not being able to write query.Select<Item<'I>, Internal.SnAndSnap>(fun x -> { p = x.p; D = x.u[0].D; d = x.u[0].d })
+        let pExpression item = Expression.PropertyOrField(item, nameof Unchecked.defaultof<Item<'I>>.p)
+        SnAndSnap.CreateItemQueryLambda<Item<'I>, Unfold<'I>>(pExpression, snapshotUnfoldExpression, (fun x -> x.format), (fun x -> x.data))
+
+    let createSnAndSnapshotQuery<'I, 'M> log container cat logLevel (hydrate: SnAndSnap -> 'M) (query: IQueryable<SnAndSnap>) =
+ Internal.Projection.Create(query, cat, container, log, hydrate, logLevel) |> Query
+
+/// Enables querying based on uncompressed Indexed values stored as secondary unfolds alongside the snapshot
+[]
+type IndexContext<'I>(container, categoryName, caseName, log, ?queryLogLevel) =
+
+ let queryLogLevel = defaultArg queryLogLevel Serilog.Events.LogEventLevel.Debug
+ member val Log = log
+ member val Description = $"{categoryName}/{caseName}" with get, set
+ member val Container = container
+
+ /// Helper to make F# consumption code more terse (the F# compiler generates Expression trees only when a function is passed to a `member`)
+ /// Example: `i.Predicate(fun e -> e.name = name)`
+ /// See https://learn.microsoft.com/en-us/azure/cosmos-db/nosql/query/linq-to-sql for the list of supported constructs
+    member _.Predicate expr: Expression<Func<'T, bool>> = expr
+
+ /// Fetches a base Queryable that's filtered based on the `categoryName` and `caseName`
+ /// NOTE this is relatively expensive to compute a Count on, compared to `CategoryQueryable`
+    member _.ByCaseName(): IQueryable<Index.Item<'I>> =
+ Index.byCaseName<'I> container categoryName caseName
+
+ /// Fetches a base Queryable that's filtered only on the `categoryName`
+    member _.ByCategory(): IQueryable<Index.Item<'I>> =
+ Index.byCategoryNameOnly<'I> container categoryName
+
+ /// Runs the query; yields the StreamName from the TOP 1 Item matching the criteria
+    member x.TryGetStreamNameWhereAsync(criteria: Expression<Func<Index.Item<'I>, bool>>, ct, ?logLevel) =
+ let logLevel = defaultArg logLevel queryLogLevel
+ Index.tryGetStreamNameAsync x.Log container categoryName logLevel (x.ByCategory().Where criteria) ct
+
+ /// Runs the query; yields the StreamName from the TOP 1 Item matching the criteria
+    member x.TryGetStreamNameWhere(criteria: Expression<Func<Index.Item<'I>, bool>>): Async<FsCodec.StreamName option> =
+ (fun ct -> x.TryGetStreamNameWhereAsync(criteria, ct)) |> Async.call
+
+ /// Query the items, grabbing the Stream name and the Snapshot; The StreamName and the (Decompressed if applicable) Snapshot are passed to `hydrate`
+ member x.QueryStreamNameAndSnapshot(
+            query: IQueryable<Index.Item<'I>>,
+            selectSnapshotUnfold: Expression<Func<Index.Item<'I>, Index.Unfold<'I>>>,
+            hydrate: SnAndSnap -> 'M,
+            ?logLevel): Query<SnAndSnap, 'M> =
+ let logLevel = defaultArg logLevel queryLogLevel
+ query.Select(Index.projectStreamNameAndSnapshot<'I> selectSnapshotUnfold)
+ |> Index.createSnAndSnapshotQuery x.Log container categoryName logLevel hydrate
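+
+// Consumption sketch (added illustration; UserIndex and its email field are hypothetical, mirroring the README's EmailIndex example):
+//   let index = IndexContext<UserIndex>(container, "$User", "EmailIndex", log)
+//   let criteria = index.Predicate(fun i -> i.u[0].d.email = "a@b.com")
+//   index.TryGetStreamNameWhere criteria // Async<FsCodec.StreamName option>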
diff --git a/src/Equinox.CosmosStore/CosmosStoreSerialization.fs b/src/Equinox.CosmosStore/CosmosStoreSerialization.fs
new file mode 100644
index 000000000..e69de29bb
diff --git a/src/Equinox.CosmosStore/Equinox.CosmosStore.fsproj b/src/Equinox.CosmosStore/Equinox.CosmosStore.fsproj
index e3848e9e5..fbb718170 100644
--- a/src/Equinox.CosmosStore/Equinox.CosmosStore.fsproj
+++ b/src/Equinox.CosmosStore/Equinox.CosmosStore.fsproj
@@ -13,11 +13,12 @@
+
-
+
diff --git a/tests/Equinox.CosmosStore.Integration/AccessStrategies.fs b/tests/Equinox.CosmosStore.Integration/AccessStrategies.fs
index 1fcfea895..d73a04786 100644
--- a/tests/Equinox.CosmosStore.Integration/AccessStrategies.fs
+++ b/tests/Equinox.CosmosStore.Integration/AccessStrategies.fs
@@ -42,9 +42,9 @@ module SequenceCheck =
| Add of {| value : int |}
interface TypeShape.UnionContract.IUnionContract
#if STORE_DYNAMO
- let codec = FsCodec.SystemTextJson.Codec.Create() |> FsCodec.Compression.EncodeTryCompress
+ let codec = FsCodec.SystemTextJson.Codec.Create() |> FsCodec.Encoder.Compressed
#else
- let codec = FsCodec.SystemTextJson.CodecJsonElement.Create() |> FsCodec.SystemTextJson.Encoding.EncodeTryCompress
+ let codec = FsCodec.SystemTextJson.CodecJsonElement.Create() |> FsCodec.SystemTextJson.Encoder.Compressed
#endif
module Fold =
diff --git a/tests/Equinox.CosmosStore.Integration/CosmosCoreIntegration.fs b/tests/Equinox.CosmosStore.Integration/CosmosCoreIntegration.fs
index deecfddd0..32da9daaa 100644
--- a/tests/Equinox.CosmosStore.Integration/CosmosCoreIntegration.fs
+++ b/tests/Equinox.CosmosStore.Integration/CosmosCoreIntegration.fs
@@ -10,7 +10,7 @@ open System
type TestEvents() =
static member private Create(i, ?eventType, ?json) =
- let enc = System.Text.Json.JsonSerializer.SerializeToElement >> FsCodec.SystemTextJson.Encoding.FromJsonElement
+ let enc = System.Text.Json.JsonSerializer.SerializeToElement >> FsCodec.SystemTextJson.Encoding.OfJsonElement
FsCodec.Core.EventData.Create
( sprintf "%s:%d" (defaultArg eventType "test_event") i,
enc (defaultArg json "{\"d\":\"d\"}"),
diff --git a/tests/Equinox.CosmosStore.Integration/CosmosFixturesInfrastructure.fs b/tests/Equinox.CosmosStore.Integration/CosmosFixturesInfrastructure.fs
index cdf23059d..eefeba99a 100644
--- a/tests/Equinox.CosmosStore.Integration/CosmosFixturesInfrastructure.fs
+++ b/tests/Equinox.CosmosStore.Integration/CosmosFixturesInfrastructure.fs
@@ -33,6 +33,7 @@ module SerilogHelpers =
type EqxAct =
| Tip | TipNotFound | TipNotModified
| ResponseForward | ResponseBackward
+ | Index
| QueryForward | QueryBackward
| Append | Resync | Conflict
| PruneResponse | Delete | Trim | Prune
@@ -50,6 +51,7 @@ module SerilogHelpers =
| Metric.SyncAppend _ | Metric.SyncCalve _ -> EqxAct.Append
| Metric.SyncAppendConflict _ | Metric.SyncCalveConflict _ -> EqxAct.Conflict
#else
+ | Metric.Index _ -> EqxAct.Index
| Metric.SyncSuccess _ -> EqxAct.Append
| Metric.SyncResync _ -> EqxAct.Resync
| Metric.SyncConflict _ -> EqxAct.Conflict
@@ -75,6 +77,7 @@ module SerilogHelpers =
| Metric.SyncAppend s | Metric.SyncCalve s
| Metric.SyncCalveConflict s | Metric.SyncAppendConflict s -> Write s
#else
+ | Metric.Index s -> Response s // Stubbed out for now
| Metric.SyncSuccess s
| Metric.SyncConflict s -> Write s
| Metric.SyncResync s -> Resync s
diff --git a/tests/Equinox.CosmosStore.Integration/DocumentStoreIntegration.fs b/tests/Equinox.CosmosStore.Integration/DocumentStoreIntegration.fs
index 88682f3ea..739f84304 100644
--- a/tests/Equinox.CosmosStore.Integration/DocumentStoreIntegration.fs
+++ b/tests/Equinox.CosmosStore.Integration/DocumentStoreIntegration.fs
@@ -15,7 +15,7 @@ open Equinox.CosmosStore.Integration.CosmosFixtures
module Cart =
let fold, initial = Cart.Fold.fold, Cart.Fold.initial
#if STORE_DYNAMO
- let codec = Cart.Events.codec |> FsCodec.Compression.EncodeTryCompress
+ let codec = Cart.Events.codec |> FsCodec.Encoder.Compressed
#else
let codec = Cart.Events.codecJe
#endif
@@ -49,7 +49,7 @@ module ContactPreferences =
let fold, initial = ContactPreferences.Fold.fold, ContactPreferences.Fold.initial
module ClientId = let gen (): ContactPreferences.ClientId = Guid.gen () |> Guid.toStringN |> ContactPreferences.ClientId
#if STORE_DYNAMO
- let codec = ContactPreferences.Events.codec |> FsCodec.Compression.EncodeTryCompress
+ let codec = ContactPreferences.Events.codec |> FsCodec.Encoder.Compressed
#else
let codec = ContactPreferences.Events.codecJe
#endif
diff --git a/tests/Equinox.CosmosStore.Integration/FsCodecCompressionTests.fs b/tests/Equinox.CosmosStore.Integration/FsCodecCompressionTests.fs
index 6c50476a0..4857c50f8 100644
--- a/tests/Equinox.CosmosStore.Integration/FsCodecCompressionTests.fs
+++ b/tests/Equinox.CosmosStore.Integration/FsCodecCompressionTests.fs
@@ -1,7 +1,7 @@
// Prior to version v 4.1.0, CosmosStore owned:
// - compression of snapshots (and APIs controlling conditionally of that)
// - inflation of snapshots
-// This is now an external concern, fully implemented by APIs presented in FsCodec.SystemTextJson.Compression v 3.1.0 and later
+// This is now an external concern, fully implemented by APIs presented in FsCodec.SystemTextJson.Encod* v 3.1.0 and later
// These tests are a sanity check pinning the basic mechanisms that are now externalized; any more thorough tests should be maintained in FsCodec
// NOTE there is no strong dependency on FsCodec; CosmosStore is happy to roundtrip arbitrary pairs of D/d and M/m values
// NOTE prior to v 4.1.0, CosmosStore provided a System.Text.Json integration for Microsoft.Azure.Cosmos
@@ -38,14 +38,14 @@ type CoreBehaviors() =
[]
let ``serializes, achieving expected compression`` () =
- let encoded = eventCodec |> FsCodec.SystemTextJson.Encoding.EncodeTryCompress |> _.Encode((), A { embed = String('x',5000) })
+ let encoded = eventCodec |> FsCodec.SystemTextJson.Encoder.Compressed |> _.Encode((), A { embed = String('x',5000) })
let res = ser encoded
test <@ res.Contains "\"d\":\"" && res.Length < 138 && res.Contains "\"D\":2" @>
let codec compress =
- let forceCompression: FsCodec.SystemTextJson.CompressionOptions = { minSize = 0; minGain = -1000 }
- if compress then FsCodec.SystemTextJson.Encoding.EncodeTryCompress(eventCodec, options = forceCompression)
- else FsCodec.SystemTextJson.Encoding.EncodeUncompressed eventCodec
+ let forceCompression: FsCodec.CompressionOptions = { minSize = 0; minGain = -1000 }
+ if compress then FsCodec.SystemTextJson.Encoder.Compressed(eventCodec, options = forceCompression)
+ else FsCodec.SystemTextJson.Encoder.Uncompressed eventCodec
[]
let roundtrips compress value =
diff --git a/tools/Equinox.Tool/Program.fs b/tools/Equinox.Tool/Program.fs
index 64d15e589..1a223b0eb 100644
--- a/tools/Equinox.Tool/Program.fs
+++ b/tools/Equinox.Tool/Program.fs
@@ -30,6 +30,8 @@ type Parameters =
| [] InitSql of ParseResults
| [] Stats of ParseResults
| [] Query of ParseResults
+    | [<CliPrefix(CliPrefix.None); Last>] Top of ParseResults<TopParameters>
+    | [<CliPrefix(CliPrefix.None); Last>] Destroy of ParseResults<DestroyParameters>
interface IArgParserTemplate with
member a.Usage = a |> function
| Quiet -> "Omit timestamps from log output"
@@ -44,6 +46,8 @@ type Parameters =
| InitSql _ -> "Initialize Database Schema (supports `mssql`/`mysql`/`postgres` SqlStreamStore stores)."
| Stats _ -> "inspect store to determine numbers of streams/documents/events and/or config (supports `cosmos` and `dynamo` stores)."
| Query _ -> "Load/Summarise streams based on Cosmos SQL Queries (supports `cosmos` only)."
+ | Top _ -> "Scan to determine top categories and streams (supports `cosmos` only)."
+ | Destroy _ -> "DELETE documents for a nominated category and/or stream (includes a dry-run mode). (supports `cosmos` only)."
and [] InitParameters =
| [] Rus of int
| [] Autoscale
@@ -101,8 +105,9 @@ and [] InitSqlParameters =
| Postgres _ -> "Configure Postgres Store."
and [] StatsParameters =
| [] Events
+    | [<AltCommandLine "-U">] Unfolds
| [] Streams
- | [] Documents
+    | [<AltCommandLine("-I", "-D")>] Items
    | [<AltCommandLine "-O">] Oldest
    | [<AltCommandLine "-N">] Newest
| [] Parallel
@@ -111,8 +116,9 @@ and [] StatsParameters =
interface IArgParserTemplate with
member a.Usage = a |> function
| Events -> "Count the number of Events in the store."
+ | Unfolds -> "Count the number of Unfolds in the store."
| Streams -> "Count the number of Streams in the store."
- | Documents -> "Count the number of Documents in the store."
+ | Items -> "Count the number of Items(Documents) in the store."
| Oldest -> "Oldest document, based on the _ts field"
| Newest -> "Newest document, based on the _ts field"
| Parallel -> "Run in Parallel (CAREFUL! can overwhelm RU allocations)."
@@ -120,6 +126,7 @@ and [] StatsParameters =
| Dynamo _ -> "Dynamo Connection parameters."
and [] QueryParameters =
| [] StreamName of string
+    | [<AltCommandLine "-sl">] StreamLike of string
    | [<AltCommandLine "-cn">] CategoryName of string
    | [<AltCommandLine "-cl">] CategoryLike of string
    | [<AltCommandLine "-un">] UnfoldName of string
    | [<AltCommandLine "-uc">] UnfoldCriteria of string
interface IArgParserTemplate with
member a.Usage = a |> function
| StreamName _ -> "Specify stream name to match against `p`, e.g. `$UserServices-f7c1ce63389a45bdbea1cccebb1b3c8a`."
+            | StreamLike _ -> "Specify stream name to match against `p` as a Cosmos LIKE expression (with `%` as wildcard), e.g. `%-f7c1ce63389a45bdbea1cccebb1b3c8a`."
| CategoryName _ -> "Specify category name to match against `p`, e.g. `$UserServices`."
| CategoryLike _ -> "Specify category name to match against `p` as a Cosmos LIKE expression (with `%` as wildcard, e.g. `$UserServices-%`)."
| UnfoldName _ -> "Specify unfold Name to match against `u.c`, e.g. `Snapshotted`"
| UnfoldCriteria _ -> "Specify constraints on Unfold (reference unfold fields via `u.d.`, top level fields via `c.`), e.g. `u.d.name = \"TenantName1\"`."
- | Mode _ -> "readOnly: Only read `u`nfolds, not `_etag`.\n" +
- "readWithStream: Read `u`nfolds and `p` (stream name), but not `_etag`.\n" +
- "default: Retrieve full data (p, u, _etag). <- Default for normal queries\n" +
+            | Mode _ -> "default: `_etag` plus snapwithstream (_etag, p, u[0].d). <- Default for normal queries\n" +
+ "snaponly: Only read `u[0].d`\n" +
+ "snapwithstream: Read `u[0].d` and `p` (stream name), but not `_etag`.\n" +
+ "readonly: Only read `u`nfolds, not `_etag`.\n" +
+ "readwithstream: Read `u`nfolds and `p` (stream name), but not `_etag`.\n" +
"raw: Read all Items(documents) in full. <- Default when Output File specified\n"
| File _ -> "Export full retrieved JSON to file. NOTE this switches the default mode to `Raw`"
| Pretty -> "Render the JSON indented over multiple lines"
| Console -> "Also emit the JSON to the console. Default: Gather statistics (but only write to a File if specified)"
| Cosmos _ -> "Parameters for CosmosDB."
-and [] Mode = ReadOnly | ReadWithStream | Default | Raw
-and [] Criteria = SingleStream of string | CatName of string | CatLike of string | Unfiltered
+and [<RequireQualifiedAccess>] Mode = Default | SnapOnly | SnapWithStream | ReadOnly | ReadWithStream | Raw
+and [<RequireQualifiedAccess>] Criteria =
+ | SingleStream of string | StreamLike of string | CatName of string | CatLike of string | Custom of sql: string | Unfiltered
+ member x.Sql = x |> function
+ | Criteria.SingleStream sn -> $"c.p = \"{sn}\""
+ | Criteria.StreamLike pat -> $"c.p LIKE \"{pat}\""
+ | Criteria.CatName n -> $"c.p LIKE \"{n}-%%\""
+ | Criteria.CatLike pat -> $"c.p LIKE \"{pat}-%%\""
+ | Criteria.Custom filter -> filter
+ | Criteria.Unfiltered -> "1=1"
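+    // (added note) e.g. (Criteria.CatName "$User").Sql -> c.p LIKE "$User-%"; (Criteria.SingleStream "$User-42").Sql -> c.p = "$User-42"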
and QueryArguments(p: ParseResults) =
member val Mode = p.GetResult(QueryParameters.Mode, if p.Contains QueryParameters.File then Mode.Raw else Mode.Default)
member val Pretty = p.Contains QueryParameters.Pretty
member val TeeConsole = p.Contains QueryParameters.Console
member val Criteria =
- match p.TryGetResult QueryParameters.StreamName, p.TryGetResult QueryParameters.CategoryName, p.TryGetResult QueryParameters.CategoryLike with
- | Some sn, None, None -> Criteria.SingleStream sn
- | Some _, Some _, _
- | Some _, _, Some _ -> p.Raise "StreamName and CategoryLike/CategoryName are mutually exclusive"
- | None, Some cn, None -> Criteria.CatName cn
- | None, None, Some cl -> Criteria.CatLike cl
- | None, None, None -> Criteria.Unfiltered
- | None, Some _, Some _ -> p.Raise "CategoryLike and CategoryName are mutually exclusive"
+ match p.TryGetResult QueryParameters.StreamName, p.TryGetResult QueryParameters.StreamLike,
+ p.TryGetResult QueryParameters.CategoryName, p.TryGetResult QueryParameters.CategoryLike with
+ | Some sn, None, None, None -> Criteria.SingleStream sn
+ | None, Some sl, None, None -> Criteria.StreamLike sl
+ | None, None, Some cn, None -> Criteria.CatName cn
+ | None, None, None, Some cl -> Criteria.CatLike cl
+ | None, None, None, None -> Criteria.Unfiltered
+ | _ -> p.Raise "StreamName/StreamLike and CategoryLike/CategoryName are mutually exclusive"
member val Filepath = p.TryGetResult QueryParameters.File
member val UnfoldName = p.TryGetResult QueryParameters.UnfoldName
member val UnfoldCriteria = p.TryGetResult QueryParameters.UnfoldCriteria
@@ -166,17 +184,100 @@ and QueryArguments(p: ParseResults) =
member x.Connect() = match Store.Cosmos.config Log.Logger (None, true) x.CosmosArgs with
| Store.Config.Cosmos (cc, _, _) -> cc.Container
| _ -> p.Raise "Query requires Cosmos"
- member x.ConfigureStore(log: ILogger) =
- let storeConfig = None, true
- Store.Cosmos.config log storeConfig x.CosmosArgs
+and [] TopParameters =
+ | [] StreamName of string
+    | [<AltCommandLine "-cn">] CategoryName of string
+    | [<AltCommandLine "-cl">] CategoryLike of string
+ | [] CustomFilter of sql: string
+    | [<AltCommandLine "-S">] Streams
+ | [] TsOrder
+ | [] CategoryLimit of int
+ | [] StreamsLimit of int
+ | [] Sort of Order
+    | [<CliPrefix(CliPrefix.None)>] Cosmos of ParseResults<Store.Cosmos.Parameters>
+ interface IArgParserTemplate with
+ member a.Usage = a |> function
+ | StreamName _ -> "Specify stream name to match against `p`, e.g. `$UserServices-f7c1ce63389a45bdbea1cccebb1b3c8a`."
+ | CategoryName _ -> "Specify category name to match against `p`, e.g. `$UserServices`."
+ | CategoryLike _ -> "Specify category name pattern to match against `p` as a Cosmos LIKE expression (with `%` as wildcard, e.g. `$UserService%`; a `-%` suffix is appended to the pattern automatically)."
+ | CustomFilter _ -> "Specify a custom filter, referencing the document as `c.` (e.g. `'c.p LIKE \"test-%\" AND c._ts < 1717138092'`)"
+ | Streams -> "Stream level stats"
+ | TsOrder -> "Retrieve data in `_ts` ORDER (generally has significant RU impact). Default: Use continuation tokens"
+ | Sort _ -> "Sort order for results"
+ | CategoryLimit _ -> "Number of categories to limit output to. Default: unlimited."
+ | StreamsLimit _ -> "Number of streams to limit output to. Default: 50"
+ | Cosmos _ -> "Parameters for CosmosDB."
+and Order = Name | Items | Events | Unfolds | Size | EventSize | UnfoldSize | InflateSize | CorrCauseSize
+and TopArguments(p: ParseResults<TopParameters>) =
+ member val Criteria =
+ match p.TryGetResult TopParameters.StreamName, p.TryGetResult TopParameters.CategoryName, p.TryGetResult TopParameters.CategoryLike, p.TryGetResult TopParameters.CustomFilter with
+ | None, None, None, None -> Criteria.Unfiltered
+ | Some sn, None, None, None -> Criteria.SingleStream sn
+ | None, Some cn, None, None -> Criteria.CatName cn
+ | None, None, Some cl, None -> Criteria.CatLike cl
+ | None, None, None, Some filter -> Criteria.Custom filter
+ | _ -> p.Raise "StreamName/CategoryLike/CategoryName/CustomFilter are mutually exclusive"
+ member val CosmosArgs = p.GetResult TopParameters.Cosmos |> Store.Cosmos.Arguments
+ member val StreamLevel = p.Contains Streams
+ member val CategoriesLimit = p.GetResult(CategoryLimit, Int32.MaxValue)
+ member val TsOrder = p.Contains TsOrder
+ member val Order = p.GetResult(Sort, Order.Size)
+ member val StreamsLimit = p.GetResult(StreamsLimit, 50)
+ member x.Connect() = match Store.Cosmos.config Log.Logger (None, true) x.CosmosArgs with
+ | Store.Config.Cosmos (cc, _, _) -> cc.Container
+ | _ -> p.Raise "Top requires Cosmos"
+ member x.Execute(sql) = let container = x.Connect()
+ let qd = Microsoft.Azure.Cosmos.QueryDefinition sql
+ let qo = Microsoft.Azure.Cosmos.QueryRequestOptions(MaxItemCount = x.CosmosArgs.QueryMaxItemsOr 9999)
+ container.GetItemQueryIterator<System.Text.Json.JsonElement>(qd, requestOptions = qo)
+and [] DestroyParameters =
+ | [] StreamName of string
+ | [] StreamLike of string
+ | [] CategoryName of string
+ | [] CategoryLike of string
+ | [] CustomFilter of sql: string
+ | [] Force
+ | [] Parallelism of dop: int
+ | [] Cosmos of ParseResults<Store.Cosmos.Parameters>
+ interface IArgParserTemplate with
+ member a.Usage = a |> function
+ | StreamName _ -> "Specify stream name to match against `p`, e.g. `$UserServices-f7c1ce63389a45bdbea1cccebb1b3c8a`."
+ | StreamLike _ -> "Specify stream name pattern to match against `p` as a Cosmos LIKE expression (with `%` as wildcard), e.g. `%-f7c1ce63389a45bdbea1cccebb1b3c8a`."
+ | CategoryName _ -> "Specify category name to match against `p`, e.g. `$UserServices`."
+ | CategoryLike _ -> "Specify category name pattern to match against `p` as a Cosmos LIKE expression (with `%` as wildcard, e.g. `$UserService%`; a `-%` suffix is appended to the pattern automatically)."
+ | CustomFilter _ -> "Specify a custom filter, referencing the document as `c.` (e.g. `'c.p LIKE \"test-%\" AND c._ts < 1717138092'`)"
+ | Force -> "Actually delete the documents (default is a dry run, reporting what would be deleted)"
+ | Parallelism _ -> "Number of concurrent delete requests permitted to run in parallel. Default: 32"
+ | Cosmos _ -> "Parameters for CosmosDB."
+and DestroyArguments(p: ParseResults<DestroyParameters>) =
+ member val Criteria =
+ match p.TryGetResult StreamName, p.TryGetResult DestroyParameters.StreamLike, p.TryGetResult CategoryName, p.TryGetResult CategoryLike, p.TryGetResult CustomFilter with
+ | Some sn, None, None, None, None -> Criteria.SingleStream sn
+ | None, Some sl, None, None, None -> Criteria.StreamLike sl
+ | None, None, Some cn, None, None -> Criteria.CatName cn
+ | None, None, None, Some cl, None -> Criteria.CatLike cl
+ | None, None, None, None, Some filter -> Criteria.Custom filter
+ | None, None, None, None, None -> p.Raise "Category or stream name/pattern, or custom SQL must be supplied"
+ | _ -> p.Raise "StreamName/StreamLike/CategoryLike/CategoryName/CustomFilter are mutually exclusive"
+ member val CosmosArgs = p.GetResult DestroyParameters.Cosmos |> Store.Cosmos.Arguments
+ member val DryRun = p.Contains Force |> not
+ member val Dop = p.GetResult(Parallelism, 32)
+ member val StatsInterval = TimeSpan.FromSeconds 30
+ member x.Connect() = match Store.Cosmos.config Log.Logger (None, true) x.CosmosArgs with
+ | Store.Config.Cosmos (cc, _, _) -> cc.Container
+ | _ -> p.Raise "Destroy requires Cosmos"
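+// Shape of the items projected by CosmosDestroy's SELECT: stream name, item id, event and unfold counts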
+and SnEventsUnfolds = { p: string; id: string; es: int; us: int }
and [] DumpParameters =
| [] Stream of FsCodec.StreamName
+ | [] StreamLike of string
| [] Correlation
| [] Blobs
| [] JsonSkip
| [] Pretty
| [] FlattenUnfolds
| [] TimeRegular
+ | [] Intervals
+ | [] Names
| [] UnfoldsOnly
| [] EventsOnly
| [] Cosmos of ParseResults<Store.Cosmos.Parameters>
@@ -189,12 +290,15 @@ and [] DumpParameters =
interface IArgParserTemplate with
member a.Usage = a |> function
| Stream _ -> "Specify stream(s) to dump."
+ | StreamLike _ -> "(CosmosDB only) Specify stream name pattern to dump: LIKE expression with `%` and `_` tokens etc."
| Correlation -> "Include Correlation/Causation identifiers"
| Blobs -> "Don't assume Data/Metadata is UTF-8 text"
| JsonSkip -> "Don't assume Data/Metadata is JSON"
| Pretty -> "Pretty print the JSON over multiple lines"
- | FlattenUnfolds -> "Don't pretty print the JSON over multiple lines for Unfolds"
+ | FlattenUnfolds -> "Don't pretty print the JSON over multiple lines for Unfolds. Quiet mode: Pretty print"
| TimeRegular -> "Don't humanize time intervals between events"
+ | Intervals -> "Omit intervals between events. Quiet mode: Include intervals"
+ | Names -> "Emit StreamName prior to events/unfolds instead of adding log context. Quiet mode: exclude stream names"
| UnfoldsOnly -> "Exclude Events. Default: show both Events and Unfolds"
| EventsOnly -> "Exclude Unfolds/Snapshots. Default: show both Events and Unfolds."
| Es _ -> "Parameters for EventStore."
@@ -230,6 +334,19 @@ and DumpArguments(p: ParseResults) =
let storeLog = createStoreLog false
storeLog, Store.MessageDb.config log None p
| x -> p.Raise $"unexpected subcommand %A{x}"
+ member val CosmosArgs = p.GetResult DumpParameters.Cosmos |> Store.Cosmos.Arguments
+ member x.Connect() =
+ match Store.Cosmos.config Log.Logger (None, true) x.CosmosArgs with
+ | Store.Config.Cosmos (cc, _, _) -> cc.Container
+ | _ -> p.Raise "Dump StreamLike option requires Cosmos"
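+ // Where a StreamLike pattern is supplied, resolve it to concrete stream names via a server-side SELECT DISTINCT; otherwise dump the explicitly specified Streams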
+ member x.Streams(infoLogLevel) =
+ let streams = p.GetResults DumpParameters.Stream
+ match p.TryGetResult DumpParameters.StreamLike with
+ | None -> streams
+ | Some pattern ->
+ let container = x.Connect()
+ let q = Microsoft.Azure.Cosmos.QueryDefinition($"SELECT DISTINCT VALUE c.p FROM c WHERE c.p LIKE \"{pattern}\"")
+ Equinox.CosmosStore.Linq.Internal.Query.exec Log.Logger container infoLogLevel q |> FSharp.Control.TaskSeq.toList
let writeToStatsSinks (c : LoggerConfiguration) =
c.WriteTo.Sink(Equinox.CosmosStore.Core.Log.InternalMetrics.Stats.LogSink())
.WriteTo.Sink(Equinox.DynamoStore.Core.Log.InternalMetrics.Stats.LogSink())
@@ -304,34 +421,37 @@ module CosmosInit =
module CosmosStats =
- type Microsoft.Azure.Cosmos.Container with // NB DO NOT CONSIDER PROMULGATING THIS HACK
- member container.QueryValue<'T>(sqlQuery : string) = task {
- let! (res: Microsoft.Azure.Cosmos.FeedResponse<'T>) = container.GetItemQueryIterator<'T>(sqlQuery).ReadNextAsync()
- return res |> Seq.exactlyOne }
+ open Equinox.CosmosStore.Linq.Internal
+ open FSharp.Control
+
let run (log : ILogger, _verboseConsole, _maybeSeq) (p : ParseResults<StatsParameters>) =
match p.GetSubCommand() with
| StatsParameters.Cosmos sp ->
- let doS, doD, doE, doO, doN =
- let s, d, e, o, n = p.Contains StatsParameters.Streams, p.Contains Documents, p.Contains StatsParameters.Events, p.Contains Oldest, p.Contains Newest
- let all = not (s || d || e || o || n)
- all || s, all || d, all || e, all || o, all || n
- let doS = doS || (not doD && not doE) // default to counting streams only unless otherwise specified
+ let doS, doI, doE, doU, doO, doN =
+ let s, i, e, u, o, n = p.Contains StatsParameters.Streams, p.Contains StatsParameters.Items, p.Contains StatsParameters.Events, p.Contains StatsParameters.Unfolds, p.Contains Oldest, p.Contains Newest
+ let all = not (s || i || e || u || o || n)
+ all || s, all || i, all || e, all || u, all || o, all || n
+ let doS = doS || (not doI && not doE) // default to counting streams only unless otherwise specified
let inParallel = p.Contains Parallel
let connector, dName, cName = CosmosInit.connect log sp
let container = connector.CreateUninitialized().GetContainer(dName, cName)
let ops = [| if doS then "Streams", """SELECT VALUE COUNT(1) FROM c WHERE c.id="-1" """
- if doD then "Documents", """SELECT VALUE COUNT(1) FROM c"""
+ if doI then "Items", """SELECT VALUE COUNT(1) FROM c"""
if doE then "Events", """SELECT VALUE SUM(c.n) FROM c WHERE c.id="-1" """
+ if doU then "Unfolded", """SELECT VALUE SUM(ARRAY_LENGTH(c.u) > 0 ? 1 : 0) FROM c WHERE c.id="-1" """
+ if doU then "Unfolds", """SELECT VALUE SUM(ARRAY_LENGTH(c.u)) FROM c WHERE c.id="-1" """
if doO then "Oldest", """SELECT VALUE MIN(c._ts) FROM c"""
if doN then "Newest", """SELECT VALUE MAX(c._ts) FROM c""" |]
let render = if log.IsEnabled LogEventLevel.Debug then snd else fst
log.Information("Computing {measures} ({mode})", Seq.map render ops, (if inParallel then "in parallel" else "serially"))
ops |> Seq.map (fun (name, sql) -> async {
- log.Debug("Running query: {sql}", sql)
- let res = container.QueryValue(sql) |> Async.AwaitTaskCorrect |> Async.RunSynchronously
- match name with
- | "Oldest" | "Newest" -> log.Information("{stat,-10}: {result,13} ({d:u})", name, res, DateTime.UnixEpoch.AddSeconds(float res))
- | _ -> log.Information("{stat,-10}: {result,13:N0}", name, res) })
+ let! res = Microsoft.Azure.Cosmos.QueryDefinition sql
+ |> container.GetItemQueryIterator
+ |> Query.enum_ log container "Stat" null LogEventLevel.Debug |> TaskSeq.tryHead |> Async.AwaitTaskCorrect
+ match name, res with
+ | ("Oldest" | "Newest"), Some res -> log.Information("{stat,-10}: {result,13} ({d:u})", name, res, DateTime.UnixEpoch.AddSeconds(float res))
+ | _, Some res -> log.Information("{stat,-10}: {result,13:N0}", name, res)
+ | _, None -> () }) // handle Oldest/Newest queries yielding no result (e.g. an empty container)
|> if inParallel then Async.Parallel else Async.Sequential
|> Async.Ignore
| StatsParameters.Dynamo sp -> async {
@@ -351,33 +471,37 @@ module CosmosStats =
let prettySerdes = lazy FsCodec.SystemTextJson.Serdes(FsCodec.SystemTextJson.Options.Create(indent = true))
+type System.Text.Json.JsonElement with
+ member x.Timestamp = x.GetProperty("_ts").GetDouble() |> DateTime.UnixEpoch.AddSeconds
+ member x.TryProp(name: string) = let mutable p = Unchecked.defaultof<_> in if x.TryGetProperty(name, &p) then ValueSome p else ValueNone
+
+module StreamName =
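+ // Maps a raw stream name to its category, e.g. "Favorites-clientA" -> "Favorites"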
+ let categoryName = FsCodec.StreamName.parse >> FsCodec.StreamName.split >> fun struct (cn, _sid) -> cn
+
module CosmosQuery =
- let inline miB x = float x / 1024. / 1024.
- let private unixEpoch = DateTime.UnixEpoch
- type System.Text.Json.JsonElement with
- member x.Utf8ByteCount = if x.ValueKind = System.Text.Json.JsonValueKind.Null then 0 else x.GetRawText() |> System.Text.Encoding.UTF8.GetByteCount
+ open Equinox.CosmosStore.Linq.Internal
+ open FSharp.Control
+ let inline miB x = Equinox.CosmosStore.Linq.Internal.miB x
type System.Text.Json.JsonDocument with
member x.Cast<'T>() = System.Text.Json.JsonSerializer.Deserialize<'T>(x.RootElement)
member x.Timestamp =
let ok, p = x.RootElement.TryGetProperty("_ts")
- if ok then p.GetDouble() |> unixEpoch.AddSeconds |> Some else None
+ if ok then p.GetDouble() |> DateTime.UnixEpoch.AddSeconds |> Some else None
let private composeSql (a: QueryArguments) =
- let inline warnOnUnfiltered () =
+ match a.Criteria with
+ | Criteria.Unfiltered ->
let lel = if a.Mode = Mode.Raw then LogEventLevel.Debug elif a.Filepath = None then LogEventLevel.Warning else LogEventLevel.Information
Log.Write(lel, "No StreamName or CategoryName/CategoryLike specified - Unfold Criteria better be unambiguous")
- let partitionKeyCriteria =
- match a.Criteria with
- | Criteria.SingleStream sn -> $"c.p = \"{sn}\""
- | Criteria.CatName n -> $"c.p LIKE \"{n}-%%\""
- | Criteria.CatLike pat -> $"c.p LIKE \"{pat}\""
- | Criteria.Unfiltered -> warnOnUnfiltered (); "1=1"
+ | _ -> ()
let selectedFields =
match a.Mode with
- | Mode.ReadOnly -> "c.u"
- | Mode.ReadWithStream -> "c.p, c.u"
- | Mode.Default -> "c.p, c.u, c._etag"
- | Mode.Raw -> "*"
+ | Mode.Default -> "c._etag, c.p, c.u[0].D, c.u[0].d"
+ | Mode.SnapOnly -> "c.u[0].D, c.u[0].d"
+ | Mode.SnapWithStream -> "c.p, c.u[0].D, c.u[0].d"
+ | Mode.ReadOnly -> "c.u" // TOCONSIDER remove; adjust TryLoad/TryHydrateTip
+ | Mode.ReadWithStream -> "c.p, c.u" // TOCONSIDER remove; adjust TryLoad/TryHydrateTip
+ | Mode.Raw -> "*"
let unfoldFilter =
let exists cond = $"EXISTS (SELECT VALUE u FROM u IN c.u WHERE {cond})"
match [| match a.UnfoldName with None -> () | Some un -> $"u.c = \"{un}\""
@@ -385,52 +509,249 @@ module CosmosQuery =
| [||] -> "1=1"
| [| x |] -> x |> exists
| xs -> String.Join(" AND ", xs) |> exists
- $"SELECT {selectedFields} FROM c WHERE {partitionKeyCriteria} AND {unfoldFilter} ORDER BY c.i"
- let private makeQuery (a: QueryArguments) =
+ $"SELECT {selectedFields} FROM c WHERE {a.Criteria.Sql} AND {unfoldFilter}"
+ let private queryDef (a: QueryArguments) =
let sql = composeSql a
Log.Information("Querying {mode}: {q}", a.Mode, sql)
- let storeConfig = a.ConfigureStore(Log.Logger)
- let container = match storeConfig with Store.Config.Cosmos (cc, _, _) -> cc.Container | _ -> failwith "Query requires Cosmos"
- let opts = Microsoft.Azure.Cosmos.QueryRequestOptions(MaxItemCount = a.CosmosArgs.QueryMaxItems)
- container.GetItemQueryIterator<System.Text.Json.JsonDocument>(sql, requestOptions = opts)
- let run (a: QueryArguments) = async {
- let sw, sw2 = System.Diagnostics.Stopwatch(), System.Diagnostics.Stopwatch.StartNew()
+ Microsoft.Azure.Cosmos.QueryDefinition sql
+ let run ill (a: QueryArguments) = task {
+ let sw = System.Diagnostics.Stopwatch.StartNew()
let serdes = if a.Pretty then prettySerdes.Value else FsCodec.SystemTextJson.Serdes.Default
let maybeFileStream = a.Filepath |> Option.map (fun p ->
Log.Information("Dumping {mode} content to {path}", a.Mode, System.IO.FileInfo(p).FullName)
System.IO.File.Create p) // Silently truncate if it exists, makes sense for typical usage
- use query = makeQuery a
-
+ let qo = Microsoft.Azure.Cosmos.QueryRequestOptions(MaxItemCount = a.CosmosArgs.QueryMaxItems)
+ let container = a.Connect()
let pageStreams, accStreams = System.Collections.Generic.HashSet(), System.Collections.Generic.HashSet()
let mutable accI, accE, accU, accRus, accBytesRead = 0L, 0L, 0L, 0., 0L
- try while query.HasMoreResults do
- sw.Restart()
- let! page = query.ReadNextAsync(CancellationToken.None) |> Async.AwaitTaskCorrect
- let pageSize = page.Resource |> Seq.sumBy _.RootElement.Utf8ByteCount
- let newestAge = page.Resource |> Seq.choose _.Timestamp |> Seq.tryLast |> Option.map (fun ts -> ts - DateTime.UtcNow)
- let items = [| for x in page.Resource -> x.Cast() |]
+ let it = container.GetItemQueryIterator<System.Text.Json.JsonDocument>(queryDef a, requestOptions = qo)
+ try for rtt, rc, items, rdc, rds, ods in it |> Query.enum__ do
+ let mutable newestTs = DateTime.MinValue
+ let items = [| for x in items -> newestTs <- max newestTs x.RootElement.Timestamp
+ System.Text.Json.JsonSerializer.Deserialize(x.RootElement) |]
let inline arrayLen x = if isNull x then 0 else Array.length x
pageStreams.Clear(); for x in items do if x.p <> null && pageStreams.Add x.p then accStreams.Add x.p |> ignore
let pageI, pageE, pageU = items.Length, items |> Seq.sumBy (_.e >> arrayLen), items |> Seq.sumBy (_.u >> arrayLen)
- Log.Information("Page {count}i {streams}s {es}e {us}u {ru}RU {s:N1}s {mib:N1}MiB age {age:dddd\.hh\:mm\:ss}",
- pageI, pageStreams.Count, pageE, pageU, page.RequestCharge, sw.Elapsed.TotalSeconds, miB pageSize, Option.toNullable newestAge)
-
+ Log.Write(ill, "Page{rdc,5}>{count,4}i{streams,5}s{es,5}e{us,5}u{rds,5:f2}>{ods,4:f2}MiB{rc,7:f2}RU{s,5:N1}s age {age:dddd\.hh\:mm\:ss}",
+ rdc, pageI, pageStreams.Count, pageE, pageU, miB rds, miB ods, rc, rtt.TotalSeconds, DateTime.UtcNow - newestTs)
maybeFileStream |> Option.iter (fun stream ->
- for x in page.Resource do
+ for x in items do
serdes.SerializeToStream(x, stream)
stream.WriteByte(byte '\n'))
if a.TeeConsole then
- page.Resource |> Seq.iter (serdes.Serialize >> Console.WriteLine)
-
+ items |> Seq.iter (serdes.Serialize >> Console.WriteLine)
accI <- accI + int64 pageI; accE <- accE + int64 pageE; accU <- accU + int64 pageU
- accRus <- accRus + page.RequestCharge; accBytesRead <- accBytesRead + int64 pageSize
+ accRus <- accRus + rc; accBytesRead <- accBytesRead + int64 ods
finally
let fileSize = maybeFileStream |> Option.map _.Position |> Option.defaultValue 0
maybeFileStream |> Option.iter _.Close() // Before we log so time includes flush time and no confusion
- let categoryName = FsCodec.StreamName.parse >> FsCodec.StreamName.split >> fun struct (cn, _sid) -> cn
- let accCategories = accStreams |> Seq.map categoryName |> Seq.distinct |> Seq.length
- Log.Information("TOTALS {cats}c {streams:N0}s {count:N0}i {es:N0}e {us:N0}u {ru:N2}RU R/W {rmib:N1}/{wmib:N1}MiB {s:N1}s",
- accCategories, accStreams.Count, accI, accE, accU, accRus, miB accBytesRead, miB fileSize, sw2.Elapsed.TotalSeconds) }
+ let accCategories = System.Collections.Generic.HashSet(accStreams |> Seq.map StreamName.categoryName).Count
+ Log.Information("TOTALS {count:N0}i {cats}c {streams:N0}s {es:N0}e {us:N0}u R/W {rmib:N1}/{wmib:N1}MiB {ru:N2}RU {s:N1}s",
+ accI, accCategories, accStreams.Count, accE, accU, miB accBytesRead, miB fileSize, accRus, sw.Elapsed.TotalSeconds) }
+
+module CosmosTop =
+
+ open Equinox.CosmosStore.Linq.Internal
+ open FSharp.Control
+ open System.Text.Json
+
+ let _t = Unchecked.defaultof<Equinox.CosmosStore.Core.Tip>
+ let inline tryEquinoxStreamName (x: JsonElement) =
+ match x.TryProp(nameof _t.p) with
+ | ValueSome (je: JsonElement) when je.ValueKind = JsonValueKind.String ->
+ je.GetString() |> FsCodec.StreamName.parse |> FsCodec.StreamName.toString |> ValueSome
+ | _ -> ValueNone
+ let inline parseEquinoxStreamName (x: JsonElement) =
+ match tryEquinoxStreamName x with
+ | ValueNone -> failwith $"Could not parse document:\n{prettySerdes.Value.Serialize x}"
+ | ValueSome sn -> sn
+
+ module private Parser =
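+ // Shared scratch buffer: serialize into it and read Position to measure UTF-8 sizes without allocating strings (page processing is sequential, so reuse is safe)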
+ let scratch = new System.IO.MemoryStream()
+ let utf8Size (x: JsonElement) =
+ scratch.Position <- 0L
+ JsonSerializer.Serialize(scratch, x)
+ scratch.Position
+ let inflatedUtf8Size x =
+ scratch.Position <- 0L
+ FsCodec.SystemTextJson.Encoding.ToStream(scratch, x)
+ scratch.Position
+ let infSize dataField formatField (x: JsonElement) =
+ match x.TryProp dataField, x.TryProp formatField with
+ | ValueNone, _ -> 0L
+ | ValueSome d, df -> inflatedUtf8Size (df |> ValueOption.map _.GetInt32() |> ValueOption.defaultValue 0, d)
+ // using the length as a decent proxy for UTF-8 length of corr/causation; if you have messy data in there, you'll have bigger problems to worry about
+ let inline stringLen x = match x with ValueSome (x: JsonElement) when x.ValueKind <> JsonValueKind.Null -> x.GetString().Length | _ -> 0
+ let _e = Unchecked.defaultof<Equinox.CosmosStore.Core.Event> // Or Unfold - both share field names
+ let dmcSize (x: JsonElement) =
+ (struct (0, 0L), x.EnumerateArray())
+ ||> Seq.fold (fun struct (c, i) x ->
+ struct (c + (x.TryProp(nameof _e.correlationId) |> stringLen) + (x.TryProp(nameof _e.causationId) |> stringLen),
+ i + infSize "d" "D" x + infSize "m" "M" x))
+ let private tryParseEventOrUnfold = function
+ | ValueNone -> struct (0, 0L, struct (0, 0L))
+ | ValueSome (x: JsonElement) -> struct (x.GetArrayLength(), utf8Size x, dmcSize x)
+ let _t = Unchecked.defaultof<Equinox.CosmosStore.Core.Tip>
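+ // Aggregate stats for a key (a category, or a stream when stream-level stats are requested); equality keys on `key` alone so a HashSet can act as an upsertable accumulator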
+ [<CustomEquality; NoComparison>]
+ type Stat =
+ { key: string; count: int; events: int; unfolds: int; bytes: int64; eBytes: int64; uBytes: int64; cBytes: int64; iBytes: int64 }
+ member x.Merge y =
+ { key = x.key; count = x.count + y.count; events = x.events + y.events; unfolds = x.unfolds + y.unfolds; bytes = x.bytes + y.bytes
+ eBytes = x.eBytes + y.eBytes; uBytes = x.uBytes + y.uBytes; cBytes = x.cBytes + y.cBytes; iBytes = x.iBytes + y.iBytes }
+ override x.GetHashCode() = StringComparer.Ordinal.GetHashCode x.key
+ override x.Equals y = match y with :? Stat as y -> StringComparer.Ordinal.Equals(x.key, y.key) | _ -> false
+ static member Create(key, x: JsonElement) =
+ let struct (e, eb, struct (ec, ei)) = x.TryProp(nameof _t.e) |> tryParseEventOrUnfold
+ let struct (u, ub, struct (uc, ui)) = x.TryProp(nameof _t.u) |> tryParseEventOrUnfold
+ { key = key; count = 1; events = e; unfolds = u
+ bytes = utf8Size x; eBytes = eb; uBytes = ub; cBytes = int64 (ec + uc); iBytes = ei + ui }
+ let [] OrderByTs = " ORDER BY c._ts"
+ let private sql (a: TopArguments) = $"SELECT * FROM c WHERE {a.Criteria.Sql}{if a.TsOrder then OrderByTs else null}"
+ let run ill (a: TopArguments) = task {
+ let sw = System.Diagnostics.Stopwatch.StartNew()
+ let pageStreams, accStreams = System.Collections.Generic.HashSet(), System.Collections.Generic.HashSet()
+ let mutable accI, accE, accU, accRus, accRds, accOds, accBytes, accParse = 0L, 0L, 0L, 0., 0L, 0L, 0L, TimeSpan.Zero
+ let s = System.Collections.Generic.HashSet()
+ let group = if a.StreamLevel then id else StreamName.categoryName
+ try for rtt, rc, items, rdc, rds, ods in a.Execute(sql a) |> Query.enum__ do
+ let mutable pageI, pageE, pageU, pageB, pageCc, pageDm, newestTs, sw = 0, 0, 0, 0L, 0L, 0L, DateTime.MinValue, System.Diagnostics.Stopwatch.StartNew()
+ for x in items do
+ newestTs <- max newestTs x.Timestamp
+ let sn = parseEquinoxStreamName x
+ if pageStreams.Add sn && not a.StreamLevel then accStreams.Add sn |> ignore
+ let x = Parser.Stat.Create(group sn, x)
+ let mutable v = Unchecked.defaultof<_>
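+ // upsert: if a Stat with this key is already present, replace it with the merged aggregate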
+ s.Add(if s.TryGetValue(x, &v) then s.Remove x |> ignore; v.Merge x else x) |> ignore
+ pageI <- pageI + 1; pageE <- pageE + x.events; pageU <- pageU + x.unfolds
+ pageB <- pageB + x.bytes; pageCc <- pageCc + x.cBytes; pageDm <- pageDm + x.iBytes
+ Log.Write(ill, "Page{rdc,5}>{count,4}i{streams,5}s{es,5}e{us,5}u{rds,5:f2}>{ods,4:f2}<{jds,4:f2}MiB{rc,7:f2}RU{s,5:N1}s D+M{im,4:f1} C+C{cm,5:f2} {ms,3}ms age {age:dddd\.hh\:mm\:ss}",
+ rdc, pageI, pageStreams.Count, pageE, pageU, miB rds, miB ods, miB pageB, rc, rtt.TotalSeconds, miB pageDm, miB pageCc, sw.ElapsedMilliseconds, DateTime.UtcNow - newestTs)
+ pageStreams.Clear()
+ accI <- accI + int64 pageI; accE <- accE + int64 pageE; accU <- accU + int64 pageU
+ accRus <- accRus + rc; accRds <- accRds + int64 rds; accOds <- accOds + int64 ods; accBytes <- accBytes + pageB
+ accParse <- accParse + sw.Elapsed
+ finally
+
+ let accC = (if a.StreamLevel then s |> Seq.map _.key else accStreams) |> Seq.map StreamName.categoryName |> Seq.distinct |> Seq.length
+ let accS = if a.StreamLevel then s.Count else accStreams.Count
+ let iBytes, cBytes = s |> Seq.sumBy _.iBytes, s |> Seq.sumBy _.cBytes
+ let giB x = miB x / 1024.
+ Log.Information("TOTALS {cats:N0}c {streams:N0}s {count:N0}i {es:N0}e {us:N0}u Server read {rg:f1}GiB output {og:f1}GiB JSON {tg:f1}GiB D+M(inflated) {ig:f1}GiB C+C {cm:f2}MiB Parse {ps:N3}s Total {ru:N2}RU {s:N1}s",
+ accC, accS, accI, accE, accU, giB accRds, giB accOds, giB accBytes, giB iBytes, miB cBytes, accParse.TotalSeconds, accRus, sw.Elapsed.TotalSeconds)
+
+ let sort: seq<int * Parser.Stat> -> seq<_> =
+ match a.Order with
+ | Order.Name -> Seq.sortBy (snd >> _.key)
+ | Order.Size -> Seq.sortByDescending (snd >> _.bytes)
+ | Order.Items -> Seq.sortByDescending (snd >> _.count)
+ | Order.Events -> Seq.sortByDescending (snd >> _.events)
+ | Order.Unfolds -> Seq.sortByDescending (snd >> _.unfolds)
+ | Order.EventSize -> Seq.sortByDescending (snd >> _.eBytes)
+ | Order.UnfoldSize -> Seq.sortByDescending (snd >> _.uBytes)
+ | Order.InflateSize -> Seq.sortByDescending (snd >> _.iBytes)
+ | Order.CorrCauseSize -> Seq.sortByDescending (snd >> _.cBytes)
+ let streamTemplate = "{count,8}i {tm,7:N2}MiB E{events,8} {em,7:N1} U{unfolds,8} {um,7:N1} D+M{dm,7:N1} C+C{cm,6:N1} {key}"
+ let catTemplate = "S{streams,8} " + streamTemplate
+ let render (streams, x: Parser.Stat) =
+ if streams = 0 then Log.Information(streamTemplate, x.count, miB x.bytes, x.events, miB x.eBytes, x.unfolds, miB x.uBytes, miB x.iBytes, miB x.cBytes, x.key)
+ else Log.Information(catTemplate, streams, x.count, miB x.bytes, x.events, miB x.eBytes, x.unfolds, miB x.uBytes, miB x.iBytes, miB x.cBytes, x.key)
+ if a.StreamLevel then
+ s |> Seq.groupBy (_.key >> StreamName.categoryName)
+ |> Seq.map (fun (cat, streams) -> Seq.length streams, { (streams |> Seq.reduce _.Merge) with key = cat })
+ |> sort |> Seq.truncate a.CategoriesLimit
+ |> Seq.iter render
+ s |> Seq.map (fun x -> 0, x) |> sort |> Seq.truncate (if a.StreamLevel then a.StreamsLimit else a.CategoriesLimit) |> Seq.iter render }
+
+module CosmosDestroy =
+
+ open Equinox.CosmosStore.Linq.Internal
+ open FSharp.Control
+
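+ // Minimal SemaphoreSlim wrapper, used to cap the number of in-flight delete requests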
+ type Sem(max) =
+ let inner = new SemaphoreSlim(max)
+ member _.IsEmpty = inner.CurrentCount = max
+ member _.TryWait(ms: int) = inner.WaitAsync ms
+ member _.Release() = inner.Release() |> ignore
+
+ module Channel =
+
+ open System.Threading.Channels
+ let unboundedSr<'t> = Channel.CreateUnbounded<'t>(UnboundedChannelOptions(SingleReader = true))
+ let write (w: ChannelWriter<_>) = w.TryWrite >> ignore
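+ // Drains, without blocking, everything currently buffered in the channel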
+ let inline readAll (r: ChannelReader<_>) () = seq {
+ let mutable msg = Unchecked.defaultof<_>
+ while r.TryRead(&msg) do
+ yield msg }
+
+ let run (a: DestroyArguments) = task {
+ let tsw = System.Diagnostics.Stopwatch.StartNew()
+ let sql = $"SELECT c.p, c.id, ARRAY_LENGTH(c.e) AS es, ARRAY_LENGTH(c.u) AS us FROM c WHERE {a.Criteria.Sql}"
+ if a.DryRun then Log.Warning("Dry-run of deleting items based on {sql}", sql)
+ else Log.Warning("DESTROYING all Items WHERE {sql}", a.Criteria.Sql)
+ let container = a.Connect()
+ let query =
+ let qd = Microsoft.Azure.Cosmos.QueryDefinition sql
+ let qo = Microsoft.Azure.Cosmos.QueryRequestOptions(MaxItemCount = a.CosmosArgs.QueryMaxItemsOr 9999)
+ container.GetItemQueryIterator<SnEventsUnfolds>(qd, requestOptions = qo)
+ let pageStreams, accStreams = System.Collections.Generic.HashSet(), System.Collections.Generic.HashSet()
+ let mutable accI, accE, accU, accRus, accDelRu, accRds, accOds = 0L, 0L, 0L, 0., 0., 0L, 0L
+ let deletionDop = Sem a.Dop
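+ // Each fire-and-forget delete reports its (RU charge, error message or null) via a single-reader channel drained by the main loop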
+ let writeResult, readResults = let c = Channel.unboundedSr in Channel.write c.Writer, Channel.readAll c.Reader
+ try for rtt, rc, items, rdc, rds, ods in query |> Query.enum__ do
+ let mutable pageI, pageE, pageU, pRu, iRu = 0, 0, 0, 0., 0.
+ let pageSw, intervalSw = System.Diagnostics.Stopwatch.StartNew(), System.Diagnostics.Stopwatch.StartNew()
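+ // Drain buffered delete outcomes, accruing RU charges and latching any failure; emits interim stats when StatsInterval has elapsed, and yields the ms remaining until the next emission is due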
+ let drainResults () =
+ let mutable failMessage = null
+ for ru, exn in readResults () do
+ iRu <- iRu + ru; pRu <- pRu + ru
+ if exn <> null && failMessage = null then failMessage <- exn // latch the first failure
+ if intervalSw.Elapsed > a.StatsInterval then
+ Log.Information(".. Deleted {count,5}i {streams,7}s{es,7}e{us,7}u {rus,7:N2}WRU/s {s,6:N1}s",
+ pageI, pageStreams.Count, pageE, pageU, iRu / intervalSw.Elapsed.TotalSeconds, pageSw.Elapsed.TotalSeconds)
+ intervalSw.Restart()
+ iRu <- 0
+ if failMessage <> null then failwith failMessage
+ (a.StatsInterval - intervalSw.Elapsed).TotalMilliseconds |> int
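+ // Loop draining results until `check` succeeds (a capacity slot is reserved, or all deletes have completed), bounding each wait by the next stats deadline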
+ let awaitState check = task {
+ let mutable reserved = false
+ while not reserved do
+ match drainResults () with
+ | wait when wait <= 0 -> ()
+ | timeoutAtNextLogInterval ->
+ match! check timeoutAtNextLogInterval with
+ | false -> ()
+ | true -> reserved <- true }
+ let checkEmpty () = task {
+ if deletionDop.IsEmpty then return true else
+ do! System.Threading.Tasks.Task.Delay 1
+ return deletionDop.IsEmpty }
+ let awaitCapacity () = awaitState deletionDop.TryWait
+ let releaseCapacity () = deletionDop.Release()
+ let awaitCompletion () = awaitState (fun _timeout -> checkEmpty ())
+ for i in items do
+ if pageStreams.Add i.p then accStreams.Add i.p |> ignore
+ pageI <- pageI + 1; pageE <- pageE + i.es; pageU <- pageU + i.us
+ if not a.DryRun then
+ do! awaitCapacity ()
+ ignore <| task { // we could do a Task.Run dance, but kicking it off inline without waiting suits us fine as results processed above
+ let! res = container.DeleteItemStreamAsync(i.id, Microsoft.Azure.Cosmos.PartitionKey i.p)
+ releaseCapacity ()
+ let exn =
+ if res.IsSuccessStatusCode || res.StatusCode = System.Net.HttpStatusCode.NotFound then null
+ else $"Deletion of {i.p}/{i.id} failed with Code: {res.StatusCode} Message: {res.ErrorMessage}\n{res.Diagnostics}"
+ writeResult (res.Headers.RequestCharge, exn) }
+ do! awaitCompletion () // we want stats output and/or failure exceptions to align with Pages
+ let ps = pageSw.Elapsed.TotalSeconds
+ Log.Information("Page{rdc,6}>{count,5}i {streams,7}s{es,7}e{us,7}u{rds,8:f2}>{ods,4:f2} {prc,8:f2}RRU {rs,5:N1}s {rus:N2}WRU/s {ps,5:N1}s",
+ rdc, pageI, pageStreams.Count, pageE, pageU, miB rds, miB ods, rc, rtt.TotalSeconds, pRu / ps, ps)
+ pageStreams.Clear()
+ accI <- accI + int64 pageI; accE <- accE + int64 pageE; accU <- accU + int64 pageU
+ accRus <- accRus + rc; accDelRu <- accDelRu + pRu; accRds <- accRds + int64 rds; accOds <- accOds + int64 ods
+ finally
+
+ let accCats = accStreams |> Seq.map StreamName.categoryName |> System.Collections.Generic.HashSet |> _.Count
+ Log.Information("TOTALS {count:N0}i {cats:N0}c {streams:N0}s {es:N0}e {us:N0}u read {rmib:f1}MiB output {omib:f1}MiB {rru:N2}RRU Avg {aru:N2}WRU/s Delete {dru:N2}WRU Total {s:N1}s",
+ accI, accCats, accStreams.Count, accE, accU, miB accRds, miB accOds, accRus, accDelRu / tsw.Elapsed.TotalSeconds, accDelRu, tsw.Elapsed.TotalSeconds) }
module DynamoInit =
@@ -474,12 +795,14 @@ module Dump =
let private prettifyJson (json: string) =
use parsed = System.Text.Json.JsonDocument.Parse json
prettySerdes.Value.Serialize parsed
- let run (log : ILogger, verboseConsole, maybeSeq) (p : ParseResults<DumpParameters>) = async {
+ let run ill (log : ILogger, verboseConsole, maybeSeq) (p : ParseResults<DumpParameters>) = async {
let a = DumpArguments p
let createStoreLog storeVerbose = createStoreLog storeVerbose verboseConsole maybeSeq
let storeLog, storeConfig = a.ConfigureStore(log, createStoreLog)
let doU, doE = not (p.Contains EventsOnly), not (p.Contains UnfoldsOnly)
- let doC, doJ, doS, doT = p.Contains Correlation, not (p.Contains JsonSkip), not (p.Contains Blobs), not (p.Contains TimeRegular)
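+ // In Quiet mode, the Names/Intervals/FlattenUnfolds flags have inverted polarity (see their Usage descriptions)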
+ let quietMode = ill = LogEventLevel.Debug
+ let doN = p.Contains Names <> quietMode
+ let doI, doC, doJ, doS, doT = p.Contains Intervals = quietMode, p.Contains Correlation, not (p.Contains JsonSkip), not (p.Contains Blobs), not (p.Contains TimeRegular)
let store = Services.Store(storeConfig)
let initial = List.empty
@@ -488,7 +811,7 @@ module Dump =
let idCodec = FsCodec.Codec.Create((fun _ -> failwith "No encoding required"), tryDecode, (fun _ _ -> failwith "No mapCausation"))
let isOriginAndSnapshot = (fun (event : FsCodec.ITimelineEvent<_>) -> not doE && event.IsUnfold), fun _state -> failwith "no snapshot required"
let formatUnfolds, formatEvents =
- if p.Contains FlattenUnfolds then id else prettifyJson
+ if p.Contains FlattenUnfolds <> quietMode then id else prettifyJson
, if p.Contains Pretty then prettifyJson else id
let mutable payloadBytes = 0
let render format (data: ReadOnlyMemory<byte>) =
@@ -506,6 +829,8 @@ module Dump =
| x when x.TotalMinutes >= 1. -> x.ToString "m\mss\.ff\s"
| x -> x.ToString("s\.fff\s")
let dumpEvents (streamName: FsCodec.StreamName) = async {
+ let log = if doN then (Log.Information("Dumping {sn}", streamName); log)
+ else log.ForContext("sn", streamName)
let struct (categoryName, sid) = FsCodec.StreamName.split streamName
let cat = store.Category(categoryName, idCodec, fold, initial, isOriginAndSnapshot)
let decider = Equinox.Decider.forStream storeLog cat sid
@@ -518,20 +843,25 @@ module Dump =
| Some p when not x.IsUnfold -> let ts = x.Timestamp - p in if doT then humanize ts else ts.ToString()
| _ -> if doT then "n/a" else "0"
prevTs <- Some x.Timestamp
- if not doC then log.Information("{i,4}@{t:u}+{d,9} {u:l} {e:l} {data:l} {meta:l}",
- x.Index, x.Timestamp, interval, ty, x.EventType, render x.Data, render x.Meta)
- else log.Information("{i,4}@{t:u}+{d,9} Corr {corr} Cause {cause} {u:l} {e:l} {data:l} {meta:l}",
- x.Index, x.Timestamp, interval, x.CorrelationId, x.CausationId, ty, x.EventType, render x.Data, render x.Meta)
- match streamBytes with ValueNone -> () | ValueSome x -> log.Information("ISyncContext.StreamEventBytes {kib:n1}KiB", float x / 1024.) }
+ if doC then
+ log.Information("{i,4}@{t:u}+{d,9} Corr {corr} Cause {cause} {u:l} {e:l} {data:l} {meta:l}",
+ x.Index, x.Timestamp, interval, x.CorrelationId, x.CausationId, ty, x.EventType, render x.Data, render x.Meta)
+ elif doI then
+ log.Information("{i,4}@{t:u}+{d,9:u} {u:l} {e:l} {data:l} {meta:l}",
+ x.Index, x.Timestamp, interval, ty, x.EventType, render x.Data, render x.Meta)
+ else
+ log.Information("{i,4}@{t:u} {u:l} {e:l} {data:l} {meta:l}",
+ x.Index, x.Timestamp, ty, x.EventType, render x.Data, render x.Meta)
+ match streamBytes with ValueNone -> () | ValueSome x -> log.Write(ill, "ISyncContext.StreamEventBytes {kib:n1}KiB", float x / 1024.) }
resetStats ()
- let streams = p.GetResults DumpParameters.Stream
- log.ForContext("streams",streams).Information("Reading...")
+ let streams = a.Streams(ill)
+ log.ForContext("streams",streams).Write(ill, "Reading...")
do! streams
|> Seq.map dumpEvents
- |> Async.Parallel
+ |> Async.Sequential
|> Async.Ignore
- log.Information("Total Event Bodies Payload {kib:n1}KiB", float payloadBytes / 1024.)
+ log.Write(ill, "Total Event Bodies Payload {kib:n1}KiB", float payloadBytes / 1024.)
if verboseConsole then
dumpStats log storeConfig }
@@ -540,12 +870,15 @@ type Arguments(p: ParseResults) =
let quiet, verbose, verboseConsole = p.Contains Quiet, p.Contains Verbose, p.Contains VerboseConsole
member _.CreateDomainLog() = createDomainLog quiet verbose verboseConsole maybeSeq
member _.ExecuteSubCommand() = async {
+ let ill = if quiet then LogEventLevel.Debug else LogEventLevel.Information
match p.GetSubCommand() with
| Init a -> do! CosmosInit.containerAndOrDb Log.Logger a CancellationToken.None |> Async.AwaitTaskCorrect
| InitAws a -> do! DynamoInit.table Log.Logger a
| InitSql a -> do! SqlInit.databaseOrSchema Log.Logger a
- | Dump a -> do! Dump.run (Log.Logger, verboseConsole, maybeSeq) a
- | Query a -> do! CosmosQuery.run (QueryArguments a)
+ | Dump a -> do! Dump.run ill (Log.Logger, verboseConsole, maybeSeq) a
+ | Query a -> do! CosmosQuery.run ill (QueryArguments a) |> Async.AwaitTaskCorrect
+ | Top a -> do! CosmosTop.run ill (TopArguments a) |> Async.AwaitTaskCorrect
+ | Destroy a -> do! CosmosDestroy.run (DestroyArguments a) |> Async.AwaitTaskCorrect
| Stats a -> do! CosmosStats.run (Log.Logger, verboseConsole, maybeSeq) a
| LoadTest a -> let n = p.GetResult(LogFile, fun () -> p.ProgramName + ".log")
let reportFilename = System.IO.FileInfo(n).FullName