From 3ffbb185835892eaa4a7627e888d9c9c5870c07b Mon Sep 17 00:00:00 2001 From: Mohsin Zaidi <2236875+smrz2001@users.noreply.github.com> Date: Tue, 18 Jun 2024 17:17:04 -0400 Subject: [PATCH] feat: include batch details in anchor results log --- src/services/anchor-service.ts | 81 ++++++++++++++++++++++------------ 1 file changed, 54 insertions(+), 27 deletions(-) diff --git a/src/services/anchor-service.ts b/src/services/anchor-service.ts index 4e717295..3a073514 100644 --- a/src/services/anchor-service.ts +++ b/src/services/anchor-service.ts @@ -50,6 +50,12 @@ type RequestGroups = { } type AnchorSummary = { + // id of the batch + batchId: string + // number of requests in the batch + batchRequests: number + // number of requests after filtering for REPLACED requests + filteredBatchRequests: number // all requests included in this batch acceptedRequestsCount: number // number of accepted requests that were anchored in a previous batch and were not included in the current batch. @@ -75,6 +81,9 @@ type AnchorSummary = { } const logAnchorSummary = async ( + batchId: string, + requests: Request[], + filteredRequests: Request[], requestRepository: RequestRepository, groupedRequests: RequestGroups, candidates: Candidate[], @@ -84,6 +93,9 @@ const logAnchorSummary = async ( const anchorSummary: AnchorSummary = Object.assign( { + batchId, + batchRequests: requests.length, + filteredBatchRequests: filteredRequests.length, acceptedRequestsCount: groupedRequests.acceptedRequests.length, alreadyAnchoredRequestsCount: groupedRequests.alreadyAnchoredRequests.length, anchoredRequestsCount: 0, @@ -224,16 +236,12 @@ export class AnchorService { ) const requests = await this.requestRepository.findByIds(batchMessage.data.rids) - const requestsNotReplaced = requests.filter( - (request) => request.status !== RequestStatus.REPLACED - ) - if (abortOptions?.signal?.aborted) { throw new Error('User aborted before the batch could begin the anchoring process') } logger.imp('Anchoring requests') - await this._anchorRequests(requestsNotReplaced) + await this._anchorRequests(requests, batchMessage.data.bid) // Sleep 5 seconds before exiting the process to give time for the logs to flush. await Utils.delay(5000) @@ -262,46 +270,65 @@ export class AnchorService { await Utils.delay(5000) } - private async _anchorRequests(requests: Request[]): Promise { - if (requests.length === 0) { + private async _anchorRequests(requests: Request[], batchId: string = 'unknown'): Promise { + // Only anchor requests that have not been marked REPLACED by the time the batch is processed + const filteredRequests = requests.filter((request) => request.status !== RequestStatus.REPLACED) + + if (filteredRequests.length === 0) { logger.imp('No pending CID requests found. Skipping anchor.') return } - const [candidates, groupedRequests] = await this._findCandidates(requests, this.maxStreamLimit) + const [candidates, groupedRequests] = await this._findCandidates( + filteredRequests, + this.maxStreamLimit + ) if (candidates.length === 0) { logger.imp('No candidates found. Skipping anchor.') - await logAnchorSummary(this.requestRepository, groupedRequests, candidates) + await logAnchorSummary( + batchId, + requests, + filteredRequests, + this.requestRepository, + groupedRequests, + candidates + ) return } try { const results = await this._anchorCandidates(candidates) - await logAnchorSummary(this.requestRepository, groupedRequests, candidates, results) + await logAnchorSummary( + batchId, + requests, + filteredRequests, + this.requestRepository, + groupedRequests, + candidates, + results + ) return } catch (err) { const acceptedRequests = candidates.map((candidate) => candidate.request).flat() - // If we are using queued batches, the queue will retry the entire batch. Status updates are not needed for retry. - if (!this.useQueueBatches) { - logger.warn( - `Updating PROCESSING requests to PENDING so they are retried in the next batch because an error occurred while creating the anchors: ${err}` - ) - await this.requestRepository.updateRequests({ status: RS.PENDING }, acceptedRequests) - - Metrics.count(METRIC_NAMES.REVERT_TO_PENDING, acceptedRequests.length) - } - // groupRequests.failedRequests does not include all the newly failed requests so we recount here const failedRequests = [] - await logAnchorSummary(this.requestRepository, groupedRequests, candidates, { - failedRequestsCount: failedRequests.length, - // NOTE: We will retry all of the above requests that were updated back to PENDING. - // We also may retry all failed requests other than requests rejected from conflict resolution. - // A failed request will not be retried if it has expired when the next anchor runs. - canRetryCount: failedRequests.length + acceptedRequests.length, - }) + await logAnchorSummary( + batchId, + requests, + filteredRequests, + this.requestRepository, + groupedRequests, + candidates, + { + failedRequestsCount: failedRequests.length, + // NOTE: We will retry all of the above requests that were updated back to PENDING. + // We also may retry all failed requests other than requests rejected from conflict resolution. + // A failed request will not be retried if it has expired when the next anchor runs. + canRetryCount: failedRequests.length + acceptedRequests.length, + } + ) throw err }