From 90f51df4f55bd754dc209da01487b91cc2fc9373 Mon Sep 17 00:00:00 2001 From: David Whittington Date: Tue, 27 Jun 2023 16:45:46 -0500 Subject: [PATCH] feat(bundles repair): add bundle repair worker PE-4041 Adds a bundle repair worker that queries `bundles` and `bundle_data_items` tables to determine which bundles have been fully imported. It does this by setting bundle `last_fully_indexed_at` based on a comparison of `bundle_data_items` for each bundle to `matched_data_item_count` on the bundles (taking filters into account) and then using those `last_fully_indexed_at` timestamps to determine if the bundle should be reprocessed. --- .github/workflows/build-core.yml | 2 +- docker-compose.yaml | 1 + ...1.52.29.bundles.add-bundles-root-tx-id.sql | 2 + ...8T22.02.57.bundles.add-bundles-indexes.sql | 15 +++ ...1.52.29.bundles.add-bundles-root-tx-id.sql | 1 + ...8T22.02.57.bundles.add-bundles-indexes.sql | 8 ++ src/app.ts | 1 + src/config.ts | 2 + src/database/sql/bundles/import.sql | 4 +- src/database/sql/bundles/repair.sql | 75 ++++++++++++++ src/database/sql/core/accessors.sql | 8 +- src/database/standalone-sqlite.test.ts | 3 +- src/database/standalone-sqlite.ts | 95 ++++++++++++------ src/system.ts | 11 +++ src/types.d.ts | 23 ++++- src/workers/bundle-repair-worker.ts | 99 +++++++++++++++++++ src/workers/transaction-repair-worker.ts | 5 +- test/bundles-schema.sql | 12 ++- 18 files changed, 326 insertions(+), 41 deletions(-) create mode 100644 migrations/2023.06.28T21.52.29.bundles.add-bundles-root-tx-id.sql create mode 100644 migrations/2023.06.28T22.02.57.bundles.add-bundles-indexes.sql create mode 100644 migrations/down/2023.06.28T21.52.29.bundles.add-bundles-root-tx-id.sql create mode 100644 migrations/down/2023.06.28T22.02.57.bundles.add-bundles-indexes.sql create mode 100644 src/database/sql/bundles/repair.sql create mode 100644 src/workers/bundle-repair-worker.ts diff --git a/.github/workflows/build-core.yml b/.github/workflows/build-core.yml index
dd3aa91e..5b3378f1 100644 --- a/.github/workflows/build-core.yml +++ b/.github/workflows/build-core.yml @@ -78,7 +78,7 @@ jobs: # Build and push container image to ECR - name: Configure AWS credentials - uses: aws-actions/configure-aws-credentials@v1-node16 + uses: aws-actions/configure-aws-credentials@v2 with: role-to-assume: arn:aws:iam::${{ secrets.AWS_ACCOUNT_ID }}:role/${{ secrets.AWS_BUILD_INVOCATION_ROLE }} aws-region: ${{ secrets.AWS_REGION }} diff --git a/docker-compose.yaml b/docker-compose.yaml index ef7d626b..7d4d3b02 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -42,6 +42,7 @@ services: - INSTANCE_ID=${INSTANCE_ID:-} - AR_IO_WALLET=${AR_IO_WALLET:-} - ADMIN_API_KEY=${ADMIN_API_KEY:-} + - BACKFILL_BUNDLE_RECORDS=${BACKFILL_BUNDLE_RECORDS:-} - ANS104_UNBUNDLE_FILTER=${ANS104_UNBUNDLE_FILTER:-} - ANS104_INDEX_FILTER=${ANS104_INDEX_FILTER:-} - ARNS_ROOT_HOST=${ARNS_ROOT_HOST:-} diff --git a/migrations/2023.06.28T21.52.29.bundles.add-bundles-root-tx-id.sql b/migrations/2023.06.28T21.52.29.bundles.add-bundles-root-tx-id.sql new file mode 100644 index 00000000..2260f793 --- /dev/null +++ b/migrations/2023.06.28T21.52.29.bundles.add-bundles-root-tx-id.sql @@ -0,0 +1,2 @@ +ALTER TABLE bundles ADD COLUMN root_transaction_id BLOB; +UPDATE bundles SET root_transaction_id = id; diff --git a/migrations/2023.06.28T22.02.57.bundles.add-bundles-indexes.sql b/migrations/2023.06.28T22.02.57.bundles.add-bundles-indexes.sql new file mode 100644 index 00000000..9cf82af4 --- /dev/null +++ b/migrations/2023.06.28T22.02.57.bundles.add-bundles-indexes.sql @@ -0,0 +1,15 @@ +CREATE INDEX IF NOT EXISTS bundles_last_queued_at_idx + ON bundles (last_queued_at); +CREATE INDEX IF NOT EXISTS bundles_last_skipped_at_idx + ON bundles (last_skipped_at); +CREATE INDEX IF NOT EXISTS bundles_last_fully_indexed_at_idx + ON bundles (last_fully_indexed_at); +CREATE INDEX IF NOT EXISTS bundles_matched_data_item_count_idx + ON bundles (matched_data_item_count); +CREATE INDEX IF 
NOT EXISTS bundles_unbundle_filter_id_idx + ON bundles (unbundle_filter_id); +CREATE INDEX IF NOT EXISTS bundles_index_filter_id_idx + ON bundles (index_filter_id); + +CREATE INDEX IF NOT EXISTS bundle_data_items_parent_id_filter_id_idx + ON bundle_data_items (parent_id, filter_id); diff --git a/migrations/down/2023.06.28T21.52.29.bundles.add-bundles-root-tx-id.sql b/migrations/down/2023.06.28T21.52.29.bundles.add-bundles-root-tx-id.sql new file mode 100644 index 00000000..47e5cd42 --- /dev/null +++ b/migrations/down/2023.06.28T21.52.29.bundles.add-bundles-root-tx-id.sql @@ -0,0 +1 @@ +ALTER TABLE bundles DROP COLUMN root_transaction_id; diff --git a/migrations/down/2023.06.28T22.02.57.bundles.add-bundles-indexes.sql b/migrations/down/2023.06.28T22.02.57.bundles.add-bundles-indexes.sql new file mode 100644 index 00000000..20a26d11 --- /dev/null +++ b/migrations/down/2023.06.28T22.02.57.bundles.add-bundles-indexes.sql @@ -0,0 +1,8 @@ +DROP INDEX IF EXISTS bundle_data_items_parent_id_filter_id_idx; + +DROP INDEX IF EXISTS bundles_unbundle_filter_id_idx; +DROP INDEX IF EXISTS bundles_index_filter_id_idx; +DROP INDEX IF EXISTS bundles_matched_data_item_count_idx; +DROP INDEX IF EXISTS bundles_last_fully_indexed_at_idx; +DROP INDEX IF EXISTS bundles_last_skipped_at_idx; +DROP INDEX IF EXISTS bundles_last_queued_at_idx; diff --git a/src/app.ts b/src/app.ts index bdffaf99..78f9b3ce 100644 --- a/src/app.ts +++ b/src/app.ts @@ -39,6 +39,7 @@ import * as system from './system.js'; system.arweaveClient.refreshPeers(); system.blockImporter.start(); system.txRepairWorker.start(); +system.bundleRepairWorker.start(); // HTTP server const app = express(); diff --git a/src/config.ts b/src/config.ts index 100972e6..28755328 100644 --- a/src/config.ts +++ b/src/config.ts @@ -53,6 +53,8 @@ export const ADMIN_API_KEY = env.varOrDefault( if (env.varOrUndefined('ADMIN_API_KEY') === undefined) { log.info('Using a random admin key since none was set', { ADMIN_API_KEY }); } +export const 
BACKFILL_BUNDLE_RECORDS = + env.varOrDefault('BACKFILL_BUNDLE_RECORDS', 'false') === 'true'; export const ANS104_UNBUNDLE_FILTER_STRING = canonicalize( JSON.parse(env.varOrDefault('ANS104_UNBUNDLE_FILTER', '{"never": true}')), ); diff --git a/src/database/sql/bundles/import.sql b/src/database/sql/bundles/import.sql index 803b66a1..29fa4cf5 100644 --- a/src/database/sql/bundles/import.sql +++ b/src/database/sql/bundles/import.sql @@ -1,6 +1,6 @@ -- upsertBundle INSERT INTO bundles ( - id, format_id, + id, root_transaction_id, format_id, unbundle_filter_id, index_filter_id, data_item_count, matched_data_item_count, first_queued_at, last_queued_at, @@ -8,7 +8,7 @@ INSERT INTO bundles ( first_unbundled_at, last_unbundled_at, first_fully_indexed_at, last_fully_indexed_at ) VALUES ( - @id, @format_id, + @id, @root_transaction_id, @format_id, @unbundle_filter_id, @index_filter_id, @data_item_count, @matched_data_item_count, @queued_at, @queued_at, diff --git a/src/database/sql/bundles/repair.sql b/src/database/sql/bundles/repair.sql new file mode 100644 index 00000000..c3f4c2a8 --- /dev/null +++ b/src/database/sql/bundles/repair.sql @@ -0,0 +1,75 @@ +-- selectFailedBundleIds +SELECT DISTINCT id +FROM ( + SELECT b.root_transaction_id AS id + FROM bundles b + WHERE ( + (b.last_queued_at IS NULL AND b.last_skipped_at IS NULL) + OR ( + b.last_queued_at IS NOT NULL + AND ( + b.last_skipped_at IS NULL + OR b.last_skipped_at <= b.last_queued_at + ) + AND b.last_queued_at < @reprocess_cutoff + ) + ) + AND b.last_fully_indexed_at IS NULL + AND ( + b.matched_data_item_count IS NULL + OR b.matched_data_item_count > 0 + ) + ORDER BY b.last_queued_at ASC + LIMIT @limit +) +ORDER BY RANDOM() + +-- updateFullyIndexedAt +UPDATE bundles +SET + first_fully_indexed_at = IFNULL(first_fully_indexed_at, @fully_indexed_at), + last_fully_indexed_at = @fully_indexed_at +WHERE matched_data_item_count IS NOT NULL + AND matched_data_item_count > 0 + AND EXISTS ( + SELECT 1 + FROM bundle_data_items 
bdi + WHERE bdi.parent_id = bundles.id + AND bdi.filter_id = bundles.unbundle_filter_id + GROUP BY bdi.parent_id + HAVING COUNT(*) = bundles.matched_data_item_count + ) AND last_fully_indexed_at IS NULL + +--insertMissingBundles +INSERT INTO bundles ( + id, + root_transaction_id, + format_id +) +SELECT + sttf.transaction_id, + sttf.transaction_id, + (SELECT id FROM bundle_formats WHERE format = 'ans-104') +FROM stable_transaction_tags sttf +JOIN stable_transaction_tags sttv ON sttv.transaction_id = sttf.transaction_id +LEFT JOIN bundles b ON b.id = sttf.transaction_id +WHERE sttf.tag_name_hash = x'BF796ECA81CCE3FF36CEA53FA1EBB0F274A0FF29' + AND sttf.tag_value_hash = x'7E57CFE843145135AEE1F4D0D63CEB7842093712' + AND sttv.tag_name_hash = x'858B76CB055E360A2E4C3C38F4A3049F80175216' + AND sttv.tag_value_hash = x'F7CA6A21D278EB5CE64611AADBDB77EF1511D3DD' + AND b.id IS NULL +UNION ALL +SELECT + nttf.transaction_id, + nttf.transaction_id, + (SELECT id FROM bundle_formats WHERE format = 'ans-104') +FROM new_transaction_tags nttf +JOIN new_transaction_tags nttv ON nttv.transaction_id = nttf.transaction_id +LEFT JOIN bundles b ON b.id = nttf.transaction_id +WHERE nttf.tag_name_hash = x'BF796ECA81CCE3FF36CEA53FA1EBB0F274A0FF29' + AND nttf.tag_value_hash = x'7E57CFE843145135AEE1F4D0D63CEB7842093712' + AND nttv.tag_name_hash = x'858B76CB055E360A2E4C3C38F4A3049F80175216' + AND nttv.tag_value_hash = x'F7CA6A21D278EB5CE64611AADBDB77EF1511D3DD' + AND b.id IS NULL +LIMIT 10000 +ON CONFLICT DO NOTHING diff --git a/src/database/sql/core/accessors.sql b/src/database/sql/core/accessors.sql index dc3267c2..c8e8dcab 100644 --- a/src/database/sql/core/accessors.sql +++ b/src/database/sql/core/accessors.sql @@ -23,5 +23,9 @@ LIMIT 1 -- selectMissingTransactionIds SELECT transaction_id -FROM missing_transactions -LIMIT @limit +FROM ( + SELECT transaction_id + FROM missing_transactions + LIMIT @limit +) +ORDER BY RANDOM() diff --git a/src/database/standalone-sqlite.test.ts 
b/src/database/standalone-sqlite.test.ts index 2bab92a5..2dd3dea6 100644 --- a/src/database/standalone-sqlite.test.ts +++ b/src/database/standalone-sqlite.test.ts @@ -53,7 +53,8 @@ const { default: processStream } = arbundles; const HEIGHT = 1138; const BLOCK_TX_INDEX = 42; const DATA_ITEM_ID = 'zoljIRyzG5hp-R4EZV2q8kFI49OAoy23_B9YJ_yEEws'; -const CURSOR = 'WzExMzgsNDIsInpvbGpJUnl6RzVocC1SNEVaVjJxOGtGSTQ5T0FveTIzX0I5WUpfeUVFd3MiXQ'; +const CURSOR = + 'WzExMzgsNDIsInpvbGpJUnl6RzVocC1SNEVaVjJxOGtGSTQ5T0FveTIzX0I5WUpfeUVFd3MiXQ'; describe('SQLite helper functions', () => { describe('toSqliteParams', () => { diff --git a/src/database/standalone-sqlite.ts b/src/database/standalone-sqlite.ts index 57421eed..b3adcc03 100644 --- a/src/database/standalone-sqlite.ts +++ b/src/database/standalone-sqlite.ts @@ -47,6 +47,8 @@ import { currentUnixTimestamp } from '../lib/time.js'; import log from '../log.js'; import { BlockListValidator, + BundleIndex, + BundleRecord, ChainIndex, ContiguousDataAttributes, ContiguousDataIndex, @@ -65,6 +67,7 @@ const MAX_WORKER_ERRORS = 100; const STABLE_FLUSH_INTERVAL = 5; const NEW_TX_CLEANUP_WAIT_SECS = 60 * 60 * 2; const NEW_DATA_ITEM_CLEANUP_WAIT_SECS = 60 * 60 * 2; +const BUNDLE_REPROCESS_WAIT_SECS = 60 * 60 * 4; const LOW_SELECTIVITY_TAG_NAMES = new Set(['App-Name', 'Content-Type']); function tagJoinSortPriority(tag: { name: string; values: string[] }) { @@ -329,8 +332,8 @@ export class StandaloneSqliteDatabaseWorker { moderation: { [stmtName: string]: Sqlite.Statement }; bundles: { [stmtName: string]: Sqlite.Statement }; }; - private bundleFormatIds: { [filter: string]: number; } = {}; - private filterIds: { [filter: string]: number; } = {}; + private bundleFormatIds: { [filter: string]: number } = {}; + private filterIds: { [filter: string]: number } = {}; // Transactions resetBundlesToHeightFn: Sqlite.Transaction; @@ -694,11 +697,30 @@ export class StandaloneSqliteDatabaseWorker { } getMissingTxIds(limit: number) { - const missingTxIds 
= this.stmts.core.selectMissingTransactionIds.all({ + const rows = this.stmts.core.selectMissingTransactionIds.all({ limit, }); - return missingTxIds.map((row): string => toB64Url(row.transaction_id)); + return rows.map((row): string => toB64Url(row.transaction_id)); + } + + getFailedBundleIds(limit: number) { + const rows = this.stmts.bundles.selectFailedBundleIds.all({ + limit, + reprocess_cutoff: currentUnixTimestamp() - BUNDLE_REPROCESS_WAIT_SECS, + }); + + return rows.map((row): string => toB64Url(row.id)); + } + + backfillBundles() { + this.stmts.bundles.insertMissingBundles.run(); + } + + updateBundlesFullyIndexedAt() { + this.stmts.bundles.updateFullyIndexedAt.run({ + fully_indexed_at: currentUnixTimestamp(), + }); } resetToHeight(height: number) { @@ -720,7 +742,7 @@ export class StandaloneSqliteDatabaseWorker { if (format != undefined) { id = this.bundleFormatIds[format]; if (id == undefined) { - id= this.stmts.bundles.selectFormatId.get({ format })?.id; + id = this.stmts.bundles.selectFormatId.get({ format })?.id; if (id != undefined) { this.bundleFormatIds[format] = id; } @@ -735,7 +757,7 @@ export class StandaloneSqliteDatabaseWorker { id = this.filterIds[filter]; if (id == undefined) { this.stmts.bundles.insertOrIgnoreFilter.run({ filter }); - id= this.stmts.bundles.selectFilterId.get({ filter })?.id; + id = this.stmts.bundles.selectFilterId.get({ filter })?.id; if (id != undefined) { this.filterIds[filter] = id; } @@ -754,6 +776,7 @@ export class StandaloneSqliteDatabaseWorker { saveBundle({ id, + rootTransactionId, format, unbundleFilter, indexFilter, @@ -763,21 +786,15 @@ export class StandaloneSqliteDatabaseWorker { skippedAt, unbundledAt, fullyIndexedAt, - }: { - id: string; - format: 'ans-102' | 'ans-104'; - unbundleFilter?: string; - indexFilter?: string; - dataItemCount?: number; - matchedDataItemCount?: number; - queuedAt?: number; - skippedAt?: number; - unbundledAt?: number; - fullyIndexedAt?: number; - }) { + }: BundleRecord) { const 
idBuffer = fromB64Url(id); + let rootTxId: Buffer | undefined; + if (rootTransactionId != undefined) { + rootTxId = fromB64Url(rootTransactionId); + } this.stmts.bundles.upsertBundle.run({ id: idBuffer, + root_transaction_id: rootTxId, format_id: this.getBundleFormatId(format), unbundle_filter_id: this.getFilterId(unbundleFilter), index_filter_id: this.getFilterId(indexFilter), @@ -2041,6 +2058,7 @@ const WORKER_POOL_SIZES: WorkerPoolSizes = { export class StandaloneSqliteDatabase implements + BundleIndex, BlockListValidator, ChainIndex, ContiguousDataIndex, @@ -2246,10 +2264,22 @@ export class StandaloneSqliteDatabase return this.queueRead('core', 'getBlockHashByHeight', [height]); } - getMissingTxIds(limit = 20): Promise { + getMissingTxIds(limit: number): Promise { return this.queueRead('core', 'getMissingTxIds', [limit]); } + getFailedBundleIds(limit: number): Promise { + return this.queueRead('bundles', 'getFailedBundleIds', [limit]); + } + + backfillBundles() { + return this.queueRead('bundles', 'backfillBundles', undefined); + } + + updateBundlesFullyIndexedAt(): Promise { + return this.queueRead('bundles', 'updateBundlesFullyIndexedAt', undefined); + } + resetToHeight(height: number): Promise { return this.queueWrite('core', 'resetToHeight', [height]); } @@ -2262,17 +2292,7 @@ export class StandaloneSqliteDatabase return this.queueWrite('bundles', 'saveDataItem', [item]); } - saveBundle(bundle: { - id: string; - format: 'ans-102' | 'ans-104'; - unbundleFilter?: string; - indexFilter?: string; - dataItemCount?: number; - matchedDataItemCount?: number; - queuedAt?: number; - skippedAt?: number; - unbundledAt?: number; - }): Promise { + saveBundle(bundle: BundleRecord): Promise { return this.queueWrite('bundles', 'saveBundle', [bundle]); } @@ -2476,8 +2496,19 @@ if (!isMainThread) { parentPort?.postMessage(newBlockHash); break; case 'getMissingTxIds': - const missingTxIdsRes = worker.getMissingTxIds(args[0]); - parentPort?.postMessage(missingTxIdsRes); + 
parentPort?.postMessage(worker.getMissingTxIds(args[0])); + break; + case 'getFailedBundleIds': + const failedBundleIds = worker.getFailedBundleIds(args[0]); + parentPort?.postMessage(failedBundleIds); + break; + case 'backfillBundles': + worker.backfillBundles(); + parentPort?.postMessage(null); + break; + case 'updateBundlesFullyIndexedAt': + worker.updateBundlesFullyIndexedAt(); + parentPort?.postMessage(null); break; case 'resetToHeight': worker.resetToHeight(args[0]); diff --git a/src/system.ts b/src/system.ts index 9fe483f3..22d817b2 100644 --- a/src/system.ts +++ b/src/system.ts @@ -41,6 +41,7 @@ import { FsDataStore } from './store/fs-data-store.js'; import { FsTransactionStore } from './store/fs-transaction-store.js'; import { BlockListValidator, + BundleIndex, ChainIndex, ContiguousDataIndex, DataItemIndexWriter, @@ -51,6 +52,7 @@ import { import { Ans104DataIndexer } from './workers/ans104-data-indexer.js'; import { Ans104Unbundler } from './workers/ans104-unbundler.js'; import { BlockImporter } from './workers/block-importer.js'; +import { BundleRepairWorker } from './workers/bundle-repair-worker.js'; import { DataItemIndexer } from './workers/data-item-indexer.js'; import { TransactionFetcher } from './workers/transaction-fetcher.js'; import { TransactionImporter } from './workers/transaction-importer.js'; @@ -105,6 +107,7 @@ export const db = new StandaloneSqliteDatabase({ }); export const chainIndex: ChainIndex = db; +export const bundleIndex: BundleIndex = db; export const contiguousDataIndex: ContiguousDataIndex = db; export const blockListValidator: BlockListValidator = db; export const nestedDataIndexWriter: NestedDataIndexWriter = db; @@ -167,6 +170,13 @@ export const txRepairWorker = new TransactionRepairWorker({ txFetcher, }); +export const bundleRepairWorker = new BundleRepairWorker({ + log, + bundleIndex, + txFetcher, + shouldBackfillBundles: config.BACKFILL_BUNDLE_RECORDS, +}); + // Configure contigous data source const chunkDataSource = 
new ReadThroughChunkDataCache({ log, @@ -208,6 +218,7 @@ eventEmitter.on( async (tx: PartialJsonTransaction) => { await db.saveBundle({ id: tx.id, + rootTransactionId: tx.id, format: 'ans-104', }); if (await config.ANS104_UNBUNDLE_FILTER.match(tx)) { diff --git a/src/types.d.ts b/src/types.d.ts index 5996ea67..de10449e 100644 --- a/src/types.d.ts +++ b/src/types.d.ts @@ -174,7 +174,7 @@ export interface ChainSource { export interface ChainIndex { getMaxHeight(): Promise; getBlockHashByHeight(height: number): Promise; - getMissingTxIds(limit?: number): Promise; + getMissingTxIds(limit: number): Promise; resetToHeight(height: number): Promise; saveTx(txs: PartialJsonTransaction): Promise; saveBlockAndTxs( @@ -184,6 +184,27 @@ export interface ChainIndex { ): Promise; } +export interface BundleRecord { + id: string; + rootTransactionId?: string; + format: 'ans-102' | 'ans-104'; + unbundleFilter?: string; + indexFilter?: string; + dataItemCount?: number; + matchedDataItemCount?: number; + queuedAt?: number; + skippedAt?: number; + unbundledAt?: number; + fullyIndexedAt?: number; +} + +export interface BundleIndex { + saveBundle(bundle: BundleRecord): Promise; + getFailedBundleIds(limit: number): Promise; + updateBundlesFullyIndexedAt(): Promise; + backfillBundles(): Promise; +} + export interface DataItemIndexWriter { saveDataItem(item: NormalizedDataItem): Promise; } diff --git a/src/workers/bundle-repair-worker.ts b/src/workers/bundle-repair-worker.ts new file mode 100644 index 00000000..3cefe291 --- /dev/null +++ b/src/workers/bundle-repair-worker.ts @@ -0,0 +1,99 @@ +/** + * AR.IO Gateway + * Copyright (C) 2023 Permanent Data Solutions, Inc + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. 
+ * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +import * as winston from 'winston'; + +import { BundleIndex } from '../types.js'; +import { TransactionFetcher } from './transaction-fetcher.js'; + +const DEFAULT_RETRY_INTERVAL_MS = 10 * 60 * 1000; // 10 minutes +const DEFAULT_UPDATE_INTERVAL_MS = 5 * 60 * 1000; // 5 minutes +const DEFAULT_BUNDLE_BACKFILL_INTERVAL_MS = 60 * 60 * 1000; // 1 hour +const DEFAULT_BUNDLES_TO_RETRY = 20; + +export class BundleRepairWorker { + // Dependencies + private log: winston.Logger; + private bundleIndex: BundleIndex; + private txFetcher: TransactionFetcher; + private shouldBackfillBundles: boolean; + + constructor({ + log, + bundleIndex, + txFetcher, + shouldBackfillBundles, + }: { + log: winston.Logger; + bundleIndex: BundleIndex; + txFetcher: TransactionFetcher; + shouldBackfillBundles: boolean; + }) { + this.log = log.child({ class: 'BundleRepairWorker' }); + this.bundleIndex = bundleIndex; + this.txFetcher = txFetcher; + this.shouldBackfillBundles = shouldBackfillBundles; + } + + async start(): Promise { + setInterval(this.retryBundles.bind(this), DEFAULT_RETRY_INTERVAL_MS); + setInterval( + this.updateBundleTimestamps.bind(this), + DEFAULT_UPDATE_INTERVAL_MS, + ); + if (this.shouldBackfillBundles) { + setInterval( + this.backfillBundles.bind(this), + DEFAULT_BUNDLE_BACKFILL_INTERVAL_MS, + ); + } + } + + async retryBundles() { + try { + const bundleIds = await this.bundleIndex.getFailedBundleIds( + DEFAULT_BUNDLES_TO_RETRY, + ); + for (const bundleId of bundleIds) { + this.log.info('Retrying failed bundle', { bundleId }); + await this.txFetcher.queueTxId(bundleId); + } + } catch (error: any) { 
+ this.log.error('Error retrying failed bundles:', error); + } + } + + async updateBundleTimestamps() { + try { + this.log.info('Updating bundle timestamps...'); + await this.bundleIndex.updateBundlesFullyIndexedAt(); + this.log.info('Bundle timestamps updated.'); + } catch (error: any) { + this.log.error('Error updating bundle timestamps:', error); + } + } + + async backfillBundles() { + try { + this.log.info('Backfilling bundle records...'); + await this.bundleIndex.backfillBundles(); + this.log.info('Bundle records backfilled.'); + } catch (error: any) { + this.log.error('Error backfilling bundle records:', error); + } + } +} diff --git a/src/workers/transaction-repair-worker.ts b/src/workers/transaction-repair-worker.ts index 7a53b2b1..7941ac11 100644 --- a/src/workers/transaction-repair-worker.ts +++ b/src/workers/transaction-repair-worker.ts @@ -21,6 +21,7 @@ import { ChainIndex } from '../types.js'; import { TransactionFetcher } from './transaction-fetcher.js'; const DEFAULT_INTERVAL_MS = 5 * 60 * 1000; +const DEFAULT_TXS_TO_RETRY = 20; export class TransactionRepairWorker { // Dependencies @@ -48,7 +49,9 @@ export class TransactionRepairWorker { async retryMissingTransactions() { try { - const missingTxIds = await this.chainIndex.getMissingTxIds(); + const missingTxIds = await this.chainIndex.getMissingTxIds( + DEFAULT_TXS_TO_RETRY, + ); for (const txId of missingTxIds) { this.log.info('Retrying missing transaction', { txId }); await this.txFetcher.queueTxId(txId); diff --git a/test/bundles-schema.sql b/test/bundles-schema.sql index 9a941a49..59d64ade 100644 --- a/test/bundles-schema.sql +++ b/test/bundles-schema.sql @@ -126,5 +126,15 @@ CREATE TABLE bundles ( last_unbundled_at INTEGER, first_fully_indexed_at INTEGER, last_fully_indexed_at INTEGER -); +, root_transaction_id BLOB); CREATE INDEX bundles_format_id_idx ON bundles (format_id); +CREATE INDEX bundles_last_queued_at_idx + ON bundles (last_queued_at); +CREATE INDEX bundles_last_skipped_at_idx + ON 
bundles (last_skipped_at); +CREATE INDEX bundles_last_fully_indexed_at_idx + ON bundles (last_fully_indexed_at); +CREATE INDEX bundles_matched_data_item_count_idx + ON bundles (matched_data_item_count); +CREATE INDEX bundle_data_items_parent_id_filter_id_idx + ON bundle_data_items (parent_id, filter_id);