From 46aa5ac83f4bd457b60a762d37ea75f864f76144 Mon Sep 17 00:00:00 2001 From: David Whittington Date: Thu, 3 Oct 2024 15:54:11 -0500 Subject: [PATCH] fix(sqlite): flush data items using bundle db writer PE-6872 Prior to this commit, data items were flushed via the core DB writer. Under sufficiently heavy bunde import load this lead to contention with data item writes. This change adjusts the flushing logic to use the bundle DB writer. This avoids concurrent bundle DB write attempts and eliminates the contention. --- package.json | 2 +- src/database/standalone-sqlite.ts | 53 ++++++++++++++++++++++--------- 2 files changed, 39 insertions(+), 16 deletions(-) diff --git a/package.json b/package.json index 3259c07d..7cd17cf1 100644 --- a/package.json +++ b/package.json @@ -8,7 +8,7 @@ "url": "https://github.com/ar-io/ar-io-node" }, "dependencies": { - "@ar.io/sdk": "^2.0.0", + "@ar.io/sdk": "^2.2.5", "@aws-lite/client": "^0.21.7", "@aws-lite/s3": "^0.1.21", "@clickhouse/client": "^1.3.0", diff --git a/src/database/standalone-sqlite.ts b/src/database/standalone-sqlite.ts index e72c67e6..33fc5b96 100644 --- a/src/database/standalone-sqlite.ts +++ b/src/database/standalone-sqlite.ts @@ -941,17 +941,25 @@ export class StandaloneSqliteDatabaseWorker { const endHeight = block.height - MAX_FORK_DEPTH; this.saveCoreStableDataFn(endHeight); - this.saveBundlesStableDataFn(endHeight); this.deleteCoreStaleNewDataFn( endHeight, maxStableBlockTimestamp - NEW_TX_CLEANUP_WAIT_SECS, ); - this.deleteBundlesStaleNewDataFn( - endHeight, - maxStableBlockTimestamp - NEW_DATA_ITEM_CLEANUP_WAIT_SECS, - ); + + return { endHeight, maxStableBlockTimestamp }; } + + return {}; + } + + flushStableDataItems(endHeight: number, maxStableBlockTimestamp: number) { + this.saveBundlesStableDataFn(endHeight); + + this.deleteBundlesStaleNewDataFn( + endHeight, + maxStableBlockTimestamp - NEW_DATA_ITEM_CLEANUP_WAIT_SECS, + ); } getDataAttributes(id: string) { @@ -2370,7 +2378,7 @@ const WORKER_POOL_NAMES: Array = [ 'bundles', ]; -type WorkerMethodName = keyof StandaloneSqliteDatabase; +type WorkerMethodName = keyof StandaloneSqliteDatabaseWorker; type WorkerRoleName = 'read' | 'write'; const WORKER_ROLE_NAMES: Array = ['read', 'write']; @@ -2742,16 +2750,22 @@ export class StandaloneSqliteDatabase return this.queueWrite('bundles', 'saveBundle', [bundle]); } - saveBlockAndTxs( + async saveBlockAndTxs( block: PartialJsonBlock, txs: PartialJsonTransaction[], missingTxIds: string[], ): Promise { - return this.queueWrite('core', 'saveBlockAndTxs', [ - block, - txs, - missingTxIds, - ]); + const { endHeight, maxStableBlockTimestamp } = await this.queueWrite( + 'core', + 'saveBlockAndTxs', + [block, txs, missingTxIds], + ); + if (maxStableBlockTimestamp !== undefined) { + await this.queueWrite('bundles', 'flushStableDataItems', [ + endHeight, + maxStableBlockTimestamp, + ]); + } } async getDataAttributes( @@ -3068,9 +3082,18 @@ if (!isMainThread) { parentPort?.postMessage(null); break; case 'saveBlockAndTxs': - const [block, txs, missingTxIds] = args; - worker.saveBlockAndTxs(block, txs, missingTxIds); - parentPort?.postMessage(null); + { + const [block, txs, missingTxIds] = args; + const ret = worker.saveBlockAndTxs(block, txs, missingTxIds); + parentPort?.postMessage(ret); + } + break; + case 'flushStableDataItems': + { + const [endHeight, maxStableBlockTimestamp] = args; + worker.flushStableDataItems(endHeight, maxStableBlockTimestamp); + parentPort?.postMessage(null); + } break; case 'getDataAttributes': const dataAttributes = worker.getDataAttributes(args[0]);