Skip to content

Commit

Permalink
feat(bundles repair): add bundle repair worker PE-4041
Browse files Browse the repository at this point in the history
Adds a bundle repair worker that queries `bundles` and
`bundle_data_items` tables to determine which bundles have been fully
imported. It does this by setting bundle `last_fully_indexed_at` based
on a comparison of `bundle_data_items` for each bundle to
`matched_data_item_count` on the bundles (taking filters into account)
and then using those `last_fully_indexed_at` timestamps to determine if
the bundle should be reprocessed.
  • Loading branch information
djwhitt committed Jul 11, 2023
1 parent 2294c8e commit a24491c
Show file tree
Hide file tree
Showing 18 changed files with 327 additions and 41 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/build-core.yml
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ jobs:
# Build and push container image to ECR
- name: Configure AWS credentials
uses: aws-actions/configure-aws-credentials@v1-node16
uses: aws-actions/configure-aws-credentials@v2
with:
role-to-assume: arn:aws:iam::${{ secrets.AWS_ACCOUNT_ID }}:role/${{ secrets.AWS_BUILD_INVOCATION_ROLE }}
aws-region: ${{ secrets.AWS_REGION }}
Expand Down
1 change: 1 addition & 0 deletions docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ services:
- INSTANCE_ID=${INSTANCE_ID:-}
- AR_IO_WALLET=${AR_IO_WALLET:-}
- ADMIN_API_KEY=${ADMIN_API_KEY:-}
- BACKFILL_BUNDLE_RECORDS=${BACKFILL_BUNDLE_RECORDS:-}
- ANS104_UNBUNDLE_FILTER=${ANS104_UNBUNDLE_FILTER:-}
- ANS104_INDEX_FILTER=${ANS104_INDEX_FILTER:-}
- ARNS_ROOT_HOST=${ARNS_ROOT_HOST:-}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
ALTER TABLE bundles ADD COLUMN root_transaction_id BLOB;
UPDATE bundles SET root_transaction_id = id;
15 changes: 15 additions & 0 deletions migrations/2023.06.28T22.02.57.bundles.add-bundles-indexes.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
CREATE INDEX IF NOT EXISTS bundles_last_queued_at_idx
ON bundles (last_queued_at);
CREATE INDEX IF NOT EXISTS bundles_last_skipped_at_idx
ON bundles (last_skipped_at);
CREATE INDEX IF NOT EXISTS bundles_last_fully_indexed_at_idx
ON bundles (last_fully_indexed_at);
CREATE INDEX IF NOT EXISTS bundles_matched_data_item_count_idx
ON bundles (matched_data_item_count);
CREATE INDEX IF NOT EXISTS bundles_unbundle_filter_id_idx
ON bundles (unbundle_filter_id);
CREATE INDEX IF NOT EXISTS bundles_index_filter_id_idx
ON bundles (index_filter_id);

CREATE INDEX IF NOT EXISTS bundle_data_items_parent_id_filter_id_idx
ON bundle_data_items (parent_id, filter_id);
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
ALTER TABLE bundles DROP COLUMN root_transaction_id;
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
DROP INDEX IF EXISTS bundle_data_items_parent_id_filter_id_idx;

DROP INDEX IF EXISTS bundles_unbundle_filter_id_idx;
DROP INDEX IF EXISTS bundles_index_filter_id_idx;
DROP INDEX IF EXISTS bundles_matched_data_item_count_idx;
DROP INDEX IF EXISTS bundles_last_fully_indexed_at_idx;
DROP INDEX IF EXISTS bundles_last_skipped_at_idx;
DROP INDEX IF EXISTS bundles_last_queued_at_idx;
1 change: 1 addition & 0 deletions src/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import * as system from './system.js';
system.arweaveClient.refreshPeers();
system.blockImporter.start();
system.txRepairWorker.start();
system.bundleRepairWorker.start();

