From 5f14a8f5960b090afe5431ae8c4c12a2419eb3ae Mon Sep 17 00:00:00 2001
From: Ralf Kistner
Date: Thu, 16 Oct 2025 12:41:02 +0200
Subject: [PATCH 1/4] Rewrite checksum pre-calculation to ignore small buckets.

---
 .../storage/implementation/MongoCompactor.ts  | 167 +++++++-----
 .../implementation/MongoSyncBucketStorage.ts  |   6 +-
 .../src/storage/implementation/db.ts          |   6 +-
 .../src/entry/commands/compact-action.ts      |   2 +-
 4 files changed, 76 insertions(+), 105 deletions(-)

diff --git a/modules/module-mongodb-storage/src/storage/implementation/MongoCompactor.ts b/modules/module-mongodb-storage/src/storage/implementation/MongoCompactor.ts
index b1f2deb41..c04981f5a 100644
--- a/modules/module-mongodb-storage/src/storage/implementation/MongoCompactor.ts
+++ b/modules/module-mongodb-storage/src/storage/implementation/MongoCompactor.ts
@@ -10,6 +10,7 @@ import { cacheKey } from './OperationBatch.js';
 interface CurrentBucketState {
   /** Bucket name */
   bucket: string;
+
   /**
    * Rows seen in the bucket, with the last op_id of each.
    */
@@ -96,7 +97,7 @@ export class MongoCompactor {
         // We can make this more efficient later on by iterating
         // through the buckets in a single query.
         // That makes batching more tricky, so we leave for later.
-        await this.compactInternal(bucket);
+        await this.compactSingleBucket(bucket);
       }
     } else {
       await this.compactDirtyBuckets();
@@ -104,67 +105,48 @@
   }
 
   private async compactDirtyBuckets() {
-    for await (let buckets of this.iterateDirtyBuckets()) {
+    while (!this.signal?.aborted) {
+      // Process all buckets with 1 or more changes since last time
+      const buckets = await this.dirtyBucketBatch({ minBucketChanges: 1 });
+      if (buckets.length == 0) {
+        // All done
+        break;
+      }
       for (let bucket of buckets) {
-        await this.compactInternal(bucket);
+        await this.compactSingleBucket(bucket);
       }
     }
   }
 
-  private async compactInternal(bucket: string | undefined) {
+  private async compactSingleBucket(bucket: string) {
     const idLimitBytes = this.idLimitBytes;
 
-    let currentState: CurrentBucketState | null = null;
-
-    let bucketLower: string | mongo.MinKey;
-    let bucketUpper: string | mongo.MaxKey;
+    let currentState: CurrentBucketState = {
+      bucket,
+      seen: new Map(),
+      trackingSize: 0,
+      lastNotPut: null,
+      opsSincePut: 0,
 
-    if (bucket == null) {
-      bucketLower = new mongo.MinKey();
-      bucketUpper = new mongo.MaxKey();
-    } else if (bucket.includes('[')) {
-      // Exact bucket name
-      bucketLower = bucket;
-      bucketUpper = bucket;
-    } else {
-      // Bucket definition name
-      bucketLower = `${bucket}[`;
-      bucketUpper = `${bucket}[\uFFFF`;
-    }
+      checksum: 0,
+      opCount: 0,
+      opBytes: 0
+    };
 
     // Constant lower bound
     const lowerBound: BucketDataKey = {
       g: this.group_id,
-      b: bucketLower as string,
+      b: bucket,
       o: new mongo.MinKey() as any
     };
 
     // Upper bound is adjusted for each batch
     let upperBound: BucketDataKey = {
       g: this.group_id,
-      b: bucketUpper as string,
+      b: bucket,
       o: new mongo.MaxKey() as any
     };
 
-    const doneWithBucket = async () => {
-      if (currentState == null) {
-        return;
-      }
-      // Free memory before clearing bucket
-      currentState.seen.clear();
-      if (currentState.lastNotPut != null && currentState.opsSincePut >= 1) {
-        logger.info(
-          `Inserting CLEAR at ${this.group_id}:${currentState.bucket}:${currentState.lastNotPut} to remove ${currentState.opsSincePut} operations`
-        );
-        // Need flush() before clear()
-        await this.flush();
-        await this.clearBucket(currentState);
-      }
-
-      // Do this _after_ clearBucket so that we have accurate counts.
-      this.updateBucketChecksums(currentState);
-    };
 
     while (!this.signal?.aborted) {
       // Query one batch at a time, to avoid cursor timeouts
       const cursor = this.db.bucket_data.aggregate(
@@ -211,22 +193,6 @@
         upperBound = batch[batch.length - 1]._id;
 
         for (let doc of batch) {
-          if (currentState == null || doc._id.b != currentState.bucket) {
-            await doneWithBucket();
-
-            currentState = {
-              bucket: doc._id.b,
-              seen: new Map(),
-              trackingSize: 0,
-              lastNotPut: null,
-              opsSincePut: 0,
-
-              checksum: 0,
-              opCount: 0,
-              opBytes: 0
-            };
-          }
-
           if (doc._id.o > this.maxOpId) {
             continue;
           }
@@ -297,12 +263,22 @@
           }
         }
 
-        if (currentState != null) {
-          logger.info(`Processed batch of length ${batch.length}, current bucket: ${currentState.bucket}`);
-        }
+        logger.info(`Processed batch of length ${batch.length}, current bucket: ${bucket}`);
       }
 
-      await doneWithBucket();
+      // Free memory before clearing bucket
+      currentState.seen.clear();
+      if (currentState.lastNotPut != null && currentState.opsSincePut >= 1) {
+        logger.info(
+          `Inserting CLEAR at ${this.group_id}:${bucket}:${currentState.lastNotPut} to remove ${currentState.opsSincePut} operations`
+        );
+        // Need flush() before clear()
+        await this.flush();
+        await this.clearBucket(currentState);
+      }
+
+      // Do this _after_ clearBucket so that we have accurate counts.
+      this.updateBucketChecksums(currentState);
 
       // Need another flush after updateBucketChecksums()
       await this.flush();
@@ -490,8 +466,13 @@
   /**
    * Subset of compact, only populating checksums where relevant.
    */
-  async populateChecksums() {
-    for await (let buckets of this.iterateDirtyBuckets()) {
+  async populateChecksums(options: { minBucketChanges: number }) {
+    while (!this.signal?.aborted) {
+      const buckets = await this.dirtyBucketBatch(options);
+      if (buckets.length == 0) {
+        // All done
+        break;
+      }
       const start = Date.now();
 
       logger.info(`Calculating checksums for batch of ${buckets.length} buckets, starting at ${buckets[0]}`);
@@ -500,51 +481,37 @@
     }
   }
 
-  private async *iterateDirtyBuckets(): AsyncGenerator<string[]> {
-    // This is updated after each batch
-    let lowerBound: BucketStateDocument['_id'] = {
-      g: this.group_id,
-      b: new mongo.MinKey() as any
-    };
-    // This is static
-    const upperBound: BucketStateDocument['_id'] = {
-      g: this.group_id,
-      b: new mongo.MaxKey() as any
-    };
-    while (!this.signal?.aborted) {
-      // By filtering buckets, we effectively make this "resumeable".
-      const filter: mongo.Filter<BucketStateDocument> = {
-        _id: {
-          $gt: lowerBound,
-          $lt: upperBound
+  /**
+   * Returns a batch of dirty buckets - buckets with most changes first.
+   *
+   * This cannot be used to iterate on its own - the client is expected to process these buckets and
+   * set estimate_since_compact.count: 0 when done, before fetching the next batch.
+ */ + private async dirtyBucketBatch(options: { minBucketChanges: number }): Promise { + if (options.minBucketChanges <= 0) { + throw new ReplicationAssertionError('minBucketChanges must be >= 1'); + } + // We make use of an index on {_id.g: 1, 'estimate_since_compact.count': -1} + const dirtyBuckets = await this.db.bucket_state + .find( + { + '_id.g': this.group_id, + 'estimate_since_compact.count': { $gte: 1 } }, - // Partial index exists on this - 'estimate_since_compact.count': { $gt: 0 } - }; - - const dirtyBuckets = await this.db.bucket_state - .find(filter, { + { projection: { _id: 1 }, sort: { - _id: 1 + 'estimate_since_compact.count': -1 }, limit: 5_000, - maxTimeMS: MONGO_OPERATION_TIMEOUT_MS, - // Make sure we use the partial index - hint: 'dirty_buckets' - }) - .toArray(); - - if (dirtyBuckets.length == 0) { - break; - } - - yield dirtyBuckets.map((bucket) => bucket._id.b); + maxTimeMS: MONGO_OPERATION_TIMEOUT_MS + } + ) + .toArray(); - lowerBound = dirtyBuckets[dirtyBuckets.length - 1]._id; - } + return dirtyBuckets.map((bucket) => bucket._id.b); } private async updateChecksumsBatch(buckets: string[]) { diff --git a/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts b/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts index 939abfde1..9924d5712 100644 --- a/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts +++ b/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts @@ -676,7 +676,11 @@ export class MongoSyncBucketStorage memoryLimitMB: 0 }); - await compactor.populateChecksums(); + await compactor.populateChecksums({ + // There are cases with millions of small buckets, in which case it can take very long to + // populate the checksums, with minimal benefit. We skip the small buckets here. 
+      minBucketChanges: 10
+    });
     const duration = Date.now() - start;
     logger.info(`Populated persistent checksum cache in ${(duration / 1000).toFixed(1)}s`);
   }
diff --git a/modules/module-mongodb-storage/src/storage/implementation/db.ts b/modules/module-mongodb-storage/src/storage/implementation/db.ts
index 326d138b5..37a35ef7c 100644
--- a/modules/module-mongodb-storage/src/storage/implementation/db.ts
+++ b/modules/module-mongodb-storage/src/storage/implementation/db.ts
@@ -148,10 +148,10 @@
     // TODO: Implement a better mechanism to use migrations in tests
     await this.bucket_state.createIndex(
       {
-        _id: 1,
-        'estimate_since_compact.count': 1
+        '_id.g': 1,
+        'estimate_since_compact.count': -1
       },
-      { name: 'dirty_buckets', partialFilterExpression: { 'estimate_since_compact.count': { $gt: 0 } } }
+      { name: 'dirty_count' }
     );
   }
 }
diff --git a/packages/service-core/src/entry/commands/compact-action.ts b/packages/service-core/src/entry/commands/compact-action.ts
index ce1a67781..014d12af8 100644
--- a/packages/service-core/src/entry/commands/compact-action.ts
+++ b/packages/service-core/src/entry/commands/compact-action.ts
@@ -25,7 +25,7 @@ const COMPACT_MEMORY_LIMIT_MB = Math.min(HEAP_LIMIT / 1024 / 1024 - 128, 1024);
 
 export function registerCompactAction(program: Command) {
   const compactCommand = program
     .command(COMMAND_NAME)
-    .option(`-b, --buckets [buckets]`, 'Bucket or bucket definition name (optional, comma-separate multiple names)');
+    .option(`-b, --buckets [buckets]`, 'Bucket name (optional, comma-separate multiple names)');
 
   wrapConfigCommand(compactCommand);

From 2b94dd9218508b0bdbe08e931cc53b498b5804ca Mon Sep 17 00:00:00 2001
From: Ralf Kistner
Date: Thu, 16 Oct 2025 12:41:56 +0200
Subject: [PATCH 2/4] Update changeset.

---
 .changeset/seven-mangos-sleep.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/.changeset/seven-mangos-sleep.md b/.changeset/seven-mangos-sleep.md
index e15a4bc1d..c5bcb926d 100644
--- a/.changeset/seven-mangos-sleep.md
+++ b/.changeset/seven-mangos-sleep.md
@@ -4,4 +4,4 @@
 '@powersync/service-image': patch
 ---
 
-[MongoDB Storage] Only compact modified buckets. Add partial index on bucket_state to handle large numbers of buckets when pre-calculating checksums or compacting.
+[MongoDB Storage] Only compact modified buckets. Add an index on bucket_state to handle large numbers of buckets when pre-calculating checksums or compacting, and skip small buckets when pre-calculating checksums.

From 04716ba692426ab4a42aee02699716066011fc8e Mon Sep 17 00:00:00 2001
From: Ralf Kistner
Date: Thu, 16 Oct 2025 12:54:47 +0200
Subject: [PATCH 3/4] Fix count query.

---
 .../src/storage/implementation/MongoCompactor.ts | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/modules/module-mongodb-storage/src/storage/implementation/MongoCompactor.ts b/modules/module-mongodb-storage/src/storage/implementation/MongoCompactor.ts
index c04981f5a..35cd00730 100644
--- a/modules/module-mongodb-storage/src/storage/implementation/MongoCompactor.ts
+++ b/modules/module-mongodb-storage/src/storage/implementation/MongoCompactor.ts
@@ -496,7 +496,7 @@
       .find(
         {
           '_id.g': this.group_id,
-          'estimate_since_compact.count': { $gte: 1 }
+          'estimate_since_compact.count': { $gte: options.minBucketChanges }
         },
         {
           projection: {

From 658bff4f4e22195820f2bce8a62967d18a70a262 Mon Sep 17 00:00:00 2001
From: Ralf Kistner
Date: Thu, 16 Oct 2025 15:49:13 +0200
Subject: [PATCH 4/4] Confirm in tests that small buckets are skipped.
---
 .../storage/implementation/MongoCompactor.ts  | 14 ++++++++++++--
 .../implementation/MongoSyncBucketStorage.ts  |  9 ++++++---
 .../test/src/storage_compacting.test.ts       | 19 +++++++++++++++++--
 .../src/storage/PostgresSyncRulesStorage.ts   |  5 ++++-
 .../src/storage/SyncRulesBucketStorage.ts     | 15 ++++++++++++++-
 5 files changed, 53 insertions(+), 9 deletions(-)

diff --git a/modules/module-mongodb-storage/src/storage/implementation/MongoCompactor.ts b/modules/module-mongodb-storage/src/storage/implementation/MongoCompactor.ts
index 35cd00730..f3606c83a 100644
--- a/modules/module-mongodb-storage/src/storage/implementation/MongoCompactor.ts
+++ b/modules/module-mongodb-storage/src/storage/implementation/MongoCompactor.ts
@@ -1,6 +1,13 @@
 import { mongo, MONGO_OPERATION_TIMEOUT_MS } from '@powersync/lib-service-mongodb';
 import { logger, ReplicationAssertionError, ServiceAssertionError } from '@powersync/lib-services-framework';
-import { addChecksums, InternalOpId, isPartialChecksum, storage, utils } from '@powersync/service-core';
+import {
+  addChecksums,
+  InternalOpId,
+  isPartialChecksum,
+  PopulateChecksumCacheResults,
+  storage,
+  utils
+} from '@powersync/service-core';
 
 import { PowerSyncMongo } from './db.js';
 import { BucketDataDocument, BucketDataKey, BucketStateDocument } from './models.js';
@@ -466,7 +473,8 @@
   /**
    * Subset of compact, only populating checksums where relevant.
    */
-  async populateChecksums(options: { minBucketChanges: number }) {
+  async populateChecksums(options: { minBucketChanges: number }): Promise<PopulateChecksumCacheResults> {
+    let count = 0;
     while (!this.signal?.aborted) {
       const buckets = await this.dirtyBucketBatch(options);
       if (buckets.length == 0) {
@@ -478,7 +486,9 @@
 
       await this.updateChecksumsBatch(buckets);
       logger.info(`Updated checksums for batch of ${buckets.length} buckets in ${Date.now() - start}ms`);
+      count += buckets.length;
     }
+    return { buckets: count };
   }
 
   /**
diff --git a/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts b/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts
index 9924d5712..ee5a4d4c5 100644
--- a/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts
+++ b/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts
@@ -16,6 +16,8 @@ import {
   InternalOpId,
   internalToExternalOpId,
   maxLsn,
+  PopulateChecksumCacheOptions,
+  PopulateChecksumCacheResults,
   ProtocolOpId,
   ReplicationCheckpoint,
   storage,
@@ -665,7 +667,7 @@ export class MongoSyncBucketStorage
     }
   }
 
-  async populatePersistentChecksumCache(options: Required<Pick<CompactOptions, 'maxOpId' | 'signal'>>): Promise<void> {
+  async populatePersistentChecksumCache(options: PopulateChecksumCacheOptions): Promise<PopulateChecksumCacheResults> {
     logger.info(`Populating persistent checksum cache...`);
     const start = Date.now();
     // We do a minimal compact here.
@@ -676,13 +678,14 @@ export class MongoSyncBucketStorage
       memoryLimitMB: 0
     });
 
-    await compactor.populateChecksums({
+    const result = await compactor.populateChecksums({
       // There are cases with millions of small buckets, in which case it can take very long to
       // populate the checksums, with minimal benefit. We skip the small buckets here.
-      minBucketChanges: 10
+      minBucketChanges: options.minBucketChanges ?? 10
     });
     const duration = Date.now() - start;
     logger.info(`Populated persistent checksum cache in ${(duration / 1000).toFixed(1)}s`);
+    return result;
   }
 
   /**
diff --git a/modules/module-mongodb-storage/test/src/storage_compacting.test.ts b/modules/module-mongodb-storage/test/src/storage_compacting.test.ts
index 8962d0cc2..40ac9f0ec 100644
--- a/modules/module-mongodb-storage/test/src/storage_compacting.test.ts
+++ b/modules/module-mongodb-storage/test/src/storage_compacting.test.ts
@@ -97,10 +97,25 @@ bucket_definitions:
     await populate(bucketStorage);
     const { checkpoint } = await bucketStorage.getCheckpoint();
 
-    await bucketStorage.populatePersistentChecksumCache({
+    // Default is to skip small buckets - should be a no-op
+    const result0 = await bucketStorage.populatePersistentChecksumCache({
+      maxOpId: checkpoint
+    });
+    expect(result0.buckets).toEqual(0);
+
+    // This should cache the checksums for the two buckets
+    const result1 = await bucketStorage.populatePersistentChecksumCache({
+      maxOpId: checkpoint,
+      minBucketChanges: 1
+    });
+    expect(result1.buckets).toEqual(2);
+
+    // This should be a no-op, as the checksums are already cached
+    const result2 = await bucketStorage.populatePersistentChecksumCache({
       maxOpId: checkpoint,
-      signal: new AbortController().signal
+      minBucketChanges: 1
     });
+    expect(result2.buckets).toEqual(0);
 
     const checksumAfter = await bucketStorage.getChecksums(checkpoint, ['by_user2["u1"]', 'by_user2["u2"]']);
     expect(checksumAfter.get('by_user2["u1"]')).toEqual({
diff --git a/modules/module-postgres-storage/src/storage/PostgresSyncRulesStorage.ts b/modules/module-postgres-storage/src/storage/PostgresSyncRulesStorage.ts
index 88f36b145..23498e2fc 100644
--- a/modules/module-postgres-storage/src/storage/PostgresSyncRulesStorage.ts
+++ b/modules/module-postgres-storage/src/storage/PostgresSyncRulesStorage.ts
@@ -11,6 +11,8 @@ import {
   LastValueSink,
   maxLsn,
   PartialChecksum,
+  PopulateChecksumCacheOptions,
+  PopulateChecksumCacheResults,
   ReplicationCheckpoint,
   storage,
   utils,
@@ -112,8 +114,9 @@ export class PostgresSyncRulesStorage
     return new PostgresCompactor(this.db, this.group_id, options).compact();
   }
 
-  async populatePersistentChecksumCache(options: Pick<CompactOptions, 'maxOpId' | 'signal'>): Promise<void> {
+  async populatePersistentChecksumCache(options: PopulateChecksumCacheOptions): Promise<PopulateChecksumCacheResults> {
     // no-op - checksum cache is not implemented for Postgres yet
+    return { buckets: 0 };
   }
 
   lastWriteCheckpoint(filters: storage.SyncStorageLastWriteCheckpointFilters): Promise {
diff --git a/packages/service-core/src/storage/SyncRulesBucketStorage.ts b/packages/service-core/src/storage/SyncRulesBucketStorage.ts
index b6f9adff2..991166c60 100644
--- a/packages/service-core/src/storage/SyncRulesBucketStorage.ts
+++ b/packages/service-core/src/storage/SyncRulesBucketStorage.ts
@@ -65,7 +65,7 @@ export interface SyncRulesBucketStorage
   /**
    * Lightweight "compact" process to populate the checksum cache, if any.
    */
-  populatePersistentChecksumCache(options?: Pick<CompactOptions, 'maxOpId' | 'signal'>): Promise<void>;
+  populatePersistentChecksumCache(options: PopulateChecksumCacheOptions): Promise<PopulateChecksumCacheResults>;
 
   // ## Read operations
 
@@ -225,6 +225,19 @@ export interface CompactOptions {
   signal?: AbortSignal;
 }
 
+export interface PopulateChecksumCacheOptions {
+  maxOpId: util.InternalOpId;
+  minBucketChanges?: number;
+  signal?: AbortSignal;
+}
+
+export interface PopulateChecksumCacheResults {
+  /**
+   * Number of buckets we have calculated checksums for.
+   */
+  buckets: number;
+}
+
 export interface ClearStorageOptions {
   signal?: AbortSignal;
 }
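
Note on the batching pattern in PATCH 1 and PATCH 3: dirtyBucketBatch() deliberately has no resume cursor. It always returns the most-changed buckets first, and the loop only terminates because the caller resets estimate_since_compact.count to 0 for each bucket it finishes, so the next query no longer matches those buckets. The sketch below restates that contract in isolation. It is a minimal sketch, not the module code: the BucketStateDoc shape is simplified, and the processBucket() callback and the inline counter reset are illustrative stand-ins for the compactor's own processing and bookkeeping.

import { Collection } from 'mongodb';

interface BucketStateDoc {
  _id: { g: number; b: string };
  estimate_since_compact?: { count: number };
}

// Drain all "dirty" buckets for one sync rules group. The query is served by
// the { '_id.g': 1, 'estimate_since_compact.count': -1 } index from PATCH 1,
// and $gte keeps buckets below the threshold out of the batch entirely
// (the query fixed in PATCH 3).
async function drainDirtyBuckets(
  bucketState: Collection<BucketStateDoc>,
  groupId: number,
  minBucketChanges: number,
  processBucket: (bucket: string) => Promise<void>
): Promise<number> {
  let processed = 0;
  while (true) {
    // Most-changed buckets first, 5 000 at a time.
    const batch = await bucketState
      .find(
        { '_id.g': groupId, 'estimate_since_compact.count': { $gte: minBucketChanges } },
        { projection: { _id: 1 }, sort: { 'estimate_since_compact.count': -1 }, limit: 5_000 }
      )
      .toArray();
    if (batch.length == 0) {
      break; // nothing dirty enough remains
    }
    for (const doc of batch) {
      await processBucket(doc._id.b);
      // Resetting the counter is what makes the loop terminate: the next
      // find() no longer matches this bucket.
      await bucketState.updateOne({ _id: doc._id }, { $set: { 'estimate_since_compact.count': 0 } });
      processed += 1;
    }
  }
  return processed;
}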
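Note on the API change in PATCH 4: populatePersistentChecksumCache() now takes PopulateChecksumCacheOptions and reports how many buckets it processed, which is what the new test assertions check. Below is a caller-side sketch of the same contract. The two interface shapes come from the patch; the InternalOpId alias, the ChecksumCacheStorage interface, and the warmChecksumCache() helper are illustrative assumptions, not part of the codebase.

// Assumed alias for this sketch; service-core defines its own InternalOpId.
type InternalOpId = bigint;

interface PopulateChecksumCacheOptions {
  maxOpId: InternalOpId;
  minBucketChanges?: number;
  signal?: AbortSignal;
}

interface PopulateChecksumCacheResults {
  /** Number of buckets checksums were calculated for. */
  buckets: number;
}

// Hypothetical narrow view of SyncRulesBucketStorage, for illustration only.
interface ChecksumCacheStorage {
  populatePersistentChecksumCache(options: PopulateChecksumCacheOptions): Promise<PopulateChecksumCacheResults>;
}

async function warmChecksumCache(storage: ChecksumCacheStorage, checkpoint: InternalOpId): Promise<void> {
  const controller = new AbortController();

  // Omitting minBucketChanges lets the MongoDB implementation fall back to
  // its default of 10, so buckets with only a few changes are skipped.
  const firstPass = await storage.populatePersistentChecksumCache({
    maxOpId: checkpoint,
    signal: controller.signal
  });

  // minBucketChanges: 1 processes every dirty bucket, mirroring the
  // updated test in storage_compacting.test.ts.
  const secondPass = await storage.populatePersistentChecksumCache({
    maxOpId: checkpoint,
    minBucketChanges: 1,
    signal: controller.signal
  });

  console.log(`skipped-small pass: ${firstPass.buckets} buckets, full pass: ${secondPass.buckets} buckets`);
}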