Skip to content

Commit

Permalink
feat(bundles repair): add bundle repair worker PE-4041
Browse files Browse the repository at this point in the history
Adds a bundle repair worker that queries `bundles` and
`bundle_data_items` tables to determine which bundles have been fully
imported. It does this by setting bundle `last_fully_indexed_at` based
on a comparison of `bundle_data_items` for each bundle to
`matched_data_item_count` on the bundles (taking filters into account)
and then using those `last_fully_indexed_at` timestamps to determine if
the bundle should be reprocessed.
  • Loading branch information
djwhitt committed Jul 10, 2023
1 parent 2294c8e commit bd70578
Show file tree
Hide file tree
Showing 15 changed files with 252 additions and 40 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/build-core.yml
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ jobs:
# Build and push container image to ECR
- name: Configure AWS credentials
uses: aws-actions/configure-aws-credentials@v1-node16
uses: aws-actions/configure-aws-credentials@v2
with:
role-to-assume: arn:aws:iam::${{ secrets.AWS_ACCOUNT_ID }}:role/${{ secrets.AWS_BUILD_INVOCATION_ROLE }}
aws-region: ${{ secrets.AWS_REGION }}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
ALTER TABLE bundles ADD COLUMN root_transaction_id BLOB;
UPDATE bundles SET root_transaction_id = id;
15 changes: 15 additions & 0 deletions migrations/2023.06.28T22.02.57.bundles.add-bundles-indexes.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
CREATE INDEX IF NOT EXISTS bundles_last_queued_at_idx
ON bundles (last_queued_at);
CREATE INDEX IF NOT EXISTS bundles_last_skipped_at_idx
ON bundles (last_skipped_at);
CREATE INDEX IF NOT EXISTS bundles_last_fully_indexed_at_idx
ON bundles (last_fully_indexed_at);
CREATE INDEX IF NOT EXISTS bundles_matched_data_item_count_idx
ON bundles (matched_data_item_count);
CREATE INDEX IF NOT EXISTS bundles_unbundle_filter_id_idx
ON bundles (unbundle_filter_id);
CREATE INDEX IF NOT EXISTS bundles_index_filter_id_idx
ON bundles (index_filter_id);

CREATE INDEX IF NOT EXISTS bundle_data_items_parent_id_filter_id_idx
ON bundle_data_items (parent_id, filter_id);
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
ALTER TABLE bundles DROP COLUMN root_transaction_id;
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
DROP INDEX IF EXISTS bundle_data_items_parent_id_filter_id_idx;

DROP INDEX IF EXISTS bundles_unbundle_filter_id_idx;
DROP INDEX IF EXISTS bundles_index_filter_id_idx;
DROP INDEX IF EXISTS bundles_matched_data_item_count_idx;
DROP INDEX IF EXISTS bundles_last_fully_indexed_at_idx;
DROP INDEX IF EXISTS bundles_last_skipped_at_idx;
DROP INDEX IF EXISTS bundles_last_queued_at_idx;
1 change: 1 addition & 0 deletions src/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import * as system from './system.js';
system.arweaveClient.refreshPeers();
system.blockImporter.start();
system.txRepairWorker.start();
system.bundleRepairWorker.start();