// HTTP server
const app = express();
Expand Down
2 changes: 2 additions & 0 deletions src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ export const ADMIN_API_KEY = env.varOrDefault(
if (env.varOrUndefined('ADMIN_API_KEY') === undefined) {
log.info('Using a random admin key since none was set', { ADMIN_API_KEY });
}
// Boolean flag read from the BACKFILL_BUNDLE_RECORDS environment variable.
// It is true only when the resolved value is exactly the string 'true';
// 'false' is passed to env.varOrDefault as the default value.
export const BACKFILL_BUNDLE_RECORDS =
env.varOrDefault('BACKFILL_BUNDLE_RECORDS', 'false') === 'true';
export const ANS104_UNBUNDLE_FILTER_STRING = canonicalize(
JSON.parse(env.varOrDefault('ANS104_UNBUNDLE_FILTER', '{"never": true}')),
);
Expand Down
4 changes: 2 additions & 2 deletions src/database/sql/bundles/import.sql
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
-- upsertBundle
INSERT INTO bundles (
id, format_id,
id, root_transaction_id, format_id,
unbundle_filter_id, index_filter_id,
data_item_count, matched_data_item_count,
first_queued_at, last_queued_at,
first_skipped_at, last_skipped_at,
first_unbundled_at, last_unbundled_at,
first_fully_indexed_at, last_fully_indexed_at
) VALUES (
@id, @format_id,
@id, @root_transaction_id, @format_id,
@unbundle_filter_id, @index_filter_id,
@data_item_count, @matched_data_item_count,
@queued_at, @queued_at,
Expand Down
76 changes: 76 additions & 0 deletions src/database/sql/bundles/repair.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
-- selectFailedBundleIds
SELECT DISTINCT id
FROM (
SELECT b.root_transaction_id AS id
FROM bundles b
WHERE (
(b.last_queued_at IS NULL AND b.last_skipped_at IS NULL)
OR (
b.last_queued_at IS NOT NULL
AND (
b.last_skipped_at IS NULL
OR b.last_skipped_at <= b.last_queued_at
)
AND b.last_queued_at < @reprocess_cutoff
)
)
AND b.last_fully_indexed_at IS NULL
AND (
b.matched_data_item_count IS NULL
OR b.matched_data_item_count > 0
)
ORDER BY b.last_queued_at ASC
LIMIT @limit
)
ORDER BY RANDOM()

-- updateFullyIndexedAt
UPDATE bundles
SET
first_fully_indexed_at = IFNULL(first_fully_indexed_at, @fully_indexed_at),
last_fully_indexed_at = @fully_indexed_at
WHERE matched_data_item_count IS NOT NULL
AND matched_data_item_count > 0
AND EXISTS (
SELECT 1
FROM bundle_data_items bdi
WHERE bdi.parent_id = bundles.id
AND bdi.filter_id = bundles.unbundle_filter_id
GROUP BY bdi.parent_id
HAVING COUNT(*) = bundles.matched_data_item_count
) AND last_fully_indexed_at IS NULL

-- insertMissingBundles
INSERT INTO bundles (
id,
root_transaction_id,
format_id
)
SELECT
sttf.transaction_id,
sttf.transaction_id,
(SELECT id FROM bundle_formats WHERE format = 'ans-104')
FROM stable_transaction_tags sttf
JOIN stable_transaction_tags sttv ON sttv.transaction_id = sttf.transaction_id
AND sttv.transaction_tag_index != sttf.transaction_tag_index
LEFT JOIN bundles b ON b.id = sttf.transaction_id
WHERE sttf.tag_name_hash = x'BF796ECA81CCE3FF36CEA53FA1EBB0F274A0FF29'
AND sttf.tag_value_hash = x'7E57CFE843145135AEE1F4D0D63CEB7842093712'
AND sttv.tag_name_hash = x'858B76CB055E360A2E4C3C38F4A3049F80175216'
AND sttv.tag_value_hash = x'F7CA6A21D278EB5CE64611AADBDB77EF1511D3DD'
AND b.id IS NULL
UNION ALL
SELECT
nttf.transaction_id,
nttf.transaction_id,
(SELECT id FROM bundle_formats WHERE format = 'ans-104')
FROM new_transaction_tags nttf
JOIN new_transaction_tags nttv ON nttv.transaction_id = nttf.transaction_id
LEFT JOIN bundles b ON b.id = nttf.transaction_id
WHERE nttf.tag_name_hash = x'BF796ECA81CCE3FF36CEA53FA1EBB0F274A0FF29'
AND nttf.tag_value_hash = x'7E57CFE843145135AEE1F4D0D63CEB7842093712'
AND nttv.tag_name_hash = x'858B76CB055E360A2E4C3C38F4A3049F80175216'
AND nttv.tag_value_hash = x'F7CA6A21D278EB5CE64611AADBDB77EF1511D3DD'
AND b.id IS NULL
LIMIT 10000
ON CONFLICT DO NOTHING
8 changes: 6 additions & 2 deletions src/database/sql/core/accessors.sql
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,9 @@ LIMIT 1

-- selectMissingTransactionIds
SELECT transaction_id
FROM missing_transactions
LIMIT @limit
FROM (
SELECT transaction_id
FROM missing_transactions
LIMIT @limit
)
ORDER BY RANDOM()
3 changes: 2 additions & 1 deletion src/database/standalone-sqlite.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ const { default: processStream } = arbundles;
const HEIGHT = 1138;
const BLOCK_TX_INDEX = 42;
const DATA_ITEM_ID = 'zoljIRyzG5hp-R4EZV2q8kFI49OAoy23_B9YJ_yEEws';
const CURSOR = 'WzExMzgsNDIsInpvbGpJUnl6RzVocC1SNEVaVjJxOGtGSTQ5T0FveTIzX0I5WUpfeUVFd3MiXQ';
const CURSOR =
'WzExMzgsNDIsInpvbGpJUnl6RzVocC1SNEVaVjJxOGtGSTQ5T0FveTIzX0I5WUpfeUVFd3MiXQ';

describe('SQLite helper functions', () => {
describe('toSqliteParams', () => {
Expand Down
95 changes: 63 additions & 32 deletions src/database/standalone-sqlite.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ import { currentUnixTimestamp } from '../lib/time.js';
import log from '../log.js';
import {
BlockListValidator,
BundleIndex,
BundleRecord,
ChainIndex,
ContiguousDataAttributes,
ContiguousDataIndex,
Expand All @@ -65,6 +67,7 @@ const MAX_WORKER_ERRORS = 100;
const STABLE_FLUSH_INTERVAL = 5;
const NEW_TX_CLEANUP_WAIT_SECS = 60 * 60 * 2;
const NEW_DATA_ITEM_CLEANUP_WAIT_SECS = 60 * 60 * 2;
// 4 hours, expressed in seconds. getFailedBundleIds subtracts this from the
// current Unix timestamp to build the `reprocess_cutoff` statement parameter.
const BUNDLE_REPROCESS_WAIT_SECS = 60 * 60 * 4;
const LOW_SELECTIVITY_TAG_NAMES = new Set(['App-Name', 'Content-Type']);

function tagJoinSortPriority(tag: { name: string; values: string[] }) {
Expand Down Expand Up @@ -329,8 +332,8 @@ export class StandaloneSqliteDatabaseWorker {
moderation: { [stmtName: string]: Sqlite.Statement };
bundles: { [stmtName: string]: Sqlite.Statement };
};
private bundleFormatIds: { [filter: string]: number; } = {};
private filterIds: { [filter: string]: number; } = {};
private bundleFormatIds: { [filter: string]: number } = {};
private filterIds: { [filter: string]: number } = {};

// Transactions
resetBundlesToHeightFn: Sqlite.Transaction;
Expand Down Expand Up @@ -694,11 +697,30 @@ export class StandaloneSqliteDatabaseWorker {
}

getMissingTxIds(limit: number) {
const missingTxIds = this.stmts.core.selectMissingTransactionIds.all({
const rows = this.stmts.core.selectMissingTransactionIds.all({
limit,
});

return missingTxIds.map((row): string => toB64Url(row.transaction_id));
return rows.map((row): string => toB64Url(row.transaction_id));
}

/**
 * Runs the `selectFailedBundleIds` statement and returns the ids it selects.
 *
 * The statement receives `limit` and a `reprocess_cutoff` equal to the
 * current Unix timestamp minus BUNDLE_REPROCESS_WAIT_SECS.
 *
 * @param limit - value bound to the statement's `@limit` parameter
 * @returns the `id` column of each returned row, converted with toB64Url
 */
getFailedBundleIds(limit: number) {
  const reprocessCutoff = currentUnixTimestamp() - BUNDLE_REPROCESS_WAIT_SECS;
  const failedRows = this.stmts.bundles.selectFailedBundleIds.all({
    limit,
    reprocess_cutoff: reprocessCutoff,
  });

  const bundleIds: string[] = [];
  for (const failedRow of failedRows) {
    bundleIds.push(toB64Url(failedRow.id));
  }
  return bundleIds;
}

/**
 * Runs the `insertMissingBundles` statement with no parameters.
 *
 * That statement inserts `bundles` rows (id, root_transaction_id, format_id)
 * for transaction ids selected by tag hashes from `stable_transaction_tags`
 * and `new_transaction_tags` that do not yet have a `bundles` row.
 */
backfillBundles() {
this.stmts.bundles.insertMissingBundles.run();
}

/**
 * Runs the `updateFullyIndexedAt` statement, binding the current Unix
 * timestamp as its `fully_indexed_at` parameter.
 */
updateBundlesFullyIndexedAt() {
  const fullyIndexedAt = currentUnixTimestamp();
  const { updateFullyIndexedAt } = this.stmts.bundles;
  updateFullyIndexedAt.run({ fully_indexed_at: fullyIndexedAt });
}

resetToHeight(height: number) {
Expand All @@ -720,7 +742,7 @@ export class StandaloneSqliteDatabaseWorker {
if (format != undefined) {
id = this.bundleFormatIds[format];
if (id == undefined) {
id= this.stmts.bundles.selectFormatId.get({ format })?.id;
id = this.stmts.bundles.selectFormatId.get({ format })?.id;
if (id != undefined) {
this.bundleFormatIds[format] = id;
}
Expand All @@ -735,7 +757,7 @@ export class StandaloneSqliteDatabaseWorker {
id = this.filterIds[filter];
if (id == undefined) {
this.stmts.bundles.insertOrIgnoreFilter.run({ filter });
id= this.stmts.bundles.selectFilterId.get({ filter })?.id;
id = this.stmts.bundles.selectFilterId.get({ filter })?.id;
if (id != undefined) {
this.filterIds[filter] = id;
}
Expand All @@ -754,6 +776,7 @@ export class StandaloneSqliteDatabaseWorker {

saveBundle({
id,
rootTransactionId,
format,
unbundleFilter,
indexFilter,
Expand All @@ -763,21 +786,15 @@ export class StandaloneSqliteDatabaseWorker {
skippedAt,
unbundledAt,
fullyIndexedAt,
}: {
id: string;
format: 'ans-102' | 'ans-104';
unbundleFilter?: string;
indexFilter?: string;
dataItemCount?: number;
matchedDataItemCount?: number;
queuedAt?: number;
skippedAt?: number;
unbundledAt?: number;
fullyIndexedAt?: number;
}) {
}: BundleRecord) {
const idBuffer = fromB64Url(id);
let rootTxId: Buffer | undefined;
if (rootTransactionId != undefined) {
rootTxId = fromB64Url(rootTransactionId);
}
this.stmts.bundles.upsertBundle.run({
id: idBuffer,
root_transaction_id: rootTxId,
format_id: this.getBundleFormatId(format),
unbundle_filter_id: this.getFilterId(unbundleFilter),
index_filter_id: this.getFilterId(indexFilter),
Expand Down Expand Up @@ -2041,6 +2058,7 @@ const WORKER_POOL_SIZES: WorkerPoolSizes = {

export class StandaloneSqliteDatabase
implements
BundleIndex,
BlockListValidator,
ChainIndex,
ContiguousDataIndex,
Expand Down Expand Up @@ -2246,10 +2264,22 @@ export class StandaloneSqliteDatabase
return this.queueRead('core', 'getBlockHashByHeight', [height]);
}

getMissingTxIds(limit = 20): Promise<string[]> {
getMissingTxIds(limit: number): Promise<string[]> {
return this.queueRead('core', 'getMissingTxIds', [limit]);
}

/**
 * Dispatches `getFailedBundleIds` to the 'bundles' worker through queueRead.
 *
 * @param limit - forwarded as the single argument of the worker call
 * @returns a promise for the bundle ids produced by the worker
 */
getFailedBundleIds(limit: number): Promise<string[]> {
return this.queueRead('bundles', 'getFailedBundleIds', [limit]);
}

/**
 * Dispatches `backfillBundles` to the 'bundles' worker.
 *
 * The worker method runs an INSERT into `bundles`, so the call is routed
 * through queueWrite (like saveBundle and resetToHeight) rather than
 * queueRead.
 *
 * @returns a promise that settles when the worker replies
 */
backfillBundles(): Promise<void> {
  return this.queueWrite('bundles', 'backfillBundles', undefined);
}

/**
 * Dispatches `updateBundlesFullyIndexedAt` to the 'bundles' worker.
 *
 * The worker method runs an UPDATE on `bundles`, so the call is routed
 * through queueWrite (like saveBundle and resetToHeight) rather than
 * queueRead.
 *
 * @returns a promise that settles when the worker replies
 */
updateBundlesFullyIndexedAt(): Promise<void> {
  return this.queueWrite('bundles', 'updateBundlesFullyIndexedAt', undefined);
}

/**
 * Dispatches `resetToHeight` to the 'core' worker through queueWrite.
 *
 * @param height - forwarded as the single argument of the worker call
 * @returns a promise that settles when the worker replies
 */
resetToHeight(height: number): Promise<void> {
return this.queueWrite('core', 'resetToHeight', [height]);
}
Expand All @@ -2262,17 +2292,7 @@ export class StandaloneSqliteDatabase
return this.queueWrite('bundles', 'saveDataItem', [item]);
}

saveBundle(bundle: {
id: string;
format: 'ans-102' | 'ans-104';
unbundleFilter?: string;
indexFilter?: string;
dataItemCount?: number;
matchedDataItemCount?: number;
queuedAt?: number;
skippedAt?: number;
unbundledAt?: number;
}): Promise<void> {
saveBundle(bundle: BundleRecord): Promise<void> {
return this.queueWrite('bundles', 'saveBundle', [bundle]);
}

Expand Down Expand Up @@ -2476,8 +2496,19 @@ if (!isMainThread) {
parentPort?.postMessage(newBlockHash);
break;
case 'getMissingTxIds':
const missingTxIdsRes = worker.getMissingTxIds(args[0]);
parentPort?.postMessage(missingTxIdsRes);
parentPort?.postMessage(worker.getMissingTxIds(args[0]));
break;
case 'getFailedBundleIds':
  // Post the result directly, matching the 'getMissingTxIds' case. This
  // avoids an unbraced `const` whose scope would span the entire switch.
  parentPort?.postMessage(worker.getFailedBundleIds(args[0]));
  break;
case 'backfillBundles':
worker.backfillBundles();
parentPort?.postMessage(null);
break;
case 'updateBundlesFullyIndexedAt':
worker.updateBundlesFullyIndexedAt();
parentPort?.postMessage(null);
break;
case 'resetToHeight':
worker.resetToHeight(args[0]);
Expand Down
Loading

0 comments on commit a24491c

Please sign in to comment.