Skip to content

Commit

Permalink
feat(bundles ans-104): add unbundle Prometheus metric PE-4212
Browse files Browse the repository at this point in the history
Also adds exception handling for DB errors in bundle event handlers (to
improve error reporting).
  • Loading branch information
djwhitt committed Jul 25, 2023
1 parent b05ae8d commit d1b6301
Showing 1 changed file with 47 additions and 32 deletions.
79 changes: 47 additions & 32 deletions src/system.ts
Original file line number Diff line number Diff line change
Expand Up @@ -248,37 +248,48 @@ const bundlesMatchedCounter = new promClient.Counter({
labelNames: ['bundle_format'],
});

const bundlesQueuedCounter = new promClient.Counter({
name: 'bundles_queued_total',
help: 'Count of bundles queued for unbundling',
labelNames: ['bundle_format'],
});

eventEmitter.on(
events.ANS104_BUNDLE_INDEXED,
async (item: NormalizedDataItem | PartialJsonTransaction) => {
await db.saveBundle({
id: item.id,
rootTransactionId: 'root_tx_id' in item ? item.root_tx_id : item.id,
format: 'ans-104',
});
if (await config.ANS104_UNBUNDLE_FILTER.match(item)) {
bundlesMatchedCounter.inc({ bundle_format: 'ans-104' });
try {
await db.saveBundle({
id: item.id,
rootTransactionId: 'root_tx_id' in item ? item.root_tx_id : item.id,
format: 'ans-104',
unbundleFilter: config.ANS104_UNBUNDLE_FILTER_STRING,
indexFilter: config.ANS104_INDEX_FILTER_STRING,
queuedAt: currentUnixTimestamp(),
});
ans104Unbundler.queueItem({
index:
'parent_index' in item && item.parent_index !== undefined
? item.parent_index
: -1, // parent indexes are not needed for L1
...item,
});
} else {
await db.saveBundle({
id: item.id,
format: 'ans-104',
unbundleFilter: config.ANS104_UNBUNDLE_FILTER_STRING,
skippedAt: currentUnixTimestamp(),
});
if (await config.ANS104_UNBUNDLE_FILTER.match(item)) {
bundlesMatchedCounter.inc({ bundle_format: 'ans-104' });
await db.saveBundle({
id: item.id,
format: 'ans-104',
unbundleFilter: config.ANS104_UNBUNDLE_FILTER_STRING,
indexFilter: config.ANS104_INDEX_FILTER_STRING,
queuedAt: currentUnixTimestamp(),
});
ans104Unbundler.queueItem({
index:
'parent_index' in item && item.parent_index !== undefined
? item.parent_index
: -1, // parent indexes are not needed for L1
...item,
});
bundlesQueuedCounter.inc({ bundle_format: 'ans-104' });
} else {
await db.saveBundle({
id: item.id,
format: 'ans-104',
unbundleFilter: config.ANS104_UNBUNDLE_FILTER_STRING,
skippedAt: currentUnixTimestamp(),
});
}
} catch (error) {
log.error('Error saving or queueing bundle', error);
}
},
);
Expand All @@ -290,14 +301,18 @@ const bundlesUnbundledCounter = new promClient.Counter({
});

eventEmitter.on(events.ANS104_UNBUNDLE_COMPLETE, async (bundleEvent: any) => {
bundlesUnbundledCounter.inc({ bundle_format: 'ans-104' });
db.saveBundle({
id: bundleEvent.parentId,
format: 'ans-104',
dataItemCount: bundleEvent.itemCount,
matchedDataItemCount: bundleEvent.matchedItemCount,
unbundledAt: currentUnixTimestamp(),
});
try {
bundlesUnbundledCounter.inc({ bundle_format: 'ans-104' });
db.saveBundle({
id: bundleEvent.parentId,
format: 'ans-104',
dataItemCount: bundleEvent.itemCount,
matchedDataItemCount: bundleEvent.matchedItemCount,
unbundledAt: currentUnixTimestamp(),
});
} catch (error) {
log.error('Error saving unbundle completion', error);
}
});

const dataItemIndexer = new DataItemIndexer({
Expand Down

0 comments on commit d1b6301

Please sign in to comment.