Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(metrics): Add queue depth metrics #198

Merged
merged 1 commit into from
Sep 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/arweave/composite-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -781,4 +781,8 @@ export class ArweaveCompositeClient
results,
};
}

queueDepth(): number {
return this.trustedNodeRequestQueue.length();
}
}
23 changes: 23 additions & 0 deletions src/metrics.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
import * as promClient from 'prom-client';
import { Gauge } from 'prom-client';

/* eslint-disable */
// @ts-ignore
import PrometheusMetrics from 'opossum-prometheus';
Expand Down Expand Up @@ -197,3 +199,24 @@ export const getDataStreamSuccessesTotal = new promClient.Counter({
help: 'Count of data stream successes',
labelNames: ['class'],
});

// Queue length metrics

const queues: { [key: string]: { length: () => number } } = {};
export function registerQueueLengthGauge(
name: string,
queue: { length: () => number },
) {
queues[name] = queue;
}
Comment on lines +206 to +211
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Function registerQueueLengthGauge implemented correctly, consider handling duplicates.

The implementation of registerQueueLengthGauge is correct and effectively registers queues for monitoring. However, consider adding error handling or a check to prevent potential issues if a queue with the same name is registered multiple times.

Consider implementing a mechanism to handle duplicate queue names to prevent unintended overwrites. This could be a simple check before adding the queue to the queues object.


export const queueLengthGauge = new Gauge({
name: 'queue_length',
help: 'Current length of queues',
labelNames: ['queue_name'],
collect() {
Object.entries(queues).forEach(([queueName, queue]) => {
this.set({ queue_name: queueName }, queue.length());
});
},
});
30 changes: 30 additions & 0 deletions src/system.ts
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,9 @@ export const arweaveClient = new ArweaveCompositeClient({
failureRate: config.SIMULATED_REQUEST_FAILURE_RATE,
}),
});
metrics.registerQueueLengthGauge('arweaveClientRequests', {
length: () => arweaveClient.queueDepth(),
});

