From 30c1581de1bc2fdf32b390a7b9cffcb2c743a127 Mon Sep 17 00:00:00 2001 From: David Whittington Date: Tue, 27 Jun 2023 16:45:46 -0500 Subject: [PATCH] feat(bundles repair): add bundle repair worker PE-4041 Adds a bundle repair worker that queries `bundles` and `bundle_data_item` tables to determine which bundles have been fully imported. It does this by setting bundle `last_fully_indexed_at` based on a comparison of `bundle_data_items` for each bundle to `matched_data_item_count` on the bundles (taking filters into account) and then using those `last_fully_indexed_at` timestamps to determine if the bundle should be reprocessed. --- ...1.52.29.bundles.add-bundles-root-tx-id.sql | 2 + ...28T22.02.57.bundles.add-bundle-indexes.sql | 15 ++++ ...1.52.29.bundles.add-bundles-root-tx-id.sql | 1 + ...28T22.02.57.bundles.add-bundle-indexes.sql | 8 ++ src/app.ts | 1 + src/database/sql/bundles/import.sql | 4 +- src/database/sql/bundles/repair.sql | 41 +++++++++ src/database/sql/core/accessors.sql | 8 +- src/database/standalone-sqlite.ts | 83 ++++++++++++------- src/system.ts | 10 +++ src/types.d.ts | 22 ++++- src/workers/bundle-repair-worker.ts | 78 +++++++++++++++++ src/workers/transaction-repair-worker.ts | 5 +- 13 files changed, 240 insertions(+), 38 deletions(-) create mode 100644 migrations/2023.06.28T21.52.29.bundles.add-bundles-root-tx-id.sql create mode 100644 migrations/2023.06.28T22.02.57.bundles.add-bundle-indexes.sql create mode 100644 migrations/down/2023.06.28T21.52.29.bundles.add-bundles-root-tx-id.sql create mode 100644 migrations/down/2023.06.28T22.02.57.bundles.add-bundle-indexes.sql create mode 100644 src/database/sql/bundles/repair.sql create mode 100644 src/workers/bundle-repair-worker.ts diff --git a/migrations/2023.06.28T21.52.29.bundles.add-bundles-root-tx-id.sql b/migrations/2023.06.28T21.52.29.bundles.add-bundles-root-tx-id.sql new file mode 100644 index 00000000..2260f793 --- /dev/null +++ b/migrations/2023.06.28T21.52.29.bundles.add-bundles-root-tx-id.sql @@ -0,0 +1,2 @@ +ALTER TABLE bundles ADD COLUMN root_transaction_id BLOB; +UPDATE bundles SET root_transaction_id = id; diff --git a/migrations/2023.06.28T22.02.57.bundles.add-bundle-indexes.sql b/migrations/2023.06.28T22.02.57.bundles.add-bundle-indexes.sql new file mode 100644 index 00000000..7d0ded3e --- /dev/null +++ b/migrations/2023.06.28T22.02.57.bundles.add-bundle-indexes.sql @@ -0,0 +1,15 @@ +CREATE INDEX IF NOT EXISTS bundles_last_queued_at_idx + ON bundles (last_queued_at); +CREATE INDEX IF NOT EXISTS bundles_last_skipped_at_idx + ON bundles (last_skipped_at); +CREATE INDEX IF NOT EXISTS bundles_last_fully_indexed_at_idx + ON bundles (last_fully_indexed_at); +CREATE INDEX IF NOT EXISTS bundles_matched_data_item_count_idx + ON bundles (matched_data_item_count); +CREATE INDEX IF NOT EXISTS bundles_unbundle_filter_id_idx + ON bundle (unbundle_filter_id); +CREATE INDEX IF NOT EXISTS bundles_index_filter_id_idx + ON bundle (index_filter_id); + +CREATE INDEX IF NOT EXISTS bundle_data_items_parent_id_filter_id_idx + ON bundle_data_items (parent_id, filter_id); diff --git a/migrations/down/2023.06.28T21.52.29.bundles.add-bundles-root-tx-id.sql b/migrations/down/2023.06.28T21.52.29.bundles.add-bundles-root-tx-id.sql new file mode 100644 index 00000000..47e5cd42 --- /dev/null +++ b/migrations/down/2023.06.28T21.52.29.bundles.add-bundles-root-tx-id.sql @@ -0,0 +1 @@ +ALTER TABLE bundles DROP COLUMN root_transaction_id; diff --git a/migrations/down/2023.06.28T22.02.57.bundles.add-bundle-indexes.sql b/migrations/down/2023.06.28T22.02.57.bundles.add-bundle-indexes.sql new file mode 100644 index 00000000..20a26d11 --- /dev/null +++ b/migrations/down/2023.06.28T22.02.57.bundles.add-bundle-indexes.sql @@ -0,0 +1,8 @@ +DROP INDEX IF EXISTS bundle_data_items_parent_id_filter_id_idx; + +DROP INDEX IF EXISTS bundles_unbundle_filter_id_idx; +DROP INDEX IF EXISTS bundles_index_filter_id_idx; +DROP INDEX IF EXISTS bundles_matched_data_item_count_idx; +DROP INDEX IF EXISTS bundles_last_fully_indexed_at_idx; +DROP INDEX IF EXISTS bundles_last_skipped_at_idx; +DROP INDEX IF EXISTS bundles_last_queued_at_idx; diff --git a/src/app.ts b/src/app.ts index bdffaf99..78f9b3ce 100644 --- a/src/app.ts +++ b/src/app.ts @@ -39,6 +39,7 @@ import * as system from './system.js'; system.arweaveClient.refreshPeers(); system.blockImporter.start(); system.txRepairWorker.start(); +system.bundleRepairWorker.start(); // HTTP server const app = express(); diff --git a/src/database/sql/bundles/import.sql b/src/database/sql/bundles/import.sql index 803b66a1..29fa4cf5 100644 --- a/src/database/sql/bundles/import.sql +++ b/src/database/sql/bundles/import.sql @@ -1,6 +1,6 @@ -- upsertBundle INSERT INTO bundles ( - id, format_id, + id, root_transaction_id, format_id, unbundle_filter_id, index_filter_id, data_item_count, matched_data_item_count, first_queued_at, last_queued_at, @@ -8,7 +8,7 @@ INSERT INTO bundles ( first_unbundled_at, last_unbundled_at, first_fully_indexed_at, last_fully_indexed_at ) VALUES ( - @id, @format_id, + @id, @root_transaction_id, @format_id, @unbundle_filter_id, @index_filter_id, @data_item_count, @matched_data_item_count, @queued_at, @queued_at, diff --git a/src/database/sql/bundles/repair.sql b/src/database/sql/bundles/repair.sql new file mode 100644 index 00000000..231e9153 --- /dev/null +++ b/src/database/sql/bundles/repair.sql @@ -0,0 +1,41 @@ +-- selectFailedBundleIds +SELECT DISTINCT id +FROM ( + SELECT b.root_transaction_id AS id + FROM bundles b + WHERE ( + (b.last_queued_at IS NULL AND b.last_skipped_at IS NULL) + OR ( + b.last_queued_at IS NOT NULL + AND ( + b.last_skipped_at IS NULL + OR b.last_skipped_at <= b.last_queued_at + ) + AND b.last_queued_at < @reprocess_cutoff + ) + ) + AND b.last_fully_indexed_at IS NULL + AND ( + b.matched_data_item_count IS NULL + OR b.matched_data_item_count > 0 + ) + ORDER BY b.last_queued_at ASC + LIMIT @limit +) +ORDER BY RANDOM() + +-- updateFullyIndexedAt +UPDATE bundles +SET + first_fully_indexed_at = IFNULL(first_fully_indexed_at, @fully_indexed_at), + last_fully_indexed_at = @fully_indexed_at +WHERE matched_data_item_count IS NOT NULL + AND matched_data_item_count > 0 + AND EXISTS ( + SELECT 1 + FROM bundle_data_items bdi + WHERE bdi.parent_id = bundles.id + AND bdi.filter_id = bundles.unbundle_filter_id + GROUP BY bdi.parent_id + HAVING COUNT(*) = bundles.matched_data_item_count + ) AND last_fully_indexed_at IS NULL diff --git a/src/database/sql/core/accessors.sql b/src/database/sql/core/accessors.sql index dc3267c2..c8e8dcab 100644 --- a/src/database/sql/core/accessors.sql +++ b/src/database/sql/core/accessors.sql @@ -23,5 +23,9 @@ LIMIT 1 -- selectMissingTransactionIds SELECT transaction_id -FROM missing_transactions -LIMIT @limit +FROM ( + SELECT transaction_id + FROM missing_transactions + LIMIT @limit +) +ORDER BY RANDOM() diff --git a/src/database/standalone-sqlite.ts b/src/database/standalone-sqlite.ts index 57421eed..281f0725 100644 --- a/src/database/standalone-sqlite.ts +++ b/src/database/standalone-sqlite.ts @@ -47,6 +47,8 @@ import { currentUnixTimestamp } from '../lib/time.js'; import log from '../log.js'; import { BlockListValidator, + BundleIndex, + BundleRecord, ChainIndex, ContiguousDataAttributes, ContiguousDataIndex, @@ -65,6 +67,7 @@ const MAX_WORKER_ERRORS = 100; const STABLE_FLUSH_INTERVAL = 5; const NEW_TX_CLEANUP_WAIT_SECS = 60 * 60 * 2; const NEW_DATA_ITEM_CLEANUP_WAIT_SECS = 60 * 60 * 2; +const BUNDLE_REPROCESS_WAIT_SECS = 60 * 60 * 4; const LOW_SELECTIVITY_TAG_NAMES = new Set(['App-Name', 'Content-Type']); function tagJoinSortPriority(tag: { name: string; values: string[] }) { @@ -329,8 +332,8 @@ export class StandaloneSqliteDatabaseWorker { moderation: { [stmtName: string]: Sqlite.Statement }; bundles: { [stmtName: string]: Sqlite.Statement }; }; - private bundleFormatIds: { [filter: string]: number; } = {}; - private filterIds: { [filter: string]: number; } = {}; + private bundleFormatIds: { [filter: string]: number } = {}; + private filterIds: { [filter: string]: number } = {}; // Transactions resetBundlesToHeightFn: Sqlite.Transaction; @@ -694,11 +697,26 @@ export class StandaloneSqliteDatabaseWorker { } getMissingTxIds(limit: number) { - const missingTxIds = this.stmts.core.selectMissingTransactionIds.all({ + const rows = this.stmts.core.selectMissingTransactionIds.all({ limit, }); - return missingTxIds.map((row): string => toB64Url(row.transaction_id)); + return rows.map((row): string => toB64Url(row.transaction_id)); + } + + getFailedBundleIds(limit: number) { + const rows = this.stmts.bundles.selectFailedBundleIds.all({ + limit, + reprocess_cutoff: currentUnixTimestamp() - BUNDLE_REPROCESS_WAIT_SECS, + }); + + return rows.map((row): string => toB64Url(row.id)); + } + + updateBundlesFullyIndexedAt() { + this.stmts.bundles.updateFullyIndexedAt.run({ + fully_indexed_at: currentUnixTimestamp(), + }); } resetToHeight(height: number) { @@ -720,7 +738,7 @@ export class StandaloneSqliteDatabaseWorker { if (format != undefined) { id = this.bundleFormatIds[format]; if (id == undefined) { - id= this.stmts.bundles.selectFormatId.get({ format })?.id; + id = this.stmts.bundles.selectFormatId.get({ format })?.id; if (id != undefined) { this.bundleFormatIds[format] = id; } @@ -735,7 +753,7 @@ export class StandaloneSqliteDatabaseWorker { id = this.filterIds[filter]; if (id == undefined) { this.stmts.bundles.insertOrIgnoreFilter.run({ filter }); - id= this.stmts.bundles.selectFilterId.get({ filter })?.id; + id = this.stmts.bundles.selectFilterId.get({ filter })?.id; if (id != undefined) { this.filterIds[filter] = id; } @@ -754,6 +772,7 @@ export class StandaloneSqliteDatabaseWorker { saveBundle({ id, + rootTransactionId, format, unbundleFilter, indexFilter, @@ -763,21 +782,15 @@ export class StandaloneSqliteDatabaseWorker { skippedAt, unbundledAt, fullyIndexedAt, - }: { - id: string; - format: 'ans-102' | 'ans-104'; - unbundleFilter?: string; - indexFilter?: string; - dataItemCount?: number; - matchedDataItemCount?: number; - queuedAt?: number; - skippedAt?: number; - unbundledAt?: number; - fullyIndexedAt?: number; - }) { + }: BundleRecord) { const idBuffer = fromB64Url(id); + let rootTxId: Buffer | undefined; + if (rootTransactionId != undefined) { + rootTxId = fromB64Url(rootTransactionId); + } this.stmts.bundles.upsertBundle.run({ id: idBuffer, + root_transaction_id: rootTxId, format_id: this.getBundleFormatId(format), unbundle_filter_id: this.getFilterId(unbundleFilter), index_filter_id: this.getFilterId(indexFilter), @@ -2041,6 +2054,7 @@ const WORKER_POOL_SIZES: WorkerPoolSizes = { export class StandaloneSqliteDatabase implements + BundleIndex, BlockListValidator, ChainIndex, ContiguousDataIndex, @@ -2246,10 +2260,18 @@ export class StandaloneSqliteDatabase return this.queueRead('core', 'getBlockHashByHeight', [height]); } - getMissingTxIds(limit = 20): Promise { + getMissingTxIds(limit: number): Promise { return this.queueRead('core', 'getMissingTxIds', [limit]); } + getFailedBundleIds(limit: number): Promise { + return this.queueRead('bundles', 'getFailedBundleIds', [limit]); + } + + updateBundlesFullyIndexedAt(): Promise { + return this.queueRead('bundles', 'updateBundlesFullyIndexedAt', undefined); + } + resetToHeight(height: number): Promise { return this.queueWrite('core', 'resetToHeight', [height]); } @@ -2262,17 +2284,7 @@ export class StandaloneSqliteDatabase return this.queueWrite('bundles', 'saveDataItem', [item]); } - saveBundle(bundle: { - id: string; - format: 'ans-102' | 'ans-104'; - unbundleFilter?: string; - indexFilter?: string; - dataItemCount?: number; - matchedDataItemCount?: number; - queuedAt?: number; - skippedAt?: number; - unbundledAt?: number; - }): Promise { + saveBundle(bundle: BundleRecord): Promise { return this.queueWrite('bundles', 'saveBundle', [bundle]); } @@ -2476,8 +2488,15 @@ if (!isMainThread) { parentPort?.postMessage(newBlockHash); break; case 'getMissingTxIds': - const missingTxIdsRes = worker.getMissingTxIds(args[0]); - parentPort?.postMessage(missingTxIdsRes); + parentPort?.postMessage(worker.getMissingTxIds(args[0])); + break; + case 'getFailedBundleIds': + const failedBundleIds = worker.getFailedBundleIds(args[0]); + parentPort?.postMessage(failedBundleIds); + break; + case 'updateBundlesFullyIndexedAt': + worker.updateBundlesFullyIndexedAt(); + parentPort?.postMessage(null); break; case 'resetToHeight': worker.resetToHeight(args[0]); diff --git a/src/system.ts b/src/system.ts index 9fe483f3..71ed3dde 100644 --- a/src/system.ts +++ b/src/system.ts @@ -41,6 +41,7 @@ import { FsDataStore } from './store/fs-data-store.js'; import { FsTransactionStore } from './store/fs-transaction-store.js'; import { BlockListValidator, + BundleIndex, ChainIndex, ContiguousDataIndex, DataItemIndexWriter, @@ -51,6 +52,7 @@ import { import { Ans104DataIndexer } from './workers/ans104-data-indexer.js'; import { Ans104Unbundler } from './workers/ans104-unbundler.js'; import { BlockImporter } from './workers/block-importer.js'; +import { BundleRepairWorker } from './workers/bundle-repair-worker.js'; import { DataItemIndexer } from './workers/data-item-indexer.js'; import { TransactionFetcher } from './workers/transaction-fetcher.js'; import { TransactionImporter } from './workers/transaction-importer.js'; @@ -105,6 +107,7 @@ export const db = new StandaloneSqliteDatabase({ }); export const chainIndex: ChainIndex = db; +export const bundleIndex: BundleIndex = db; export const contiguousDataIndex: ContiguousDataIndex = db; export const blockListValidator: BlockListValidator = db; export const nestedDataIndexWriter: NestedDataIndexWriter = db; @@ -167,6 +170,12 @@ export const txRepairWorker = new TransactionRepairWorker({ txFetcher, }); +export const bundleRepairWorker = new BundleRepairWorker({ + log, + bundleIndex, + txFetcher, +}); + // Configure contigous data source const chunkDataSource = new ReadThroughChunkDataCache({ log, @@ -208,6 +217,7 @@ eventEmitter.on( async (tx: PartialJsonTransaction) => { await db.saveBundle({ id: tx.id, + rootTransactionId: tx.id, format: 'ans-104', }); if (await config.ANS104_UNBUNDLE_FILTER.match(tx)) { diff --git a/src/types.d.ts b/src/types.d.ts index 5996ea67..95b23eea 100644 --- a/src/types.d.ts +++ b/src/types.d.ts @@ -174,7 +174,7 @@ export interface ChainSource { export interface ChainIndex { getMaxHeight(): Promise; getBlockHashByHeight(height: number): Promise; - getMissingTxIds(limit?: number): Promise; + getMissingTxIds(limit: number): Promise; resetToHeight(height: number): Promise; saveTx(txs: PartialJsonTransaction): Promise; saveBlockAndTxs( @@ -184,6 +184,26 @@ export interface ChainIndex { ): Promise; } +export interface BundleRecord { + id: string; + rootTransactionId?: string; + format: 'ans-102' | 'ans-104'; + unbundleFilter?: string; + indexFilter?: string; + dataItemCount?: number; + matchedDataItemCount?: number; + queuedAt?: number; + skippedAt?: number; + unbundledAt?: number; + fullyIndexedAt?: number; +} + +export interface BundleIndex { + saveBundle(bundle: BundleRecord): Promise; + getFailedBundleIds(limit: number): Promise; + updateBundlesFullyIndexedAt(): Promise; +} + export interface DataItemIndexWriter { saveDataItem(item: NormalizedDataItem): Promise; } diff --git a/src/workers/bundle-repair-worker.ts b/src/workers/bundle-repair-worker.ts new file mode 100644 index 00000000..8d1e04f8 --- /dev/null +++ b/src/workers/bundle-repair-worker.ts @@ -0,0 +1,78 @@ +/** + * AR.IO Gateway + * Copyright (C) 2023 Permanent Data Solutions, Inc + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +import * as winston from 'winston'; + +import { BundleIndex } from '../types.js'; +import { TransactionFetcher } from './transaction-fetcher.js'; + +const DEFAULT_RETRY_INTERVAL_MS = 10 * 60 * 1000; +const DEFAULT_UPDATE_INTERVAL_MS = 5 * 60 * 1000; +const DEFAULT_BUNDLES_TO_RETRY = 20; + +export class BundleRepairWorker { + // Dependencies + private log: winston.Logger; + private bundleIndex: BundleIndex; + private txFetcher: TransactionFetcher; + + constructor({ + log, + bundleIndex, + txFetcher, + }: { + log: winston.Logger; + bundleIndex: BundleIndex; + txFetcher: TransactionFetcher; + }) { + this.log = log.child({ class: 'BundleRepairWorker' }); + this.bundleIndex = bundleIndex; + this.txFetcher = txFetcher; + } + + async start(): Promise { + setInterval(this.retryBundles.bind(this), DEFAULT_RETRY_INTERVAL_MS); + setInterval( + this.updateBundleTimestamps.bind(this), + DEFAULT_UPDATE_INTERVAL_MS, + ); + } + + async retryBundles() { + try { + const bundleIds = await this.bundleIndex.getFailedBundleIds( + DEFAULT_BUNDLES_TO_RETRY, + ); + for (const bundleId of bundleIds) { + this.log.info('Retrying failed bundle', { bundleId }); + await this.txFetcher.queueTxId(bundleId); + } + } catch (error: any) { + this.log.error('Error retrying failed bundles:', error); + } + } + + async updateBundleTimestamps() { + try { + this.log.debug('Updating bundle timestamps...'); + await this.bundleIndex.updateBundlesFullyIndexedAt(); + this.log.debug('Bundle timestamps updated.'); + } catch (error: any) { + this.log.error('Error updating bundle timestamps:', error); + } + } +} diff --git a/src/workers/transaction-repair-worker.ts b/src/workers/transaction-repair-worker.ts index 7a53b2b1..7941ac11 100644 --- a/src/workers/transaction-repair-worker.ts +++ b/src/workers/transaction-repair-worker.ts @@ -21,6 +21,7 @@ import { ChainIndex } from '../types.js'; import { TransactionFetcher } from './transaction-fetcher.js'; const DEFAULT_INTERVAL_MS = 5 * 60 * 1000; +const DEFAULT_TXS_TO_RETRY = 20; export class TransactionRepairWorker { // Dependencies @@ -48,7 +49,9 @@ export class TransactionRepairWorker { async retryMissingTransactions() { try { - const missingTxIds = await this.chainIndex.getMissingTxIds(); + const missingTxIds = await this.chainIndex.getMissingTxIds( + DEFAULT_TXS_TO_RETRY, + ); for (const txId of missingTxIds) { this.log.info('Retrying missing transaction', { txId }); await this.txFetcher.queueTxId(txId);