diff --git a/src/system.ts b/src/system.ts index a4d5573c..e0dffe2d 100644 --- a/src/system.ts +++ b/src/system.ts @@ -248,37 +248,48 @@ const bundlesMatchedCounter = new promClient.Counter({ labelNames: ['bundle_format'], }); +const bundlesQueuedCounter = new promClient.Counter({ + name: 'bundles_queued_total', + help: 'Count of bundles queued for unbundling', + labelNames: ['bundle_format'], +}); + eventEmitter.on( events.ANS104_BUNDLE_INDEXED, async (item: NormalizedDataItem | PartialJsonTransaction) => { - await db.saveBundle({ - id: item.id, - rootTransactionId: 'root_tx_id' in item ? item.root_tx_id : item.id, - format: 'ans-104', - }); - if (await config.ANS104_UNBUNDLE_FILTER.match(item)) { - bundlesMatchedCounter.inc({ bundle_format: 'ans-104' }); + try { await db.saveBundle({ id: item.id, + rootTransactionId: 'root_tx_id' in item ? item.root_tx_id : item.id, format: 'ans-104', - unbundleFilter: config.ANS104_UNBUNDLE_FILTER_STRING, - indexFilter: config.ANS104_INDEX_FILTER_STRING, - queuedAt: currentUnixTimestamp(), - }); - ans104Unbundler.queueItem({ - index: - 'parent_index' in item && item.parent_index !== undefined - ? item.parent_index - : -1, // parent indexes are not needed for L1 - ...item, - }); - } else { - await db.saveBundle({ - id: item.id, - format: 'ans-104', - unbundleFilter: config.ANS104_UNBUNDLE_FILTER_STRING, - skippedAt: currentUnixTimestamp(), }); + if (await config.ANS104_UNBUNDLE_FILTER.match(item)) { + bundlesMatchedCounter.inc({ bundle_format: 'ans-104' }); + await db.saveBundle({ + id: item.id, + format: 'ans-104', + unbundleFilter: config.ANS104_UNBUNDLE_FILTER_STRING, + indexFilter: config.ANS104_INDEX_FILTER_STRING, + queuedAt: currentUnixTimestamp(), + }); + ans104Unbundler.queueItem({ + index: + 'parent_index' in item && item.parent_index !== undefined + ? item.parent_index + : -1, // parent indexes are not needed for L1 + ...item, + }); + bundlesQueuedCounter.inc({ bundle_format: 'ans-104' }); + } else { + await db.saveBundle({ + id: item.id, + format: 'ans-104', + unbundleFilter: config.ANS104_UNBUNDLE_FILTER_STRING, + skippedAt: currentUnixTimestamp(), + }); + } + } catch (error) { + log.error('Error saving or queueing bundle', error); } }, ); @@ -290,14 +301,18 @@ const bundlesUnbundledCounter = new promClient.Counter({ }); eventEmitter.on(events.ANS104_UNBUNDLE_COMPLETE, async (bundleEvent: any) => { - bundlesUnbundledCounter.inc({ bundle_format: 'ans-104' }); - db.saveBundle({ - id: bundleEvent.parentId, - format: 'ans-104', - dataItemCount: bundleEvent.itemCount, - matchedDataItemCount: bundleEvent.matchedItemCount, - unbundledAt: currentUnixTimestamp(), - }); + try { + bundlesUnbundledCounter.inc({ bundle_format: 'ans-104' }); + db.saveBundle({ + id: bundleEvent.parentId, + format: 'ans-104', + dataItemCount: bundleEvent.itemCount, + matchedDataItemCount: bundleEvent.matchedItemCount, + unbundledAt: currentUnixTimestamp(), + }); + } catch (error) { + log.error('Error saving unbundle completion', error); + } }); const dataItemIndexer = new DataItemIndexer({