diff --git a/config/default.json b/config/default.json index 6d562fc5..95b5787b 100644 --- a/config/default.json +++ b/config/default.json @@ -15,6 +15,7 @@ "schedulerIntervalMS": 300000, "schedulerStopAfterNoOp": false, "pubsubResponderWindowMs": 8035200000, + "alertOnLongAnchorMs": 1200000, "carStorage": { "mode": "inmemory", "s3BucketName": "myS3Bucket", diff --git a/config/env/dev.json b/config/env/dev.json index f0e9ed76..2557853f 100644 --- a/config/env/dev.json +++ b/config/env/dev.json @@ -15,6 +15,7 @@ "schedulerIntervalMS": "@@SCHEDULER_INTERVAL_MS", "schedulerStopAfterNoOp": "@@SCHEDULER_STOP_AFTER_NO_OP", "pubsubResponderWindowMs": "@@PUBSUB_RESPONDER_WINDOW_MS", + "alertOnLongAnchorMs": "@@ALERT_ON_LONG_ANCHOR_MS", "carStorage": { "mode": "@@MERKLE_CAR_STORAGE_MODE", "s3BucketName": "@@S3_BUCKET_NAME", diff --git a/config/env/prod.json b/config/env/prod.json index f0e9ed76..2557853f 100644 --- a/config/env/prod.json +++ b/config/env/prod.json @@ -15,6 +15,7 @@ "schedulerIntervalMS": "@@SCHEDULER_INTERVAL_MS", "schedulerStopAfterNoOp": "@@SCHEDULER_STOP_AFTER_NO_OP", "pubsubResponderWindowMs": "@@PUBSUB_RESPONDER_WINDOW_MS", + "alertOnLongAnchorMs": "@@ALERT_ON_LONG_ANCHOR_MS", "carStorage": { "mode": "@@MERKLE_CAR_STORAGE_MODE", "s3BucketName": "@@S3_BUCKET_NAME", diff --git a/src/services/anchor-service.ts b/src/services/anchor-service.ts index 4e717295..fa60c4de 100644 --- a/src/services/anchor-service.ts +++ b/src/services/anchor-service.ts @@ -136,6 +136,7 @@ export class AnchorService { private readonly maxStreamLimit: number private readonly minStreamLimit: number private readonly merkleCarFactory: MerkleCarFactory + private readonly alertOnLongAnchorMs: number static inject = [ 'blockchainService', @@ -167,6 +168,7 @@ export class AnchorService { this.merkleDepthLimit = config.merkleDepthLimit this.useSmartContractAnchors = config.useSmartContractAnchors this.useQueueBatches = Boolean(config.queue.sqsQueueUrl) + this.alertOnLongAnchorMs = Number(config.alertOnLongAnchorMs || 1200000) // default 20 minutes const minStreamCount = Number(config.minStreamCount) this.maxStreamLimit = this.merkleDepthLimit > 0 ? Math.pow(2, this.merkleDepthLimit) : 0 @@ -179,8 +181,18 @@ export class AnchorService { */ // TODO: Remove for CAS V2 as we won't need to move PENDING requests to ready. Switch to using anchorReadyRequests. async anchorRequests(abortOptions?: AbortOptions): Promise { + const timeout = setTimeout(() => { + Metrics.count(METRIC_NAMES.ANCHOR_TAKING_TOO_LONG, 1) + }, this.alertOnLongAnchorMs) + + abortOptions?.signal?.addEventListener('abort', () => { + clearTimeout(timeout) + }) + if (this.useQueueBatches) { - return this.anchorNextQueuedBatch(abortOptions) + const batchAnchored = await this.anchorNextQueuedBatch(abortOptions) + clearTimeout(timeout) + return batchAnchored } else { const readyRequestsCount = await this.requestRepository.countByStatus(RS.READY) @@ -190,6 +202,7 @@ export class AnchorService { } await this.anchorReadyRequests() + clearTimeout(timeout) return true } } diff --git a/src/settings.ts b/src/settings.ts index acb22c6d..f080aa33 100644 --- a/src/settings.ts +++ b/src/settings.ts @@ -29,6 +29,7 @@ export enum METRIC_NAMES { MERKLE_CAR_STORAGE_FAILURE_IPFS = 'merkle_car_storage_failure_ipfs', MERKLE_CAR_STORAGE_FAILURE_S3 = 'merkle_car_storage_failure_s3', WITNESS_CAR_STORAGE_FAILURE = 'witness_car_storage_failure', + ANCHOR_TAKING_TOO_LONG = 'anchor_taking_too_long', // Transaction repository MANY_ATTEMPTS_TO_ACQUIRE_MUTEX = 'many_attempts_to_acquire_mutex',