// HTTP server
const app = express();
Expand Down
4 changes: 2 additions & 2 deletions src/database/sql/bundles/import.sql
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
-- upsertBundle
INSERT INTO bundles (
id, format_id,
id, root_transaction_id, format_id,
unbundle_filter_id, index_filter_id,
data_item_count, matched_data_item_count,
first_queued_at, last_queued_at,
first_skipped_at, last_skipped_at,
first_unbundled_at, last_unbundled_at,
first_fully_indexed_at, last_fully_indexed_at
) VALUES (
@id, @format_id,
@id, @root_transaction_id, @format_id,
@unbundle_filter_id, @index_filter_id,
@data_item_count, @matched_data_item_count,
@queued_at, @queued_at,
Expand Down
41 changes: 41 additions & 0 deletions src/database/sql/bundles/repair.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
-- selectFailedBundleIds
SELECT DISTINCT id
FROM (
SELECT b.root_transaction_id AS id
FROM bundles b
WHERE (
(b.last_queued_at IS NULL AND b.last_skipped_at IS NULL)
OR (
b.last_queued_at IS NOT NULL
AND (
b.last_skipped_at IS NULL
OR b.last_skipped_at <= b.last_queued_at
)
AND b.last_queued_at < @reprocess_cutoff
)
)
AND b.last_fully_indexed_at IS NULL
AND (
b.matched_data_item_count IS NULL
OR b.matched_data_item_count > 0
)
ORDER BY b.last_queued_at ASC
LIMIT @limit
)
ORDER BY RANDOM()

-- updateFullyIndexedAt
UPDATE bundles
SET
first_fully_indexed_at = IFNULL(first_fully_indexed_at, @fully_indexed_at),
last_fully_indexed_at = @fully_indexed_at
WHERE matched_data_item_count IS NOT NULL
AND matched_data_item_count > 0
AND EXISTS (
SELECT 1
FROM bundle_data_items bdi
WHERE bdi.parent_id = bundles.id
AND bdi.filter_id = bundles.unbundle_filter_id
GROUP BY bdi.parent_id
HAVING COUNT(*) = bundles.matched_data_item_count
) AND last_fully_indexed_at IS NULL
8 changes: 6 additions & 2 deletions src/database/sql/core/accessors.sql
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,9 @@ LIMIT 1

-- selectMissingTransactionIds
SELECT transaction_id
FROM missing_transactions
LIMIT @limit
FROM (
SELECT transaction_id
FROM missing_transactions
LIMIT @limit
)
ORDER BY RANDOM()
83 changes: 51 additions & 32 deletions src/database/standalone-sqlite.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ import { currentUnixTimestamp } from '../lib/time.js';
import log from '../log.js';
import {
BlockListValidator,
BundleIndex,
BundleRecord,
ChainIndex,
ContiguousDataAttributes,
ContiguousDataIndex,
Expand All @@ -65,6 +67,7 @@ const MAX_WORKER_ERRORS = 100;
const STABLE_FLUSH_INTERVAL = 5;
const NEW_TX_CLEANUP_WAIT_SECS = 60 * 60 * 2;
const NEW_DATA_ITEM_CLEANUP_WAIT_SECS = 60 * 60 * 2;
const BUNDLE_REPROCESS_WAIT_SECS = 60 * 60 * 4;
const LOW_SELECTIVITY_TAG_NAMES = new Set(['App-Name', 'Content-Type']);

function tagJoinSortPriority(tag: { name: string; values: string[] }) {
Expand Down Expand Up @@ -329,8 +332,8 @@ export class StandaloneSqliteDatabaseWorker {
moderation: { [stmtName: string]: Sqlite.Statement };
bundles: { [stmtName: string]: Sqlite.Statement };
};
private bundleFormatIds: { [filter: string]: number; } = {};
private filterIds: { [filter: string]: number; } = {};
private bundleFormatIds: { [filter: string]: number } = {};
private filterIds: { [filter: string]: number } = {};

// Transactions
resetBundlesToHeightFn: Sqlite.Transaction;
Expand Down Expand Up @@ -694,11 +697,26 @@ export class StandaloneSqliteDatabaseWorker {
}

getMissingTxIds(limit: number) {
const missingTxIds = this.stmts.core.selectMissingTransactionIds.all({
const rows = this.stmts.core.selectMissingTransactionIds.all({
limit,
});

return missingTxIds.map((row): string => toB64Url(row.transaction_id));
return rows.map((row): string => toB64Url(row.transaction_id));
}

getFailedBundleIds(limit: number) {
const rows = this.stmts.bundles.selectFailedBundleIds.all({
limit,
reprocess_cutoff: currentUnixTimestamp() - BUNDLE_REPROCESS_WAIT_SECS,
});

return rows.map((row): string => toB64Url(row.id));
}

updateBundlesFullyIndexedAt() {
this.stmts.bundles.updateFullyIndexedAt.run({
fully_indexed_at: currentUnixTimestamp(),
});
}

resetToHeight(height: number) {
Expand All @@ -720,7 +738,7 @@ export class StandaloneSqliteDatabaseWorker {
if (format != undefined) {
id = this.bundleFormatIds[format];
if (id == undefined) {
id= this.stmts.bundles.selectFormatId.get({ format })?.id;
id = this.stmts.bundles.selectFormatId.get({ format })?.id;
if (id != undefined) {
this.bundleFormatIds[format] = id;
}
Expand All @@ -735,7 +753,7 @@ export class StandaloneSqliteDatabaseWorker {
id = this.filterIds[filter];
if (id == undefined) {
this.stmts.bundles.insertOrIgnoreFilter.run({ filter });
id= this.stmts.bundles.selectFilterId.get({ filter })?.id;
id = this.stmts.bundles.selectFilterId.get({ filter })?.id;
if (id != undefined) {
this.filterIds[filter] = id;
}
Expand All @@ -754,6 +772,7 @@ export class StandaloneSqliteDatabaseWorker {

saveBundle({
id,
rootTransactionId,
format,
unbundleFilter,
indexFilter,
Expand All @@ -763,21 +782,15 @@ export class StandaloneSqliteDatabaseWorker {
skippedAt,
unbundledAt,
fullyIndexedAt,
}: {
id: string;
format: 'ans-102' | 'ans-104';
unbundleFilter?: string;
indexFilter?: string;
dataItemCount?: number;
matchedDataItemCount?: number;
queuedAt?: number;
skippedAt?: number;
unbundledAt?: number;
fullyIndexedAt?: number;
}) {
}: BundleRecord) {
const idBuffer = fromB64Url(id);
let rootTxId: Buffer | undefined;
if (rootTransactionId != undefined) {
rootTxId = fromB64Url(rootTransactionId);
}
this.stmts.bundles.upsertBundle.run({
id: idBuffer,
root_transaction_id: rootTxId,
format_id: this.getBundleFormatId(format),
unbundle_filter_id: this.getFilterId(unbundleFilter),
index_filter_id: this.getFilterId(indexFilter),
Expand Down Expand Up @@ -2041,6 +2054,7 @@ const WORKER_POOL_SIZES: WorkerPoolSizes = {

export class StandaloneSqliteDatabase
implements
BundleIndex,
BlockListValidator,
ChainIndex,
ContiguousDataIndex,
Expand Down Expand Up @@ -2246,10 +2260,18 @@ export class StandaloneSqliteDatabase
return this.queueRead('core', 'getBlockHashByHeight', [height]);
}

getMissingTxIds(limit = 20): Promise<string[]> {
getMissingTxIds(limit: number): Promise<string[]> {
return this.queueRead('core', 'getMissingTxIds', [limit]);
}

getFailedBundleIds(limit: number): Promise<string[]> {
return this.queueRead('bundles', 'getFailedBundleIds', [limit]);
}

updateBundlesFullyIndexedAt(): Promise<void> {
return this.queueRead('bundles', 'updateBundlesFullyIndexedAt', undefined);
}

resetToHeight(height: number): Promise<void> {
return this.queueWrite('core', 'resetToHeight', [height]);
}
Expand All @@ -2262,17 +2284,7 @@ export class StandaloneSqliteDatabase
return this.queueWrite('bundles', 'saveDataItem', [item]);
}

saveBundle(bundle: {
id: string;
format: 'ans-102' | 'ans-104';
unbundleFilter?: string;
indexFilter?: string;
dataItemCount?: number;
matchedDataItemCount?: number;
queuedAt?: number;
skippedAt?: number;
unbundledAt?: number;
}): Promise<void> {
saveBundle(bundle: BundleRecord): Promise<void> {
return this.queueWrite('bundles', 'saveBundle', [bundle]);
}

Expand Down Expand Up @@ -2476,8 +2488,15 @@ if (!isMainThread) {
parentPort?.postMessage(newBlockHash);
break;
case 'getMissingTxIds':
const missingTxIdsRes = worker.getMissingTxIds(args[0]);
parentPort?.postMessage(missingTxIdsRes);
parentPort?.postMessage(worker.getMissingTxIds(args[0]));
break;
case 'getFailedBundleIds':
const failedBundleIds = worker.getFailedBundleIds(args[0]);
parentPort?.postMessage(failedBundleIds);
break;
case 'updateBundlesFullyIndexedAt':
worker.updateBundlesFullyIndexedAt();
parentPort?.postMessage(null);
break;
case 'resetToHeight':
worker.resetToHeight(args[0]);
Expand Down
10 changes: 10 additions & 0 deletions src/system.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import { FsDataStore } from './store/fs-data-store.js';
import { FsTransactionStore } from './store/fs-transaction-store.js';
import {
BlockListValidator,
BundleIndex,
ChainIndex,
ContiguousDataIndex,
DataItemIndexWriter,
Expand All @@ -51,6 +52,7 @@ import {
import { Ans104DataIndexer } from './workers/ans104-data-indexer.js';
import { Ans104Unbundler } from './workers/ans104-unbundler.js';
import { BlockImporter } from './workers/block-importer.js';
import { BundleRepairWorker } from './workers/bundle-repair-worker.js';
import { DataItemIndexer } from './workers/data-item-indexer.js';
import { TransactionFetcher } from './workers/transaction-fetcher.js';
import { TransactionImporter } from './workers/transaction-importer.js';
Expand Down Expand Up @@ -105,6 +107,7 @@ export const db = new StandaloneSqliteDatabase({
});

export const chainIndex: ChainIndex = db;
export const bundleIndex: BundleIndex = db;
export const contiguousDataIndex: ContiguousDataIndex = db;
export const blockListValidator: BlockListValidator = db;
export const nestedDataIndexWriter: NestedDataIndexWriter = db;
Expand Down Expand Up @@ -167,6 +170,12 @@ export const txRepairWorker = new TransactionRepairWorker({
txFetcher,
});

export const bundleRepairWorker = new BundleRepairWorker({
log,
bundleIndex,
txFetcher,
});

// Configure contiguous data source
const chunkDataSource = new ReadThroughChunkDataCache({
log,
Expand Down Expand Up @@ -208,6 +217,7 @@ eventEmitter.on(
async (tx: PartialJsonTransaction) => {
await db.saveBundle({
id: tx.id,
rootTransactionId: tx.id,
format: 'ans-104',
});
if (await config.ANS104_UNBUNDLE_FILTER.match(tx)) {
Expand Down
22 changes: 21 additions & 1 deletion src/types.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ export interface ChainSource {
export interface ChainIndex {
getMaxHeight(): Promise<number>;
getBlockHashByHeight(height: number): Promise<string | undefined>;
getMissingTxIds(limit?: number): Promise<string[]>;
getMissingTxIds(limit: number): Promise<string[]>;
resetToHeight(height: number): Promise<void>;
saveTx(txs: PartialJsonTransaction): Promise<void>;
saveBlockAndTxs(
Expand All @@ -184,6 +184,26 @@ export interface ChainIndex {
): Promise<void>;
}

export interface BundleRecord {
id: string;
rootTransactionId?: string;
format: 'ans-102' | 'ans-104';
unbundleFilter?: string;
indexFilter?: string;
dataItemCount?: number;
matchedDataItemCount?: number;
queuedAt?: number;
skippedAt?: number;
unbundledAt?: number;
fullyIndexedAt?: number;
}

export interface BundleIndex {
saveBundle(bundle: BundleRecord): Promise<void>;
getFailedBundleIds(limit: number): Promise<string[]>;
updateBundlesFullyIndexedAt(): Promise<void>;
}

export interface DataItemIndexWriter {
saveDataItem(item: NormalizedDataItem): Promise<void>;
}
Expand Down
Loading

0 comments on commit bd70578

Please sign in to comment.