Skip to content

Commit

Permalink
feat(bundles ans-104): cap unbundle queue + prioritize manually queue…
Browse files Browse the repository at this point in the history
…d TXs PE-4306

Caps the ANS-104 unbundle queue at 1000 by default to limit memory
consumption and prevent large backlogs from monopolizing the import
process when backfilling (skipped bundles will get retried later). Also
adds the ability to prioritize certain L1 TXs for unbundling and enables
prioritization for TXs that are queued via the admin interface.
  • Loading branch information
djwhitt committed Aug 7, 2023
1 parent bca8c7a commit e3551ea
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 11 deletions.
1 change: 1 addition & 0 deletions src/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ app.post('/ar-io/admin/queue-tx', express.json(), async (req, res) => {
res.status(400).send("Must provide 'id'");
return;
}
system.prioritizedTxIds.add(id);
system.txFetcher.queueTxId(id);
res.json({ message: 'TX queued' });
} catch (error: any) {
Expand Down
21 changes: 14 additions & 7 deletions src/system.ts
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,8 @@ const bundlesCounter = new promClient.Counter({
labelNames: ['bundle_format', 'parent_type'],
});

export const prioritizedTxIds = new Set<string>();

eventEmitter.on(events.TX_INDEXED, async (tx: MatchableItem) => {
if (await ans104TxMatcher.match(tx)) {
bundlesCounter.inc({
Expand Down Expand Up @@ -272,13 +274,18 @@ eventEmitter.on(
indexFilter: config.ANS104_INDEX_FILTER_STRING,
queuedAt: currentUnixTimestamp(),
});
ans104Unbundler.queueItem({
index:
'parent_index' in item && item.parent_index !== undefined
? item.parent_index
: -1, // parent indexes are not needed for L1
...item,
});
const prioritized = prioritizedTxIds.has(item.id);
prioritizedTxIds.delete(item.id);
ans104Unbundler.queueItem(
{
index:
'parent_index' in item && item.parent_index !== undefined
? item.parent_index
: -1, // parent indexes are not needed for L1
...item,
},
prioritized,
);
bundlesQueuedCounter.inc({ bundle_format: 'ans-104' });
} else {
await db.saveBundle({
Expand Down
24 changes: 20 additions & 4 deletions src/workers/ans104-unbundler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import {
} from '../types.js';

const DEFAULT_WORKER_COUNT = 1;
const DEFAULT_MAX_QUEUE_SIZE = 1000;

interface IndexProperty {
index: number;
Expand All @@ -43,6 +44,7 @@ export class Ans104Unbundler {
private filter: ItemFilter;

// Unbundling queue
private maxQueueSize: number;
private queue: queueAsPromised<UnbundleableItem, void>;

// Parser
Expand All @@ -54,13 +56,15 @@ export class Ans104Unbundler {
filter,
contiguousDataSource,
dataItemIndexFilterString,
maxQueueSize = DEFAULT_MAX_QUEUE_SIZE,
workerCount = DEFAULT_WORKER_COUNT,
}: {
log: winston.Logger;
eventEmitter: EventEmitter;
filter: ItemFilter;
contiguousDataSource: ContiguousDataSource;
dataItemIndexFilterString: string;
maxQueueSize?: number;
workerCount?: number;
}) {
this.log = log.child({ class: 'Ans104Unbundler' });
Expand All @@ -72,14 +76,26 @@ export class Ans104Unbundler {
dataItemIndexFilterString,
});

this.maxQueueSize = maxQueueSize;
this.queue = fastq.promise(this.unbundle.bind(this), workerCount);
}

async queueItem(item: UnbundleableItem): Promise<void> {
async queueItem(
item: UnbundleableItem,
prioritized: boolean | undefined,
): Promise<void> {
const log = this.log.child({ method: 'queueItem', id: item.id });
log.debug('Queueing bundle...');
this.queue.push(item);
log.debug('Bundle queued.');
if (prioritized === true) {
log.debug('Queueing prioritized bundle...');
this.queue.unshift(item);
log.debug('Prioritized bundle queued.');
} else if (this.queue.length() < this.maxQueueSize) {
log.debug('Queueing bundle...');
this.queue.push(item);
log.debug('Bundle queued.');
} else {
log.info('Skipping unbundle, queue is full.');
}
}

async unbundle(item: UnbundleableItem): Promise<void> {
Expand Down

0 comments on commit e3551ea

Please sign in to comment.