diff --git a/package.json b/package.json index 3259c07d..7cd17cf1 100644 --- a/package.json +++ b/package.json @@ -8,7 +8,7 @@ "url": "https://github.com/ar-io/ar-io-node" }, "dependencies": { - "@ar.io/sdk": "^2.0.0", + "@ar.io/sdk": "^2.2.5", "@aws-lite/client": "^0.21.7", "@aws-lite/s3": "^0.1.21", "@clickhouse/client": "^1.3.0", diff --git a/src/database/standalone-sqlite.ts b/src/database/standalone-sqlite.ts index e72c67e6..0842f20a 100644 --- a/src/database/standalone-sqlite.ts +++ b/src/database/standalone-sqlite.ts @@ -941,12 +941,24 @@ export class StandaloneSqliteDatabaseWorker { const endHeight = block.height - MAX_FORK_DEPTH; this.saveCoreStableDataFn(endHeight); - this.saveBundlesStableDataFn(endHeight); this.deleteCoreStaleNewDataFn( endHeight, maxStableBlockTimestamp - NEW_TX_CLEANUP_WAIT_SECS, ); + + return { maxStableBlockTimestamp }; + } + + return {}; + } + + flushStableDataItems(height: number, maxStableBlockTimestamp: number) { + if (height % STABLE_FLUSH_INTERVAL === 0) { + const endHeight = height - MAX_FORK_DEPTH; + + this.saveBundlesStableDataFn(endHeight); + this.deleteBundlesStaleNewDataFn( endHeight, maxStableBlockTimestamp - NEW_DATA_ITEM_CLEANUP_WAIT_SECS, @@ -2370,7 +2382,7 @@ const WORKER_POOL_NAMES: Array = [ 'bundles', ]; -type WorkerMethodName = keyof StandaloneSqliteDatabase; +type WorkerMethodName = keyof StandaloneSqliteDatabaseWorker; type WorkerRoleName = 'read' | 'write'; const WORKER_ROLE_NAMES: Array = ['read', 'write']; @@ -2742,16 +2754,22 @@ export class StandaloneSqliteDatabase return this.queueWrite('bundles', 'saveBundle', [bundle]); } - saveBlockAndTxs( + async saveBlockAndTxs( block: PartialJsonBlock, txs: PartialJsonTransaction[], missingTxIds: string[], ): Promise { - return this.queueWrite('core', 'saveBlockAndTxs', [ - block, - txs, - missingTxIds, - ]); + const { maxStableBlockTimestamp } = await this.queueWrite( + 'core', + 'saveBlockAndTxs', + [block, txs, missingTxIds], + ); + if (maxStableBlockTimestamp !== undefined) { + await this.queueWrite('bundles', 'flushStableDataItems', [ + block.height, + maxStableBlockTimestamp, + ]); + } } async getDataAttributes( @@ -3068,9 +3086,18 @@ if (!isMainThread) { parentPort?.postMessage(null); break; case 'saveBlockAndTxs': - const [block, txs, missingTxIds] = args; - worker.saveBlockAndTxs(block, txs, missingTxIds); - parentPort?.postMessage(null); + { + const [block, txs, missingTxIds] = args; + const ret = worker.saveBlockAndTxs(block, txs, missingTxIds); + parentPort?.postMessage(ret); + } + break; + case 'flushStableDataItems': + { + const [height, maxStableBlockTimestamp] = args; + worker.flushStableDataItems(height, maxStableBlockTimestamp); + parentPort?.postMessage(null); + } break; case 'getDataAttributes': const dataAttributes = worker.getDataAttributes(args[0]);