Skip to content

Commit

Permalink
WIP data item flushing contention fix
Browse files Browse the repository at this point in the history
  • Loading branch information
djwhitt committed Oct 3, 2024
1 parent 12cb0c2 commit 0b30d6b
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 12 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
"url": "https://github.com/ar-io/ar-io-node"
},
"dependencies": {
"@ar.io/sdk": "^2.0.0",
"@ar.io/sdk": "^2.2.5",
"@aws-lite/client": "^0.21.7",
"@aws-lite/s3": "^0.1.21",
"@clickhouse/client": "^1.3.0",
Expand Down
49 changes: 38 additions & 11 deletions src/database/standalone-sqlite.ts
Original file line number Diff line number Diff line change
Expand Up @@ -941,12 +941,24 @@ export class StandaloneSqliteDatabaseWorker {
const endHeight = block.height - MAX_FORK_DEPTH;

this.saveCoreStableDataFn(endHeight);
this.saveBundlesStableDataFn(endHeight);

this.deleteCoreStaleNewDataFn(
endHeight,
maxStableBlockTimestamp - NEW_TX_CLEANUP_WAIT_SECS,
);

return { maxStableBlockTimestamp };
}

return {};
}

flushStableDataItems(height: number, maxStableBlockTimestamp: number) {
if (height % STABLE_FLUSH_INTERVAL === 0) {
const endHeight = height - MAX_FORK_DEPTH;

this.saveBundlesStableDataFn(endHeight);

this.deleteBundlesStaleNewDataFn(
endHeight,
maxStableBlockTimestamp - NEW_DATA_ITEM_CLEANUP_WAIT_SECS,
Expand Down Expand Up @@ -2370,7 +2382,7 @@ const WORKER_POOL_NAMES: Array<WorkerPoolName> = [
'bundles',
];

type WorkerMethodName = keyof StandaloneSqliteDatabase;
type WorkerMethodName = keyof StandaloneSqliteDatabaseWorker;

type WorkerRoleName = 'read' | 'write';
const WORKER_ROLE_NAMES: Array<WorkerRoleName> = ['read', 'write'];
Expand Down Expand Up @@ -2742,16 +2754,22 @@ export class StandaloneSqliteDatabase
return this.queueWrite('bundles', 'saveBundle', [bundle]);
}

saveBlockAndTxs(
async saveBlockAndTxs(
block: PartialJsonBlock,
txs: PartialJsonTransaction[],
missingTxIds: string[],
): Promise<void> {
return this.queueWrite('core', 'saveBlockAndTxs', [
block,
txs,
missingTxIds,
]);
const { maxStableBlockTimestamp } = await this.queueWrite(
'core',
'saveBlockAndTxs',
[block, txs, missingTxIds],
);
if (maxStableBlockTimestamp !== undefined) {
await this.queueWrite('bundles', 'flushStableDataItems', [
block.height,
maxStableBlockTimestamp,
]);
}
}

async getDataAttributes(
Expand Down Expand Up @@ -3068,9 +3086,18 @@ if (!isMainThread) {
parentPort?.postMessage(null);
break;
case 'saveBlockAndTxs':
const [block, txs, missingTxIds] = args;
worker.saveBlockAndTxs(block, txs, missingTxIds);
parentPort?.postMessage(null);
{
const [block, txs, missingTxIds] = args;
const ret = worker.saveBlockAndTxs(block, txs, missingTxIds);
parentPort?.postMessage(ret);
}
break;
case 'flushStableDataItems':
{
const [height, maxStableBlockTimestamp] = args;
worker.flushStableDataItems(height, maxStableBlockTimestamp);
parentPort?.postMessage(null);
}
break;
case 'getDataAttributes':
const dataAttributes = worker.getDataAttributes(args[0]);
Expand Down

0 comments on commit 0b30d6b

Please sign in to comment.