export const db = new StandaloneSqliteDatabase({
log,
Expand Down Expand Up @@ -255,6 +258,9 @@ export const txFetcher = new TransactionFetcher({
chainSource: arweaveClient,
eventEmitter,
});
metrics.registerQueueLengthGauge('txFetcher', {
length: () => txFetcher.queueDepth(),
});

// Async fetch block TXs that failed sync fetch
eventEmitter.on(events.BLOCK_TX_FETCH_FAILED, ({ id: txId }) => {
Expand All @@ -266,6 +272,9 @@ const txImporter = new TransactionImporter({
chainIndex,
eventEmitter,
});
metrics.registerQueueLengthGauge('txImporter', {
length: () => txImporter.queueDepth(),
});

// Queue fetched TXs to
eventEmitter.addListener(events.TX_FETCHED, (tx: PartialJsonTransaction) => {
Expand All @@ -283,6 +292,9 @@ const txOffsetImporter = new TransactionOffsetImporter({
chainSource: arweaveClient,
chainOffsetIndex,
});
metrics.registerQueueLengthGauge('txOffsetImporter', {
length: () => txOffsetImporter.queueDepth(),
});

export const txOffsetRepairWorker = new TransactionOffsetRepairWorker({
log,
Expand Down Expand Up @@ -364,6 +376,9 @@ const dataContentAttributeImporter = new DataContentAttributeImporter({
log,
contiguousDataIndex: contiguousDataIndex,
});
metrics.registerQueueLengthGauge('dataContentAttributeImporter', {
length: () => dataContentAttributeImporter.queueDepth(),
});

export const contiguousDataSource = new ReadThroughDataCache({
log,
Expand All @@ -381,13 +396,19 @@ export const dataItemIndexer = new DataItemIndexer({
eventEmitter,
indexWriter: dataItemIndexWriter,
});
metrics.registerQueueLengthGauge('dataItemIndexer', {
length: () => dataItemIndexer.queueDepth(),
});

const ans104DataIndexer = new Ans104DataIndexer({
log,
eventEmitter,
indexWriter: nestedDataIndexWriter,
contiguousDataIndex,
});
metrics.registerQueueLengthGauge('ans104DataIndexer', {
length: () => ans104DataIndexer.queueDepth(),
});

const shouldUnbundleDataItems = () =>
ans104DataIndexer.queueDepth() < config.MAX_DATA_ITEM_QUEUE_SIZE &&
Expand All @@ -402,13 +423,19 @@ const ans104Unbundler = new Ans104Unbundler({
workerCount: config.ANS104_UNBUNDLE_WORKERS,
shouldUnbundle: shouldUnbundleDataItems,
});
metrics.registerQueueLengthGauge('ans104Unbundler', {
length: () => ans104Unbundler.queueDepth(),
});

const bundleDataImporter = new BundleDataImporter({
log,
contiguousDataSource,
ans104Unbundler,
workerCount: config.ANS104_DOWNLOAD_WORKERS,
});
metrics.registerQueueLengthGauge('bundleDataImporter', {
length: () => bundleDataImporter.queueDepth(),
});

async function queueBundle(
item: NormalizedDataItem | PartialJsonTransaction,
Expand Down Expand Up @@ -540,6 +567,9 @@ const webhookEmitter = new WebhookEmitter({
blockFilter: config.WEBHOOK_BLOCK_FILTER,
log,
});
metrics.registerQueueLengthGauge('webhookEmitter', {
length: () => webhookEmitter.queueDepth(),
});

export const mempoolWatcher = config.ENABLE_MEMPOOL_WATCHER
? new MempoolWatcher({
Expand Down
4 changes: 4 additions & 0 deletions src/workers/bundle-data-importer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -134,4 +134,8 @@ export class BundleDataImporter {
await this.queue.drained();
log.debug('Stopped successfully.');
}

queueDepth(): number {
return this.queue.length();
}
Comment on lines +138 to +140
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Method queueDepth correctly implemented.

The implementation of queueDepth method is correct and adheres to the expected functionality of returning the current queue length. It's a straightforward and effective way to monitor the queue size.

Consider adding unit tests to verify the behavior of the queueDepth method. Would you like assistance in creating these tests?

}
4 changes: 4 additions & 0 deletions src/workers/data-content-attribute-importer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -89,4 +89,8 @@ export class DataContentAttributeImporter {
await this.queue.drained();
log.debug('Stopped successfully.');
}

queueDepth(): number {
return this.queue.length();
}
}
4 changes: 4 additions & 0 deletions src/workers/transaction-fetcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -121,4 +121,8 @@ export class TransactionFetcher {
await this.queue.drained();
log.debug('Stopped successfully.');
}

queueDepth(): number {
return this.queue.length();
}
Comment on lines +125 to +127
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Method queueDepth correctly implemented.

The implementation of queueDepth method is correct and adheres to the expected functionality of returning the current queue length. It's a straightforward and effective way to monitor the queue size.

Consider adding unit tests to verify the behavior of the queueDepth method. Would you like assistance in creating these tests?

}
4 changes: 4 additions & 0 deletions src/workers/transaction-importer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -78,4 +78,8 @@ export class TransactionImporter {
await this.queue.drained();
log.debug('Stopped successfully.');
}

queueDepth(): number {
return this.queue.length();
}
}
4 changes: 4 additions & 0 deletions src/workers/transaction-offset-importer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -96,4 +96,8 @@ export class TransactionOffsetImporter {
await this.queue.drained();
log.debug('Stopped successfully.');
}

queueDepth(): number {
return this.queue.length();
}
}
4 changes: 4 additions & 0 deletions src/workers/webhook-emitter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -244,4 +244,8 @@ export class WebhookEmitter {
this.log.error('Unexpected error while emitting webhook:', error);
}
}

queueDepth(): number {
return this.emissionQueue.length();
}
}