diff --git a/src/metrics.ts b/src/metrics.ts index 2497225b..d4047446 100644 --- a/src/metrics.ts +++ b/src/metrics.ts @@ -53,3 +53,46 @@ export const methodDurationSummary = new promClient.Summary({ help: 'Count of failed Arweave peer info requests', labelNames: ['worker', 'role', 'method'], }); + +// +// Block importer metrics +// +export const blockImporterRunningGauge = new promClient.Gauge({ + name: 'block_importer_running', + help: 'Depth of the last observed chain fork', +}); + +export const forksCounter = new promClient.Counter({ + name: 'forks_total', + help: 'Count of chain forks observed', +}); + +export const lastForkDepthGauge = new promClient.Gauge({ + name: 'last_fork_depth', + help: 'Depth of the last observed chain fork', +}); + +export const blocksImportedCounter = new promClient.Counter({ + name: 'blocks_imported_total', + help: 'Count of blocks imported', +}); + +export const transactionsImportedCounter = new promClient.Counter({ + name: 'block_transactions_imported_total', + help: 'Count of transactions imported', +}); + +export const missingTransactionsCounter = new promClient.Counter({ + name: 'missing_block_transactions_total', + help: 'Count of block transactions that could not be immediately fetched', +}); + +export const blockImportErrorsCounter = new promClient.Counter({ + name: 'block_import_errors_total', + help: 'Count of block import errors', +}); + +export const lastHeightImported = new promClient.Gauge({ + name: 'last_height_imported', + help: 'Height of the last block imported', +}); diff --git a/src/system.ts b/src/system.ts index b25c3fcc..e7aa7873 100644 --- a/src/system.ts +++ b/src/system.ts @@ -107,8 +107,6 @@ const eventEmitter = new EventEmitter(); export const blockImporter = new BlockImporter({ log, - metricsRegistry: promClient.register, - errorsCounter: metrics.errorsCounter, chainSource: arweaveClient, chainIndex, eventEmitter, diff --git a/src/workers/block-importer.ts b/src/workers/block-importer.ts index d6756355..afc0e3c8 100644 --- a/src/workers/block-importer.ts +++ b/src/workers/block-importer.ts @@ -16,12 +16,12 @@ * along with this program. If not, see . */ import * as EventEmitter from 'node:events'; -import * as promClient from 'prom-client'; import { default as wait } from 'wait'; import * as winston from 'winston'; import { MAX_FORK_DEPTH } from '../arweave/constants.js'; import * as events from '../events.js'; +import * as metrics from '../metrics.js'; import { ChainIndex, ChainSource, @@ -50,21 +50,8 @@ export class BlockImporter { private startedAt = 0; private transactionsImported = 0; - // Metrics - private errorsCounter: promClient.Counter; - private blockImporterRunningGauge: promClient.Gauge; - private forksCounter: promClient.Counter; - private lastForkDepthGauge: promClient.Gauge; - private blocksImportedCounter: promClient.Counter; - private transactionsImportedCounter: promClient.Counter; - private missingTransactionsCounter: promClient.Counter; - private blockImportErrorsCounter: promClient.Counter; - private lastHeightImported: promClient.Gauge; - constructor({ log, - metricsRegistry, - errorsCounter, chainSource, chainIndex, eventEmitter, @@ -73,8 +60,6 @@ export class BlockImporter { heightPollingIntervalMs = DEFAULT_HEIGHT_POLLING_INTERVAL_MS, }: { log: winston.Logger; - metricsRegistry: promClient.Registry; - errorsCounter: promClient.Counter; chainSource: ChainSource; chainIndex: ChainIndex; eventEmitter: EventEmitter; @@ -94,57 +79,6 @@ export class BlockImporter { this.shouldRun = false; this.startHeight = startHeight; this.stopHeight = stopHeight; - - // Metrics - this.errorsCounter = errorsCounter; - - this.blockImporterRunningGauge = new promClient.Gauge({ - name: 'block_importer_running', - help: 'Depth of the last observed chain fork', - }); - metricsRegistry.registerMetric(this.blockImporterRunningGauge); - - this.forksCounter = new promClient.Counter({ - name: 'forks_total', - help: 'Count of chain forks observed', - }); - metricsRegistry.registerMetric(this.forksCounter); - - this.lastForkDepthGauge = new promClient.Gauge({ - name: 'last_fork_depth', - help: 'Depth of the last observed chain fork', - }); - metricsRegistry.registerMetric(this.lastForkDepthGauge); - - this.blocksImportedCounter = new promClient.Counter({ - name: 'blocks_imported_total', - help: 'Count of blocks imported', - }); - metricsRegistry.registerMetric(this.blocksImportedCounter); - - this.transactionsImportedCounter = new promClient.Counter({ - name: 'block_transactions_imported_total', - help: 'Count of transactions imported', - }); - metricsRegistry.registerMetric(this.transactionsImportedCounter); - - this.missingTransactionsCounter = new promClient.Counter({ - name: 'missing_block_transactions_total', - help: 'Count of block transactions that could not be immediately fetched', - }); - metricsRegistry.registerMetric(this.missingTransactionsCounter); - - this.blockImportErrorsCounter = new promClient.Counter({ - name: 'block_import_errors_total', - help: 'Count of block import errors', - }); - metricsRegistry.registerMetric(this.blockImportErrorsCounter); - - this.lastHeightImported = new promClient.Gauge({ - name: 'last_height_imported', - help: 'Height of the last block imported', - }); - metricsRegistry.registerMetric(this.lastHeightImported); } public async getBlockOrForkedBlock( @@ -191,7 +125,7 @@ export class BlockImporter { } else if (block.previous_block !== previousDbBlockHash) { // Only increment the fork counter once per fork if (forkDepth === 0) { - this.forksCounter.inc(); + metrics.forksCounter.inc(); } // If there is a fork, rewind the index to the fork point this.log.info( @@ -212,7 +146,7 @@ export class BlockImporter { // Record fork count and depth metrics if (forkDepth > 0) { - this.lastForkDepthGauge.set(forkDepth); + metrics.lastForkDepthGauge.set(forkDepth); } return { block, txs, missingTxIds }; @@ -243,11 +177,13 @@ export class BlockImporter { }); // Record import metrics - this.blocksImportedCounter.inc(); - this.transactionsImportedCounter.inc(txs.length); + metrics.blocksImportedCounter.inc(); + metrics.transactionsImportedCounter.inc(txs.length); + metrics.missingTransactionsCounter.inc(missingTxIds.length); + metrics.lastHeightImported.set(block.height); + + // Update internal state this.transactionsImported += txs.length; - this.missingTransactionsCounter.inc(missingTxIds.length); - this.lastHeightImported.set(block.height); this.log.info(`Block imported`, { height: block.height, @@ -279,7 +215,7 @@ export class BlockImporter { public async start() { this.shouldRun = true; this.startedAt = Date.now(); - this.blockImporterRunningGauge.set(1); + metrics.blockImporterRunningGauge.set(1); let nextHeight = -1; // Run until stop is called or an unrecoverable error occurs @@ -304,8 +240,8 @@ export class BlockImporter { await this.importBlock(nextHeight); } catch (error) { this.log.error(`Error importing block at height ${nextHeight}`, error); - this.errorsCounter.inc(); - this.blockImportErrorsCounter.inc(); + metrics.errorsCounter.inc(); + metrics.blockImportErrorsCounter.inc(); await wait(BLOCK_ERROR_RETRY_INTERVAL_MS); } } @@ -313,6 +249,6 @@ export class BlockImporter { public async stop() { this.shouldRun = false; - this.blockImporterRunningGauge.set(0); + metrics.blockImporterRunningGauge.set(0); } }