diff --git a/packages/service-core/src/storage/BucketStorage.ts b/packages/service-core/src/storage/BucketStorage.ts index b8f3359..a9165c9 100644 --- a/packages/service-core/src/storage/BucketStorage.ts +++ b/packages/service-core/src/storage/BucketStorage.ts @@ -228,7 +228,7 @@ export interface SyncRulesBucketStorage { checkpoint: util.OpId, dataBuckets: Map<string, string>, options?: BucketDataBatchOptions - ): AsyncIterable<util.SyncBucketData>; + ): AsyncIterable<SyncBucketDataBatch>; /** * Compute checksums for a given list of buckets. @@ -388,6 +388,10 @@ export interface SaveDelete { after?: undefined; } +export interface SyncBucketDataBatch extends util.SyncBucketData { + targetOp: bigint | null; +} + export function mergeToast(record: ToastableSqliteRow, persisted: ToastableSqliteRow): ToastableSqliteRow { const newRecord: ToastableSqliteRow = {}; for (let key in record) { diff --git a/packages/service-core/src/storage/mongo/MongoCompactor.ts b/packages/service-core/src/storage/mongo/MongoCompactor.ts index 72f9025..a29dbd0 100644 --- a/packages/service-core/src/storage/mongo/MongoCompactor.ts +++ b/packages/service-core/src/storage/mongo/MongoCompactor.ts @@ -56,12 +56,14 @@ export class MongoCompactor { let currentState: CurrentBucketState | null = null; + // Constant lower bound const lowerBound: BucketDataKey = { g: this.group_id, b: new MinKey() as any, o: new MinKey() as any }; + // Upper bound is adjusted for each batch let upperBound: BucketDataKey = { g: this.group_id, b: new MaxKey() as any, @@ -142,13 +144,14 @@ export class MongoCompactor { update: { $set: { op: 'MOVE', - data: JSON.stringify({ target: `${targetOp}` }) + target_op: targetOp }, $unset: { source_table: 1, source_key: 1, table: 1, - row_id: 1 + row_id: 1, + data: 1 } } } @@ -185,7 +188,6 @@ export class MongoCompactor { } } - console.log('size', currentState?.trackingSize, idLimitBytes, currentState?.seen.size); await this.flush(); currentState?.seen.clear(); if (currentState?.lastNotPut != null && currentState?.opsSincePut > 1) { @@ 
-207,6 +209,7 @@ export class MongoCompactor { // Order is not important. // Since checksums are not affected, these operations can happen in any order, // and it's fine if the operations are partially applied. + // Each individual operation is atomic. ordered: false }); this.updates = []; @@ -256,6 +259,7 @@ export class MongoCompactor { }); let checksum = 0; let lastOpId: BucketDataKey | null = null; + let targetOp: bigint | null = null; let gotAnOp = false; for await (let op of query.stream()) { if (op.op == 'MOVE' || op.op == 'REMOVE' || op.op == 'CLEAR') { @@ -264,6 +268,11 @@ export class MongoCompactor { if (op.op != 'CLEAR') { gotAnOp = true; } + if (op.target_op != null) { + if (targetOp == null || op.target_op > targetOp) { + targetOp = op.target_op; + } + } } else { throw new Error(`Unexpected ${op.op} operation at ${op._id.g}:${op._id.b}:${op._id.o}`); } @@ -293,7 +302,8 @@ export class MongoCompactor { _id: lastOpId!, op: 'CLEAR', checksum: checksum, - data: null + data: null, + target_op: targetOp }, { session } ); diff --git a/packages/service-core/src/storage/mongo/MongoSyncBucketStorage.ts b/packages/service-core/src/storage/mongo/MongoSyncBucketStorage.ts index 9752f85..f3d4e63 100644 --- a/packages/service-core/src/storage/mongo/MongoSyncBucketStorage.ts +++ b/packages/service-core/src/storage/mongo/MongoSyncBucketStorage.ts @@ -13,6 +13,7 @@ import { FlushedResult, ResolveTableOptions, ResolveTableResult, + SyncBucketDataBatch, SyncRulesBucketStorage, SyncRuleStatus } from '../BucketStorage.js'; @@ -202,7 +203,7 @@ export class MongoSyncBucketStorage implements SyncRulesBucketStorage { checkpoint: util.OpId, dataBuckets: Map<string, string>, options?: BucketDataBatchOptions - ): AsyncIterable<util.SyncBucketData> { + ): AsyncIterable<SyncBucketDataBatch> { if (dataBuckets.size == 0) { return; } @@ -267,7 +268,7 @@ export class MongoSyncBucketStorage implements SyncRulesBucketStorage { } let batchSize = 0; - let currentBatch: util.SyncBucketData | null = null; + let currentBatch: SyncBucketDataBatch | 
null = null; // Ordered by _id, meaning buckets are grouped together for (let rawData of data) { @@ -297,7 +298,8 @@ export class MongoSyncBucketStorage implements SyncRulesBucketStorage { after: start, has_more: hasMore, data: [], - next_after: start + next_after: start, + targetOp: null }; } @@ -315,11 +317,16 @@ export class MongoSyncBucketStorage implements SyncRulesBucketStorage { }; } else { // MOVE, CLEAR + if (row.target_op != null) { + if (currentBatch.targetOp == null || row.target_op > currentBatch.targetOp) { + currentBatch.targetOp = row.target_op; + } + } + entry = { op_id: util.timestampToOpId(row._id.o), op: row.op, - checksum: Number(row.checksum), - data: row.data + checksum: Number(row.checksum) }; } currentBatch.data.push(entry); diff --git a/packages/service-core/src/storage/mongo/models.ts b/packages/service-core/src/storage/mongo/models.ts index 969fde0..8ac4ed3 100644 --- a/packages/service-core/src/storage/mongo/models.ts +++ b/packages/service-core/src/storage/mongo/models.ts @@ -48,6 +48,7 @@ export interface BucketDataDocument { row_id?: string; checksum: number; data: string | null; + target_op?: bigint | null; } export type OpType = 'PUT' | 'REMOVE' | 'MOVE' | 'CLEAR'; diff --git a/packages/service-core/src/sync/sync.ts b/packages/service-core/src/sync/sync.ts index a0f7af1..baa042f 100644 --- a/packages/service-core/src/sync/sync.ts +++ b/packages/service-core/src/sync/sync.ts @@ -227,6 +227,9 @@ async function* bucketDataInBatches(request: BucketDataRequest) { async function* bucketDataBatch(request: BucketDataRequest) { const { storage, checkpoint, bucketsToFetch, dataBuckets, raw_data, binary_data, signal } = request; + const checkpointOp = BigInt(checkpoint); + let checkpointInvalidated = false; + const [_, release] = await syncSemaphore.acquire(); try { // Optimization: Only fetch buckets for which the checksums have changed since the last checkpoint @@ -243,6 +246,9 @@ async function* bucketDataBatch(request: BucketDataRequest) 
{ if (r.has_more) { has_more = true; } + if (r.targetOp != null && r.targetOp > checkpointOp) { + checkpointInvalidated = true; + } if (r.data.length == 0) { continue; } @@ -278,12 +284,19 @@ async function* bucketDataBatch(request: BucketDataRequest) { } if (!has_more) { - const line: util.StreamingSyncCheckpointComplete = { - checkpoint_complete: { - last_op_id: checkpoint - } - }; - yield { data: line, done: true }; + if (checkpointInvalidated) { + // Checkpoint invalidated by a CLEAR or MOVE op. + // Don't send the checkpoint_complete line in this case. + // More data should be available immediately for a new checkpoint. + yield { data: null, done: true }; + } else { + const line: util.StreamingSyncCheckpointComplete = { + checkpoint_complete: { + last_op_id: checkpoint + } + }; + yield { data: line, done: true }; + } } } finally { release();