Skip to content

Commit

Permalink
feat(bundles): add a process to reindex bundles after a filter change…
Browse files Browse the repository at this point in the history
… PE-4115

Adds a process that resets bundle timestamps for bundles that were
processed with different filters than are currenly in use. Since the
process creates some DB load even if the filters are unchnaged, it is
only enabled when the FILTER_CHANGE_REPROCESS environment variable is
set to true. In the future we may optimize this further by keeping a log
of filter changes. That would enable more efficient queries based on
comparing timestamps (< filter change time) rather than filter IDs
(using an inequality).
  • Loading branch information
djwhitt committed Jul 11, 2023
1 parent 45e00fa commit c57184f
Show file tree
Hide file tree
Showing 7 changed files with 87 additions and 0 deletions.
1 change: 1 addition & 0 deletions docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ services:
- AR_IO_WALLET=${AR_IO_WALLET:-}
- ADMIN_API_KEY=${ADMIN_API_KEY:-}
- BACKFILL_BUNDLE_RECORDS=${BACKFILL_BUNDLE_RECORDS:-}
- FILTER_CHANGE_REPROCESS=${FILTER_CHANGE_REPROCESS:-}
- ANS104_UNBUNDLE_FILTER=${ANS104_UNBUNDLE_FILTER:-}
- ANS104_INDEX_FILTER=${ANS104_INDEX_FILTER:-}
- ARNS_ROOT_HOST=${ARNS_ROOT_HOST:-}
Expand Down
2 changes: 2 additions & 0 deletions src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ if (env.varOrUndefined('ADMIN_API_KEY') === undefined) {
}
export const BACKFILL_BUNDLE_RECORDS =
env.varOrDefault('BACKFILL_BUNDLE_RECORDS', 'false') === 'true';
export const FILTER_CHANGE_REPROCESS =
env.varOrDefault('FILTER_CHANGE_REPROCESS', 'false') === 'true';
export const ANS104_UNBUNDLE_FILTER_STRING = canonicalize(
JSON.parse(env.varOrDefault('ANS104_UNBUNDLE_FILTER', '{"never": true}')),
);
Expand Down
26 changes: 26 additions & 0 deletions src/database/sql/bundles/repair.sql
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,32 @@ WHERE matched_data_item_count IS NOT NULL
HAVING COUNT(*) = bundles.matched_data_item_count
) AND last_fully_indexed_at IS NULL

-- updateForFilterChange
UPDATE bundles
SET
last_queued_at = NULL,
last_skipped_at = NULL
WHERE id IN (
SELECT b.id
FROM bundles b
WHERE (
last_skipped_at IS NOT NULL
AND unbundle_filter_id != (
SELECT id
FROM filters
WHERE filter = @unbundle_filter
)
) OR (
last_queued_at IS NOT NULL
AND index_filter_id != (
SELECT id
FROM filters
WHERE filter = @index_filter
)
)
LIMIT 10000
)

--insertMissingBundles
INSERT INTO bundles (
id,
Expand Down
19 changes: 19 additions & 0 deletions src/database/standalone-sqlite.ts
Original file line number Diff line number Diff line change
Expand Up @@ -723,6 +723,13 @@ export class StandaloneSqliteDatabaseWorker {
});
}

updateBundlesForFilterChange(unbundleFilter: string, indexFilter: string) {
this.stmts.bundles.updateForFilterChange.run({
unbundle_filter: unbundleFilter,
index_filter: indexFilter,
});
}

resetToHeight(height: number) {
this.resetBundlesToHeightFn(height);
this.resetCoreToHeightFn(height);
Expand Down Expand Up @@ -2280,6 +2287,13 @@ export class StandaloneSqliteDatabase
return this.queueRead('bundles', 'updateBundlesFullyIndexedAt', undefined);
}

updateBundlesForFilterChange(unbundleFilter: string, indexFilter: string) {
return this.queueWrite('bundles', 'updateBundlesForFilterChange', [
unbundleFilter,
indexFilter,
]);
}

