
Commit d40f3ab

Merge pull request #17 from powersync-ja/checksum-cache
Checksum cache
2 parents 9f91c92 + db9e051 commit d40f3ab

File tree

13 files changed: +893 -87 lines changed

.changeset/tiny-ads-try.md

Lines changed: 7 additions & 0 deletions
@@ -0,0 +1,7 @@
---
'@powersync/service-core': patch
'powersync-open-service': patch
---

- Use an LRU cache for checksum computations, improving performance and reducing MongoDB database load.
- Return zero checksums to the client instead of omitting them, to help with debugging sync issues.
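As a rough illustration of the second point, a bucket with no data at the requested checkpoint is now reported with an explicit zero checksum rather than being dropped from the response. The fields below follow the BucketChecksum shape used elsewhere in this commit; the bucket name is hypothetical.

// Hypothetical zero-checksum entry returned for an empty or unknown bucket.
const zeroChecksum = {
  bucket: 'by_user["user1"]',
  checksum: 0,
  count: 0
};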

packages/service-core/package.json

Lines changed: 4 additions & 3 deletions
@@ -18,16 +18,17 @@
   "dependencies": {
     "@journeyapps-platform/micro": "^17.0.1",
     "@journeyapps-platform/micro-migrate": "^4.0.1",
+    "@js-sdsl/ordered-set": "^4.4.2",
     "@opentelemetry/api": "~1.8.0",
-    "@opentelemetry/resources": "^1.24.1",
-    "@opentelemetry/exporter-prometheus": "^0.51.1",
     "@opentelemetry/exporter-metrics-otlp-http": "^0.51.1",
+    "@opentelemetry/exporter-prometheus": "^0.51.1",
+    "@opentelemetry/resources": "^1.24.1",
     "@opentelemetry/sdk-metrics": "1.24.1",
     "@powersync/service-jpgwire": "workspace:*",
     "@powersync/service-jsonbig": "workspace:*",
     "@powersync/service-rsocket-router": "workspace:*",
-    "@powersync/service-types": "workspace:*",
     "@powersync/service-sync-rules": "workspace:*",
+    "@powersync/service-types": "workspace:*",
     "async-mutex": "^0.5.0",
     "bson": "^6.6.0",
     "commander": "^12.0.0",

packages/service-core/src/storage/BucketStorage.ts

Lines changed: 6 additions & 1 deletion
@@ -230,7 +230,12 @@ export interface SyncRulesBucketStorage {
     options?: BucketDataBatchOptions
   ): AsyncIterable<util.SyncBucketData>;
 
-  getChecksums(checkpoint: util.OpId, buckets: string[]): Promise<util.BucketChecksum[]>;
+  /**
+   * Compute checksums for a given list of buckets.
+   *
+   * Returns zero checksums for any buckets not found.
+   */
+  getChecksums(checkpoint: util.OpId, buckets: string[]): Promise<util.ChecksumMap>;
 
   /**
    * Terminate the sync rules.
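A minimal sketch of how a caller might consume the new map-based return value. The `storage` instance and bucket names are hypothetical; `util.ChecksumMap` is a Map from bucket name to BucketChecksum, as returned by the ChecksumCache implementation added later in this commit.

// `storage` is a hypothetical SyncRulesBucketStorage implementation.
const checksums = await storage.getChecksums('1000', ['global[]', 'by_user["user1"]']);

for (const [bucket, checksum] of checksums) {
  // Buckets not found upstream are now returned as zero checksums
  // ({ checksum: 0, count: 0 }) instead of being omitted.
  console.log(bucket, checksum.checksum, checksum.count);
}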
Lines changed: 294 additions & 0 deletions
@@ -0,0 +1,294 @@
import { BucketChecksum, OpId } from '@/util/protocol-types.js';
import { ChecksumMap, addBucketChecksums } from '@/util/utils.js';
import { LRUCache } from 'lru-cache/min';
import { OrderedSet } from '@js-sdsl/ordered-set';

interface ChecksumFetchContext {
  fetch(bucket: string): Promise<BucketChecksum>;
  checkpoint: bigint;
}

export interface FetchPartialBucketChecksum {
  bucket: string;
  start?: OpId;
  end: OpId;
}

export type FetchChecksums = (batch: FetchPartialBucketChecksum[]) => Promise<ChecksumMap>;

export interface ChecksumCacheOptions {
  /**
   * Upstream checksum implementation.
   *
   * This fetches a batch of either entire bucket checksums, or a partial range.
   */
  fetchChecksums: FetchChecksums;

  /**
   * Maximum number of cached checksums.
   */
  maxSize?: number;
}

// Approximately 5MB of memory, if we assume 50 bytes per entry
const DEFAULT_MAX_SIZE = 100_000;

/**
 * Implements an LRU cache for checksum requests. Each (bucket, checkpoint) request is cached separately,
 * while the lookups occur in batches.
 *
 * For each bucket, we keep a separate OrderedSet of cached checkpoints.
 * This allows us to incrementally update checksums by using the last cached checksum for the same bucket.
 *
 * We use the LRUCache fetchMethod to deduplicate in-progress requests.
 */
export class ChecksumCache {
  /**
   * The primary checksum cache, with a key of `${checkpoint}/${bucket}`.
   */
  private cache: LRUCache<string, BucketChecksum, ChecksumFetchContext>;
  /**
   * For each bucket, an ordered set of cached checkpoints.
   */
  private bucketCheckpoints = new Map<string, OrderedSet<bigint>>();

  private fetchChecksums: FetchChecksums;

  constructor(options: ChecksumCacheOptions) {
    this.fetchChecksums = options.fetchChecksums;

    this.cache = new LRUCache<string, BucketChecksum, ChecksumFetchContext>({
      max: options.maxSize ?? DEFAULT_MAX_SIZE,
      fetchMethod: async (cacheKey, _staleValue, options) => {
        // Called when this checksum hasn't been cached yet.
        // Pass the call back to the request, which implements batch fetching.
        const { bucket } = parseCacheKey(cacheKey);
        const result = await options.context.fetch(bucket);

        // Add to the set of cached checkpoints for the bucket.
        let checkpointSet = this.bucketCheckpoints.get(bucket);
        if (checkpointSet == null) {
          checkpointSet = new OrderedSet();
          this.bucketCheckpoints.set(bucket, checkpointSet);
        }
        checkpointSet.insert(options.context.checkpoint);
        return result;
      },

      dispose: (value, key) => {
        // Remove from the set of cached checkpoints for the bucket
        const { checkpointString } = parseCacheKey(key);
        const checkpoint = BigInt(checkpointString);
        const checkpointSet = this.bucketCheckpoints.get(value.bucket);
        if (checkpointSet == null) {
          return;
        }
        checkpointSet.eraseElementByKey(checkpoint);
        if (checkpointSet.length == 0) {
          this.bucketCheckpoints.delete(value.bucket);
        }
      },

      noDisposeOnSet: true,

      // When we have more fetches than the cache size, complete the fetches instead
      // of failing with Error('evicted').
      ignoreFetchAbort: true
    });
  }

  async getChecksums(checkpoint: OpId, buckets: string[]): Promise<BucketChecksum[]> {
    const checksums = await this.getChecksumMap(checkpoint, buckets);
    // Return results in the same order as the request
    return buckets.map((bucket) => checksums.get(bucket)!);
  }

  /**
   * Get bucket checksums for a checkpoint.
   *
   * Any checksums not found upstream are returned as zero checksums.
   *
   * @returns a Map with exactly one entry for each bucket requested
   */
  async getChecksumMap(checkpoint: OpId, buckets: string[]): Promise<ChecksumMap> {
    // Buckets that don't have a cached checksum for this checkpoint yet
    let toFetch = new Set<string>();

    // Newly fetched results
    let fetchResults = new Map<string, BucketChecksum>();

    // Promise for the batch of new fetch requests
    let resolveFetch!: () => void;
    let rejectFetch!: (err: any) => void;
    let fetchPromise = new Promise<void>((resolve, reject) => {
      resolveFetch = resolve;
      rejectFetch = reject;
    });

    // Accumulated results - both from cached checksums, and fetched checksums
    let finalResults = new Map<string, BucketChecksum>();

    const context: ChecksumFetchContext = {
      async fetch(bucket) {
        await fetchPromise;
        if (!toFetch.has(bucket)) {
          // Should never happen
          throw new Error(`Expected to fetch ${bucket}`);
        }
        const checksum = fetchResults.get(bucket);
        if (checksum == null) {
          // Should never happen
          throw new Error(`Failed to fetch checksum for bucket ${bucket}`);
        }
        return checksum;
      },
      checkpoint: BigInt(checkpoint)
    };

    // One promise to await to ensure all fetch requests completed.
    let settledPromise: Promise<PromiseSettledResult<void>[]> | null = null;

    try {
      // Individual cache fetch promises
      let cacheFetchPromises: Promise<void>[] = [];

      for (let bucket of buckets) {
        const cacheKey = makeCacheKey(checkpoint, bucket);
        let status: LRUCache.Status<BucketChecksum> = {};
        const p = this.cache.fetch(cacheKey, { context: context, status: status }).then((checksums) => {
          if (checksums == null) {
            // Should never happen
            throw new Error(`Failed to get checksums for ${cacheKey}`);
          }
          finalResults.set(bucket, checksums);
        });
        cacheFetchPromises.push(p);
        if (status.fetch == 'hit' || status.fetch == 'inflight') {
          // The checksum is either cached already (hit), or another request is busy
          // fetching it (inflight).
          // In either case, we don't need to fetch a new checksum.
        } else {
          // We need a new request for this checksum.
          toFetch.add(bucket);
        }
      }
      // We do this directly after creating the promises, otherwise
      // we could end up with weird uncaught rejection errors.
      settledPromise = Promise.allSettled(cacheFetchPromises);

      if (toFetch.size == 0) {
        // Nothing to fetch, but resolve in any case
        resolveFetch();
      } else {
        let bucketRequests: FetchPartialBucketChecksum[] = [];
        // Partial checksums (previously cached) to add to the partial fetch results
        let add = new Map<string, BucketChecksum>();

        for (let bucket of toFetch) {
          let bucketRequest: FetchPartialBucketChecksum | null = null;
          const checkpointSet = this.bucketCheckpoints.get(bucket);
          if (checkpointSet != null) {
            // Find smaller checkpoints, sorted in descending order
            let iter = checkpointSet.reverseUpperBound(context.checkpoint);
            const begin = checkpointSet.begin();
            while (iter.isAccessible()) {
              const cp = iter.pointer;
              const cacheKey = makeCacheKey(cp, bucket);
              // peek to avoid refreshing the key
              const cached = this.cache.peek(cacheKey);
              // As long as dispose() works correctly, the checkpoint set should
              // match up with the cache, and `cached` should also have a value here.
              // However, we handle cases where it's not present either way.
              // Test by disabling the `dispose()` callback.
              if (cached != null) {
                // Partial checksum found - make a partial checksum request
                bucketRequest = {
                  bucket,
                  start: cp.toString(),
                  end: checkpoint
                };
                add.set(bucket, cached);
                break;
              }

              if (iter.equals(begin)) {
                // Cannot iterate further
                break;
              }
              // Iterate backwards
              iter = iter.pre();
            }
          }

          if (bucketRequest == null) {
            // No partial checksum found - make a new full checksum request
            bucketRequest = {
              bucket,
              end: checkpoint
            };
            add.set(bucket, {
              bucket,
              checksum: 0,
              count: 0
            });
          }
          bucketRequests.push(bucketRequest);
        }

        // Fetch partial checksums from upstream
        const results = await this.fetchChecksums(bucketRequests);

        for (let bucket of toFetch) {
          const result = results.get(bucket);
          const toAdd = add.get(bucket);
          if (toAdd == null) {
            // Should never happen
            throw new Error(`toAdd null for ${bucket}`);
          }
          // Compute the full checksum from the two partials.
          // A missing result is treated the same as a zero result.
          const added = addBucketChecksums(toAdd, result ?? null);
          fetchResults.set(bucket, added);
        }

        // fetchResults is fully populated, so we resolve the Promise
        resolveFetch();
      }
    } catch (e) {
      // Failure when fetching checksums - reject the Promise.
      // This will reject all individual cache fetch requests, and each will be retried
      // on the next request.
      rejectFetch(e);

      // Wait for the above rejection to propagate, otherwise we end up with "uncaught" errors.
      // This promise never throws.
      await settledPromise;

      throw e;
    }

    // Wait for all cache fetch requests to complete
    const settledResults = (await settledPromise) ?? [];
    // Check if any of them failed
    for (let result of settledResults) {
      if (result.status == 'rejected') {
        throw result.reason;
      }
    }

    if (finalResults.size != buckets.length) {
      // Should not happen
      throw new Error(`Bucket results mismatch: ${finalResults.size} != ${buckets.length}`);
    }
    return finalResults;
  }
}

function makeCacheKey(checkpoint: bigint | string, bucket: string) {
  return `${checkpoint}/${bucket}`;
}

function parseCacheKey(key: string) {
  const index = key.indexOf('/');
  return { checkpointString: key.substring(0, index), bucket: key.substring(index + 1) };
}
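A sketch of how the cache above might be wired to an upstream checksum query. The ChecksumCache, FetchPartialBucketChecksum, BucketChecksum, and ChecksumMap names come from this commit; the import paths, the fetchChecksumsFromDb stub, and the bucket name are assumptions for illustration.

import { BucketChecksum } from '@/util/protocol-types.js';
import { ChecksumMap } from '@/util/utils.js';
import { ChecksumCache, FetchPartialBucketChecksum } from './ChecksumCache.js';

// Hypothetical upstream lookup. A real implementation would aggregate operation
// checksums per bucket for the requested range, e.g. from MongoDB.
async function fetchChecksumsFromDb(batch: FetchPartialBucketChecksum[]): Promise<ChecksumMap> {
  const result = new Map<string, BucketChecksum>();
  for (const request of batch) {
    // request.start is undefined for a full-bucket request, or set when only the
    // delta since a previously cached checkpoint needs to be computed.
    result.set(request.bucket, { bucket: request.bucket, checksum: 0, count: 0 });
  }
  return result;
}

const cache = new ChecksumCache({ fetchChecksums: fetchChecksumsFromDb });

// Concurrent requests for the same (checkpoint, bucket) pair are deduplicated via the
// LRUCache fetchMethod, and later checkpoints only fetch the delta since the most
// recent cached checkpoint for each bucket.
const checksums = await cache.getChecksums('1000', ['global[]']);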
