Skip to content

Commit

Permalink
Process all MOVE target_op fields on the server.
Browse files Browse the repository at this point in the history
  • Loading branch information
rkistner committed Jul 15, 2024
1 parent 88a938c commit 4db1895
Show file tree
Hide file tree
Showing 5 changed files with 51 additions and 16 deletions.
6 changes: 5 additions & 1 deletion packages/service-core/src/storage/BucketStorage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ export interface SyncRulesBucketStorage {
checkpoint: util.OpId,
dataBuckets: Map<string, string>,
options?: BucketDataBatchOptions
): AsyncIterable<util.SyncBucketData>;
): AsyncIterable<SyncBucketDataBatch>;

/**
* Compute checksums for a given list of buckets.
Expand Down Expand Up @@ -388,6 +388,10 @@ export interface SaveDelete {
after?: undefined;
}

export interface SyncBucketDataBatch extends util.SyncBucketData {
targetOp: bigint | null;
}

export function mergeToast(record: ToastableSqliteRow, persisted: ToastableSqliteRow): ToastableSqliteRow {
const newRecord: ToastableSqliteRow = {};
for (let key in record) {
Expand Down
18 changes: 14 additions & 4 deletions packages/service-core/src/storage/mongo/MongoCompactor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,14 @@ export class MongoCompactor {

let currentState: CurrentBucketState | null = null;

// Constant lower bound
const lowerBound: BucketDataKey = {
g: this.group_id,
b: new MinKey() as any,
o: new MinKey() as any
};

// Upper bound is adjusted for each batch
let upperBound: BucketDataKey = {
g: this.group_id,
b: new MaxKey() as any,
Expand Down Expand Up @@ -142,13 +144,14 @@ export class MongoCompactor {
update: {
$set: {
op: 'MOVE',
data: JSON.stringify({ target: `${targetOp}` })
target_op: targetOp
},
$unset: {
source_table: 1,
source_key: 1,
table: 1,
row_id: 1
row_id: 1,
data: 1
}
}
}
Expand Down Expand Up @@ -185,7 +188,6 @@ export class MongoCompactor {
}
}

console.log('size', currentState?.trackingSize, idLimitBytes, currentState?.seen.size);
await this.flush();
currentState?.seen.clear();
if (currentState?.lastNotPut != null && currentState?.opsSincePut > 1) {
Expand All @@ -207,6 +209,7 @@ export class MongoCompactor {
// Order is not important.
// Since checksums are not affected, these operations can happen in any order,
// and it's fine if the operations are partially applied.
// Each individual operation is atomic.
ordered: false
});
this.updates = [];
Expand Down Expand Up @@ -256,6 +259,7 @@ export class MongoCompactor {
});
let checksum = 0;
let lastOpId: BucketDataKey | null = null;
let targetOp: bigint | null = null;
let gotAnOp = false;
for await (let op of query.stream()) {
if (op.op == 'MOVE' || op.op == 'REMOVE' || op.op == 'CLEAR') {
Expand All @@ -264,6 +268,11 @@ export class MongoCompactor {
if (op.op != 'CLEAR') {
gotAnOp = true;
}
if (op.target_op != null) {
if (targetOp == null || op.target_op > targetOp) {
targetOp = op.target_op;
}
}
} else {
throw new Error(`Unexpected ${op.op} operation at ${op._id.g}:${op._id.b}:${op._id.o}`);
}
Expand Down Expand Up @@ -293,7 +302,8 @@ export class MongoCompactor {
_id: lastOpId!,
op: 'CLEAR',
checksum: checksum,
data: null
data: null,
target_op: targetOp
},
{ session }
);
Expand Down
17 changes: 12 additions & 5 deletions packages/service-core/src/storage/mongo/MongoSyncBucketStorage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import {
FlushedResult,
ResolveTableOptions,
ResolveTableResult,
SyncBucketDataBatch,
SyncRulesBucketStorage,
SyncRuleStatus
} from '../BucketStorage.js';
Expand Down Expand Up @@ -202,7 +203,7 @@ export class MongoSyncBucketStorage implements SyncRulesBucketStorage {
checkpoint: util.OpId,
dataBuckets: Map<string, string>,
options?: BucketDataBatchOptions
): AsyncIterable<util.SyncBucketData> {
): AsyncIterable<SyncBucketDataBatch> {
if (dataBuckets.size == 0) {
return;
}
Expand Down Expand Up @@ -267,7 +268,7 @@ export class MongoSyncBucketStorage implements SyncRulesBucketStorage {
}

let batchSize = 0;
let currentBatch: util.SyncBucketData | null = null;
let currentBatch: SyncBucketDataBatch | null = null;

// Ordered by _id, meaning buckets are grouped together
for (let rawData of data) {
Expand Down Expand Up @@ -297,7 +298,8 @@ export class MongoSyncBucketStorage implements SyncRulesBucketStorage {
after: start,
has_more: hasMore,
data: [],
next_after: start
next_after: start,
targetOp: null
};
}

Expand All @@ -315,11 +317,16 @@ export class MongoSyncBucketStorage implements SyncRulesBucketStorage {
};
} else {
// MOVE, CLEAR
if (row.target_op != null) {
if (currentBatch.targetOp == null || row.target_op > currentBatch.targetOp) {
currentBatch.targetOp = row.target_op;
}
}

entry = {
op_id: util.timestampToOpId(row._id.o),
op: row.op,
checksum: Number(row.checksum),
data: row.data
checksum: Number(row.checksum)
};
}
currentBatch.data.push(entry);
Expand Down
1 change: 1 addition & 0 deletions packages/service-core/src/storage/mongo/models.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ export interface BucketDataDocument {
row_id?: string;
checksum: number;
data: string | null;
target_op?: bigint | null;
}

export type OpType = 'PUT' | 'REMOVE' | 'MOVE' | 'CLEAR';
Expand Down
25 changes: 19 additions & 6 deletions packages/service-core/src/sync/sync.ts
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,9 @@ async function* bucketDataInBatches(request: BucketDataRequest) {
async function* bucketDataBatch(request: BucketDataRequest) {
const { storage, checkpoint, bucketsToFetch, dataBuckets, raw_data, binary_data, signal } = request;

const checkpointOp = BigInt(checkpoint);
let checkpointInvalidated = false;

const [_, release] = await syncSemaphore.acquire();
try {
// Optimization: Only fetch buckets for which the checksums have changed since the last checkpoint
Expand All @@ -243,6 +246,9 @@ async function* bucketDataBatch(request: BucketDataRequest) {
if (r.has_more) {
has_more = true;
}
if (r.targetOp != null && r.targetOp > checkpointOp) {
checkpointInvalidated = true;
}
if (r.data.length == 0) {
continue;
}
Expand Down Expand Up @@ -278,12 +284,19 @@ async function* bucketDataBatch(request: BucketDataRequest) {
}

if (!has_more) {
const line: util.StreamingSyncCheckpointComplete = {
checkpoint_complete: {
last_op_id: checkpoint
}
};
yield { data: line, done: true };
if (checkpointInvalidated) {
// Checkpoint invalidated by a CLEAR or MOVE op.
// Don't send the checkpoint_complete line in this case.
// More data should be available immediately for a new checkpoint.
yield { data: null, done: true };
} else {
const line: util.StreamingSyncCheckpointComplete = {
checkpoint_complete: {
last_op_id: checkpoint
}
};
yield { data: line, done: true };
}
}
} finally {
release();
Expand Down

0 comments on commit 4db1895

Please sign in to comment.