resetToHeight(height: number): Promise<void> {
return this.queueWrite('core', 'resetToHeight', [height]);
}
Expand Down Expand Up @@ -2510,6 +2524,11 @@ if (!isMainThread) {
worker.updateBundlesFullyIndexedAt();
parentPort?.postMessage(null);
break;
case 'updateBundlesForFilterChange':
const [unbundleFilter, indexFilter] = args;
worker.updateBundlesForFilterChange(unbundleFilter, indexFilter);
parentPort?.postMessage(null);
break;
case 'resetToHeight':
worker.resetToHeight(args[0]);
parentPort?.postMessage(undefined);
Expand Down
3 changes: 3 additions & 0 deletions src/system.ts
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,10 @@ export const bundleRepairWorker = new BundleRepairWorker({
log,
bundleIndex,
txFetcher,
unbundleFilter: config.ANS104_UNBUNDLE_FILTER_STRING,
indexFilter: config.ANS104_INDEX_FILTER_STRING,
shouldBackfillBundles: config.BACKFILL_BUNDLE_RECORDS,
filtersChanged: config.FILTER_CHANGE_REPROCESS,
});

// Configure contigous data source
Expand Down
4 changes: 4 additions & 0 deletions src/types.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,10 @@ export interface BundleIndex {
saveBundle(bundle: BundleRecord): Promise<void>;
getFailedBundleIds(limit: number): Promise<string[]>;
updateBundlesFullyIndexedAt(): Promise<void>;
updateBundlesForFilterChange(
unbundleFilter: string,
indexFilter: string,
): Promise<void>;
backfillBundles(): Promise<void>;
}

Expand Down
32 changes: 32 additions & 0 deletions src/workers/bundle-repair-worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,30 +23,43 @@ import { TransactionFetcher } from './transaction-fetcher.js';
const DEFAULT_RETRY_INTERVAL_MS = 10 * 60 * 1000; // 10 minutes
const DEFAULT_UPDATE_INTERVAL_MS = 5 * 60 * 1000; // 5 minutes
const DEFAULT_BUNDLE_BACKFILL_INTERVAL_MS = 15 * 60 * 1000; // 15 minutes
const DEFAULT_FILTER_REPOCESS_INTERVAL_MS = 15 * 60 * 1000; // 15 minutes
const DEFAULT_BUNDLES_TO_RETRY = 20;

export class BundleRepairWorker {
// Dependencies
private log: winston.Logger;
private bundleIndex: BundleIndex;
private txFetcher: TransactionFetcher;
private unbundledFilter: string;
private indexFilter: string;
private shouldBackfillBundles: boolean;
private filtersChanged: boolean;

constructor({
log,
bundleIndex,
txFetcher,
unbundleFilter,
indexFilter,
shouldBackfillBundles,
filtersChanged,
}: {
log: winston.Logger;
bundleIndex: BundleIndex;
txFetcher: TransactionFetcher;
unbundleFilter: string;
indexFilter: string;
shouldBackfillBundles: boolean;
filtersChanged: boolean;
}) {
this.log = log.child({ class: 'BundleRepairWorker' });
this.bundleIndex = bundleIndex;
this.txFetcher = txFetcher;
this.unbundledFilter = unbundleFilter;
this.indexFilter = indexFilter;
this.shouldBackfillBundles = shouldBackfillBundles;
this.filtersChanged = filtersChanged;
}

async start(): Promise<void> {
Expand All @@ -61,6 +74,12 @@ export class BundleRepairWorker {
DEFAULT_BUNDLE_BACKFILL_INTERVAL_MS,
);
}
if (this.filtersChanged) {
setInterval(
this.updateForFilterChange.bind(this),
DEFAULT_FILTER_REPOCESS_INTERVAL_MS,
);
}
}

async retryBundles() {
Expand Down Expand Up @@ -96,4 +115,17 @@ export class BundleRepairWorker {
this.log.error('Error backfilling bundle records:', error);
}
}

async updateForFilterChange() {
try {
this.log.info('Update bundles for filter change...');
await this.bundleIndex.updateBundlesForFilterChange(
this.unbundledFilter,
this.indexFilter,
);
this.log.info('Bundles updated for filter change.');
} catch (error: any) {
this.log.error('Error updating bundles for filter change:', error);
}
}
}

0 comments on commit c57184f

Please sign in to comment.