From 5af5926ce0a1515edb25e4e3a34c6207477b2693 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Wed, 12 Jun 2024 14:00:51 +0200 Subject: [PATCH] Add a new checksum cache implementation that doesn't require locking checkpoints. --- packages/service-core/package.json | 7 +- .../service-core/src/storage/ChecksumCache.ts | 24 +- .../src/storage/ChecksumCacheTwo.ts | 205 ++++++++++++++ .../test/src/checksum_cache.test.ts | 256 ++++++++++++++++++ pnpm-lock.yaml | 8 + service/package.json | 12 +- 6 files changed, 497 insertions(+), 15 deletions(-) create mode 100644 packages/service-core/src/storage/ChecksumCacheTwo.ts create mode 100644 packages/service-core/test/src/checksum_cache.test.ts diff --git a/packages/service-core/package.json b/packages/service-core/package.json index 65fce0e..a41d0bd 100644 --- a/packages/service-core/package.json +++ b/packages/service-core/package.json @@ -18,16 +18,17 @@ "dependencies": { "@journeyapps-platform/micro": "^17.0.1", "@journeyapps-platform/micro-migrate": "^4.0.1", + "@js-sdsl/ordered-set": "^4.4.2", "@opentelemetry/api": "~1.8.0", - "@opentelemetry/resources": "^1.24.1", - "@opentelemetry/exporter-prometheus": "^0.51.1", "@opentelemetry/exporter-metrics-otlp-http": "^0.51.1", + "@opentelemetry/exporter-prometheus": "^0.51.1", + "@opentelemetry/resources": "^1.24.1", "@opentelemetry/sdk-metrics": "1.24.1", "@powersync/service-jpgwire": "workspace:*", "@powersync/service-jsonbig": "workspace:*", "@powersync/service-rsocket-router": "workspace:*", - "@powersync/service-types": "workspace:*", "@powersync/service-sync-rules": "workspace:*", + "@powersync/service-types": "workspace:*", "async-mutex": "^0.5.0", "bson": "^6.6.0", "commander": "^12.0.0", diff --git a/packages/service-core/src/storage/ChecksumCache.ts b/packages/service-core/src/storage/ChecksumCache.ts index 731a8ed..36ec918 100644 --- a/packages/service-core/src/storage/ChecksumCache.ts +++ b/packages/service-core/src/storage/ChecksumCache.ts @@ -19,11 +19,23 @@ export interface FetchPartialBucketChecksum { export type FetchChecksums = (batch: FetchPartialBucketChecksum[]) => Promise>; -export class ChecksumCache { +export interface ChecksumCacheOptions { + fetchChecksums: FetchChecksums; + maxSize?: number; +} + +export interface ChecksumCacheInterface { + getChecksums(checkpoint: OpId, buckets: string[]): Promise; +} + +export class ChecksumCache implements ChecksumCacheInterface { private nextRefId = 1; private checkpoints = new Map(); + private fetchChecksums: FetchChecksums; - constructor(private fetchChecksums: FetchChecksums) {} + constructor(options: ChecksumCacheOptions) { + this.fetchChecksums = options.fetchChecksums; + } async lock(checkpoint: OpId) { const ref = this.nextRefId++; @@ -35,7 +47,7 @@ export class ChecksumCache { const entry: CheckpointEntry = { refs: new Set([ref]), cache: new LRUCache({ - maxSize: 10_000, + max: 10_000, fetchMethod: async (bucket, staleValue, options) => { return options.context.fetch(bucket); } @@ -73,7 +85,7 @@ export class ChecksumCache { entry = { refs: new Set([]), cache: new LRUCache({ - maxSize: 10_000, + max: 10_000, fetchMethod: async (bucket, staleValue, options) => { return options.context.fetch(bucket); } @@ -86,10 +98,10 @@ export class ChecksumCache { const context: ChecksumFetchContext = { async fetch(bucket) { + await fetchPromise; if (!toFetch.has(bucket)) { throw new Error(`Expected to fetch ${bucket}`); } - await fetchPromise; const checksum = fetchResults.get(bucket); if (checksum == null) { throw new Error(`Failed to fetch checksum for bucket ${bucket}`); @@ -145,7 +157,7 @@ export class ChecksumCache { throw new Error(`Cannot find cached checkpoint ${cp}`); } - const cached = entry.cache.get(bucket); + const cached = entry.cache.peek(bucket); if (cached != null) { bucketRequest = { bucket, diff --git a/packages/service-core/src/storage/ChecksumCacheTwo.ts b/packages/service-core/src/storage/ChecksumCacheTwo.ts new file mode 100644 index 0000000..c4277da --- /dev/null +++ b/packages/service-core/src/storage/ChecksumCacheTwo.ts @@ -0,0 +1,205 @@ +import { BucketChecksum, OpId } from '@/util/protocol-types.js'; +import { addBucketChecksums } from '@/util/utils.js'; +import { LRUCache } from 'lru-cache/min'; +import { OrderedSet } from '@js-sdsl/ordered-set'; +import { ChecksumCacheInterface } from './ChecksumCache.js'; + +interface ChecksumFetchContext { + fetch(bucket: string): Promise; + checkpoint: bigint; +} + +export interface FetchPartialBucketChecksum { + bucket: string; + start?: OpId; + end: OpId; +} + +export type FetchChecksums = (batch: FetchPartialBucketChecksum[]) => Promise>; + +export interface ChecksumCacheOptions { + fetchChecksums: FetchChecksums; + maxSize?: number; +} + +// Approximately 5MB of memory, if we assume 50 bytes per entry +const DEFAULT_MAX_SIZE = 100_000; + +/** + * Implement a LRU cache for checksum requests. Each (bucket, checkpoint) request is cached separately, + * while the lookups occur in batches. + * + * For each bucket, we keep a separate OrderedSet of cached checkpoints. + * This allows us to do incrementally update checksums by using the last cached checksum for the same bucket. + * + * We use the LRUCache fetchMethod to deduplicate in-progress requests. + */ +export class ChecksumCache implements ChecksumCacheInterface { + /** + * The primary checksum cache, with key of `${checkpoint}/${bucket}`. + */ + private cache: LRUCache; + + private bucketCheckpoints = new Map>(); + private fetchChecksums: FetchChecksums; + + constructor(options: ChecksumCacheOptions) { + this.fetchChecksums = options.fetchChecksums; + + this.cache = new LRUCache({ + max: options.maxSize ?? DEFAULT_MAX_SIZE, + fetchMethod: async (cacheKey, _staleValue, options) => { + const split = cacheKey.indexOf('/'); + const bucket = cacheKey.substring(split + 1); + + const result = await options.context.fetch(bucket); + + let checkpointSet = this.bucketCheckpoints.get(bucket); + if (checkpointSet == null) { + checkpointSet = new OrderedSet(); + this.bucketCheckpoints.set(bucket, checkpointSet); + } + checkpointSet.insert(options.context.checkpoint); + return result; + }, + + disposeAfter: (value, key) => { + const split = key.indexOf('/'); + const checkpointString = key.substring(0, split); + const checkpoint = BigInt(checkpointString); + const checkpointSet = this.bucketCheckpoints.get(value.bucket); + if (checkpointSet == null) { + return; + } + checkpointSet.eraseElementByKey(checkpoint); + if (checkpointSet.length == 0) { + this.bucketCheckpoints.delete(value.bucket); + } + }, + + noDisposeOnSet: true + }); + } + + async getChecksums(checkpoint: OpId, buckets: string[]) { + let toFetch = new Set(); + let fetchResults = new Map(); + let resolveFetch!: () => void; + let rejectFetch!: (err: any) => void; + let fetchPromise = new Promise((resolve, reject) => { + resolveFetch = resolve; + rejectFetch = reject; + }); + + let finalResults: BucketChecksum[] = []; + + const context: ChecksumFetchContext = { + async fetch(bucket) { + await fetchPromise; + if (!toFetch.has(bucket)) { + // Should never happen + throw new Error(`Expected to fetch ${bucket}`); + } + const checksum = fetchResults.get(bucket); + if (checksum == null) { + // Should never happen + throw new Error(`Failed to fetch checksum for bucket ${bucket}`); + } + return checksum; + }, + checkpoint: BigInt(checkpoint) + }; + + let promises: Promise[] = []; + + try { + for (let bucket of buckets) { + const cacheKey = `${checkpoint}/${bucket}`; + let status: LRUCache.Status = {}; + const p = this.cache.fetch(cacheKey, { context: context, status: status }).then((checksums) => { + if (checksums == null) { + // Should never happen + throw new Error(`Failed to get checksums for ${cacheKey}`); + } + finalResults.push(checksums); + }); + promises.push(p); + if (status.fetch == 'hit' || status.fetch == 'inflight') { + // No need to fetch now + } else { + toFetch.add(bucket); + } + } + + if (toFetch.size == 0) { + // Nothing to fetch, but resolve in case + resolveFetch(); + } else { + // Find smaller checkpoints, sorted in descending order + + let bucketRequests: FetchPartialBucketChecksum[] = []; + let add = new Map(); + + for (let bucket of toFetch) { + let bucketRequest: FetchPartialBucketChecksum | null = null; + const checkpointSet = this.bucketCheckpoints.get(bucket); + if (checkpointSet != null) { + const iter = checkpointSet.reverseUpperBound(context.checkpoint - 1n); + while (iter.isAccessible()) { + const cp = iter.pointer; + const cacheKey = `${cp}/${bucket}`; + // peek to avoid refreshing the key + const cached = this.cache.peek(cacheKey); + if (cached != null) { + bucketRequest = { + bucket, + start: cp.toString(), + end: checkpoint + }; + add.set(bucket, cached); + break; + } + + iter.next(); + } + } + + if (bucketRequest == null) { + bucketRequest = { + bucket, + end: checkpoint + }; + add.set(bucket, { + bucket, + checksum: 0, + count: 0 + }); + } + bucketRequests.push(bucketRequest); + } + + const results = await this.fetchChecksums(bucketRequests); + for (let bucket of toFetch) { + const result = results.get(bucket); + const toAdd = add.get(bucket); + if (toAdd == null) { + // Should never happen + throw new Error(`toAdd null for ${bucket}`); + } + const added = addBucketChecksums(toAdd, result ?? null); + fetchResults.set(bucket, added); + } + resolveFetch(); + } + } catch (e) { + rejectFetch(e); + throw e; + } + + await Promise.all(promises); + if (finalResults.length != buckets.length) { + throw new Error(`Bucket results mismatch: ${finalResults.length} != ${buckets.length}`); + } + return finalResults; + } +} diff --git a/packages/service-core/test/src/checksum_cache.test.ts b/packages/service-core/test/src/checksum_cache.test.ts new file mode 100644 index 0000000..b393b93 --- /dev/null +++ b/packages/service-core/test/src/checksum_cache.test.ts @@ -0,0 +1,256 @@ +import { describe, expect, it } from 'vitest'; +import { + ChecksumCache, + ChecksumCacheInterface, + FetchChecksums, + FetchPartialBucketChecksum +} from '../../src/storage/ChecksumCache.js'; +import { ChecksumCache as ChecksumCacheTwo } from '../../src/storage/ChecksumCacheTwo.js'; +import { BucketChecksum, OpId } from '@/util/protocol-types.js'; +import * as crypto from 'node:crypto'; +import { addBucketChecksums } from '@/util/util-index.js'; + +type CachsumCacheFactory = (fetch: FetchChecksums) => ChecksumCacheInterface; + +describe('checksum cache 1', function () { + defineChecksumCacheTests((f) => new ChecksumCache({ fetchChecksums: f })); +}); + +describe('checksum cache 2', function () { + defineChecksumCacheTests((f) => new ChecksumCacheTwo({ fetchChecksums: f })); +}); + +/** + * Create a deterministic BucketChecksum based on the bucket name and checkpoint for testing purposes. + */ +function testHash(bucket: string, checkpoint: OpId) { + const key = `${checkpoint}/${bucket}`; + const hash = crypto.createHash('sha256').update(key).digest().readInt32LE(0); + return hash; +} + +function testPartialHash(request: FetchPartialBucketChecksum): BucketChecksum { + if (request.start) { + const a = testHash(request.bucket, request.start); + const b = testHash(request.bucket, request.end); + return addBucketChecksums( + { + bucket: request.bucket, + checksum: b, + count: Number(request.end) + }, + { + // Subtract a + bucket: request.bucket, + checksum: -a, + count: -Number(request.start) + } + ); + } else { + return { + bucket: request.bucket, + checksum: testHash(request.bucket, request.end), + count: Number(request.end) + }; + } +} + +const TEST_123 = { + bucket: 'test', + count: 123, + checksum: 1104081737 +}; + +const TEST_1234 = { + bucket: 'test', + count: 1234, + checksum: -1593864957 +}; + +const TEST2_123 = { + bucket: 'test2', + count: 123, + checksum: 1741377449 +}; + +const TEST3_123 = { + bucket: 'test3', + count: 123, + checksum: -2085080402 +}; + +function fetchTestChecksums(batch: FetchPartialBucketChecksum[]) { + return new Map( + batch.map((v) => { + return [v.bucket, testPartialHash(v)]; + }) + ); +} + +function defineChecksumCacheTests(factory: CachsumCacheFactory) { + it('should handle a sequential lookups (a)', async function () { + let lookups: FetchPartialBucketChecksum[][] = []; + const cache = factory(async (batch) => { + lookups.push(batch); + return fetchTestChecksums(batch); + }); + + expect(await cache.getChecksums('123', ['test'])).toEqual([TEST_123]); + + expect(await cache.getChecksums('1234', ['test'])).toEqual([TEST_1234]); + + expect(await cache.getChecksums('123', ['test2'])).toEqual([TEST2_123]); + + expect(lookups).toEqual([ + [{ bucket: 'test', end: '123' }], + // This should use the previous lookup + [{ bucket: 'test', start: '123', end: '1234' }], + [{ bucket: 'test2', end: '123' }] + ]); + }); + + it('should handle a sequential lookups (b)', async function () { + // Reverse order of the above + let lookups: FetchPartialBucketChecksum[][] = []; + const cache = factory(async (batch) => { + lookups.push(batch); + return fetchTestChecksums(batch); + }); + + expect(await cache.getChecksums('123', ['test2'])).toEqual([TEST2_123]); + + expect(await cache.getChecksums('1234', ['test'])).toEqual([TEST_1234]); + + expect(await cache.getChecksums('123', ['test'])).toEqual([TEST_123]); + + expect(lookups).toEqual([ + // With this order, there is no option for a partial lookup + [{ bucket: 'test2', end: '123' }], + [{ bucket: 'test', end: '1234' }], + [{ bucket: 'test', end: '123' }] + ]); + }); + + it('should handle a concurrent lookups (a)', async function () { + let lookups: FetchPartialBucketChecksum[][] = []; + const cache = factory(async (batch) => { + lookups.push(batch); + return fetchTestChecksums(batch); + }); + + const p1 = cache.getChecksums('123', ['test']); + const p2 = cache.getChecksums('1234', ['test']); + const p3 = cache.getChecksums('123', ['test2']); + + expect(await p1).toEqual([TEST_123]); + expect(await p2).toEqual([TEST_1234]); + expect(await p3).toEqual([TEST2_123]); + + // Concurrent requests, so we can't do a partial lookup for 123 -> 1234 + expect(lookups).toEqual([ + [{ bucket: 'test', end: '123' }], + [{ bucket: 'test', end: '1234' }], + [{ bucket: 'test2', end: '123' }] + ]); + }); + + it('should handle a concurrent lookups (b)', async function () { + let lookups: FetchPartialBucketChecksum[][] = []; + const cache = factory(async (batch) => { + lookups.push(batch); + return fetchTestChecksums(batch); + }); + + const p1 = cache.getChecksums('123', ['test']); + const p2 = cache.getChecksums('123', ['test']); + + expect(await p1).toEqual([TEST_123]); + + expect(await p2).toEqual([TEST_123]); + + // The lookup should be deduplicated, even though it's in progress + expect(lookups).toEqual([[{ bucket: 'test', end: '123' }]]); + }); + + it('should handle serial + concurrent lookups', async function () { + let lookups: FetchPartialBucketChecksum[][] = []; + const cache = factory(async (batch) => { + lookups.push(batch); + return fetchTestChecksums(batch); + }); + + expect(await cache.getChecksums('123', ['test'])).toEqual([TEST_123]); + + const p2 = cache.getChecksums('1234', ['test']); + const p3 = cache.getChecksums('1234', ['test']); + + expect(await p2).toEqual([TEST_1234]); + expect(await p3).toEqual([TEST_1234]); + + expect(lookups).toEqual([ + [{ bucket: 'test', end: '123' }], + // This lookup is deduplicated + [{ bucket: 'test', start: '123', end: '1234' }] + ]); + }); + + it('should handle multiple buckets', async function () { + let lookups: FetchPartialBucketChecksum[][] = []; + const cache = factory(async (batch) => { + lookups.push(batch); + return fetchTestChecksums(batch); + }); + + expect(await cache.getChecksums('123', ['test', 'test2'])).toEqual([TEST_123, TEST2_123]); + + expect(lookups).toEqual([ + [ + // Both lookups in the same request + { bucket: 'test', end: '123' }, + { bucket: 'test2', end: '123' } + ] + ]); + }); + + it('should handle multiple buckets with partial caching (a)', async function () { + let lookups: FetchPartialBucketChecksum[][] = []; + const cache = factory(async (batch) => { + lookups.push(batch); + return fetchTestChecksums(batch); + }); + + expect(await cache.getChecksums('123', ['test'])).toEqual([TEST_123]); + expect(await cache.getChecksums('123', ['test', 'test2'])).toEqual([TEST_123, TEST2_123]); + + expect(lookups).toEqual([ + // Request 1 + [{ bucket: 'test', end: '123' }], + // Request 2 + [{ bucket: 'test2', end: '123' }] + ]); + }); + + it('should handle multiple buckets with partial caching (b)', async function () { + let lookups: FetchPartialBucketChecksum[][] = []; + const cache = factory(async (batch) => { + lookups.push(batch); + return fetchTestChecksums(batch); + }); + + const a = cache.getChecksums('123', ['test', 'test2']); + const b = cache.getChecksums('123', ['test2', 'test3']); + + expect(await a).toEqual([TEST_123, TEST2_123]); + expect(await b).toEqual([TEST2_123, TEST3_123]); + + expect(lookups).toEqual([ + // Request a + [ + { bucket: 'test', end: '123' }, + { bucket: 'test2', end: '123' } + ], + // Request b (re-uses the checksum for test2 from request a) + [{ bucket: 'test3', end: '123' }] + ]); + }); +} diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 985490c..fd614e2 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -132,6 +132,9 @@ importers: '@journeyapps-platform/micro-migrate': specifier: ^4.0.1 version: 4.0.1(socks@2.8.3) + '@js-sdsl/ordered-set': + specifier: ^4.4.2 + version: 4.4.2 '@opentelemetry/api': specifier: ~1.8.0 version: 1.8.0 @@ -785,6 +788,9 @@ packages: '@js-sdsl/ordered-map@4.4.2': resolution: {integrity: sha512-iUKgm52T8HOE/makSxjqoWhe95ZJA1/G1sYsGev2JDKUSS14KAgg1LHb+Ba+IPow0xflbnSkOsZcO08C7w1gYw==} + '@js-sdsl/ordered-set@4.4.2': + resolution: {integrity: sha512-ieYQ8WlBPKYzEo81H3q0DFbd8WtFRXXABb4+vRCF0AO3WWtJZFxYvRGdipUXGrd6tlSySmqhcPuO3J6SCodCxg==} + '@jsdevtools/ono@7.1.3': resolution: {integrity: sha512-4JQNk+3mVzK3xh2rqd6RB4J46qUR19azEHBneZyTZM+c456qOrbbM/5xcR8huNCCcbVt7+UmizG6GuUvPvKUYg==} @@ -5449,6 +5455,8 @@ snapshots: '@js-sdsl/ordered-map@4.4.2': {} + '@js-sdsl/ordered-set@4.4.2': {} + '@jsdevtools/ono@7.1.3': {} '@ljharb/through@2.3.13': diff --git a/service/package.json b/service/package.json index f692cc3..c0804f1 100644 --- a/service/package.json +++ b/service/package.json @@ -13,15 +13,15 @@ "@fastify/cors": "8.4.1", "@journeyapps-platform/micro": "^17.0.1", "@journeyapps-platform/micro-migrate": "^4.0.1", - "@powersync/service-types": "workspace:*", - "@powersync/service-jpgwire": "workspace:*", - "@powersync/service-jsonbig": "workspace:*", - "@powersync/service-sync-rules": "workspace:*", - "@powersync/service-rsocket-router": "workspace:*", - "@powersync/service-core": "workspace:*", "@opentelemetry/api": "~1.6.0", "@opentelemetry/exporter-prometheus": "^0.43.0", "@opentelemetry/sdk-metrics": "^1.17.0", + "@powersync/service-core": "workspace:*", + "@powersync/service-jpgwire": "workspace:*", + "@powersync/service-jsonbig": "workspace:*", + "@powersync/service-rsocket-router": "workspace:*", + "@powersync/service-sync-rules": "workspace:*", + "@powersync/service-types": "workspace:*", "async-mutex": "^0.5.0", "bson": "^6.6.0", "commander": "^12.0.0",