diff --git a/src/services/anchor-service.ts b/src/services/anchor-service.ts index 4e717295..fe61efb1 100644 --- a/src/services/anchor-service.ts +++ b/src/services/anchor-service.ts @@ -32,7 +32,7 @@ import { } from '@ceramicnetwork/anchor-utils' import { Candidate } from './candidate.js' import { MerkleCarFactory, type IMerkleTree, type MerkleCAR } from '../merkle/merkle-car-factory.js' -import { IQueueConsumerService } from './queue/queue-service.type.js' +import { IQueueConsumerService, IQueueMessage } from './queue/queue-service.type.js' import { AnchorBatchQMessage } from '../models/queue-message.js' import { create as createMultihash } from 'multiformats/hashes/digest' import { CAR } from 'cartonne' @@ -50,6 +50,12 @@ type RequestGroups = { } type AnchorSummary = { + // id of the batch + batchId: string + // number of requests in the batch + batchRequests: number + // number of requests after filtering for REPLACED requests + filteredBatchRequests: number // all requests included in this batch acceptedRequestsCount: number // number of accepted requests that were anchored in a previous batch and were not included in the current batch. @@ -75,6 +81,9 @@ type AnchorSummary = { } const logAnchorSummary = async ( + batchId: string, + batchRequests: Request[], + filteredBatchRequests: Request[], requestRepository: RequestRepository, groupedRequests: RequestGroups, candidates: Candidate[], @@ -84,6 +93,9 @@ const logAnchorSummary = async ( const anchorSummary: AnchorSummary = Object.assign( { + batchId, + batchRequests: batchRequests.length, + filteredBatchRequests: filteredBatchRequests.length, acceptedRequestsCount: groupedRequests.acceptedRequests.length, alreadyAnchoredRequestsCount: groupedRequests.alreadyAnchoredRequests.length, anchoredRequestsCount: 0, @@ -174,31 +186,11 @@ export class AnchorService { this.merkleCarFactory = new MerkleCarFactory(logger, this.merkleDepthLimit) } - /** - * Creates anchors for pending client requests - */ - // TODO: Remove for CAS V2 as we won't need to move PENDING requests to ready. Switch to using anchorReadyRequests. - async anchorRequests(abortOptions?: AbortOptions): Promise { - if (this.useQueueBatches) { - return this.anchorNextQueuedBatch(abortOptions) - } else { - const readyRequestsCount = await this.requestRepository.countByStatus(RS.READY) - - if (readyRequestsCount === 0) { - // Pull in twice as many streams as we want to anchor, since some of those streams may fail to load. - await this.requestRepository.findAndMarkReady(this.maxStreamLimit * 2, this.minStreamLimit) - } - - await this.anchorReadyRequests() - return true - } - } - /** * Retrieves a batch of requests and creates anchors for them. * Return true if we anchoed a batch. Returns false if there was no batch to anchor therefore no anchor was completed */ - async anchorNextQueuedBatch(abortOptions?: AbortOptions): Promise { + async anchorRequests(abortOptions?: AbortOptions): Promise { if (abortOptions?.signal?.aborted) { throw new Error('User aborted before the next batch has been retrieved') } @@ -222,18 +214,12 @@ export class AnchorService { logger.imp( `Anchoring ${batchMessage.data.rids.length} requests from batch ${batchMessage.data.bid}` ) - const requests = await this.requestRepository.findByIds(batchMessage.data.rids) - - const requestsNotReplaced = requests.filter( - (request) => request.status !== RequestStatus.REPLACED - ) - if (abortOptions?.signal?.aborted) { throw new Error('User aborted before the batch could begin the anchoring process') } logger.imp('Anchoring requests') - await this._anchorRequests(requestsNotReplaced) + await this._anchorRequests(batchMessage) // Sleep 5 seconds before exiting the process to give time for the logs to flush. await Utils.delay(5000) @@ -250,58 +236,67 @@ export class AnchorService { } } - /** - * Creates anchors for client requests that have been marked as READY - */ - async anchorReadyRequests(): Promise { - logger.imp('Anchoring ready requests...') - const requests = await this.requestRepository.batchProcessing(this.maxStreamLimit) - await this._anchorRequests(requests) + private async _anchorRequests(batchMessage: IQueueMessage): Promise { + const batchRequests = await this.requestRepository.findByIds(batchMessage.data.rids) - // Sleep 5 seconds before exiting the process to give time for the logs to flush. - await Utils.delay(5000) - } + // Only anchor requests that have not been marked REPLACED by the time the batch is processed + const filteredBatchRequests = batchRequests.filter((request) => request.status !== RequestStatus.REPLACED) - private async _anchorRequests(requests: Request[]): Promise { - if (requests.length === 0) { + if (filteredBatchRequests.length === 0) { logger.imp('No pending CID requests found. Skipping anchor.') return } - const [candidates, groupedRequests] = await this._findCandidates(requests, this.maxStreamLimit) + const [candidates, groupedRequests] = await this._findCandidates( + filteredBatchRequests, + this.maxStreamLimit + ) if (candidates.length === 0) { logger.imp('No candidates found. Skipping anchor.') - await logAnchorSummary(this.requestRepository, groupedRequests, candidates) + await logAnchorSummary( + batchMessage.data.bid, + batchRequests, + filteredBatchRequests, + this.requestRepository, + groupedRequests, + candidates + ) return } try { const results = await this._anchorCandidates(candidates) - await logAnchorSummary(this.requestRepository, groupedRequests, candidates, results) + await logAnchorSummary( + batchMessage.data.bid, + batchRequests, + filteredBatchRequests, + this.requestRepository, + groupedRequests, + candidates, + results + ) return } catch (err) { const acceptedRequests = candidates.map((candidate) => candidate.request).flat() - // If we are using queued batches, the queue will retry the entire batch. Status updates are not needed for retry. - if (!this.useQueueBatches) { - logger.warn( - `Updating PROCESSING requests to PENDING so they are retried in the next batch because an error occurred while creating the anchors: ${err}` - ) - await this.requestRepository.updateRequests({ status: RS.PENDING }, acceptedRequests) - - Metrics.count(METRIC_NAMES.REVERT_TO_PENDING, acceptedRequests.length) - } - // groupRequests.failedRequests does not include all the newly failed requests so we recount here const failedRequests = [] - await logAnchorSummary(this.requestRepository, groupedRequests, candidates, { - failedRequestsCount: failedRequests.length, - // NOTE: We will retry all of the above requests that were updated back to PENDING. - // We also may retry all failed requests other than requests rejected from conflict resolution. - // A failed request will not be retried if it has expired when the next anchor runs. - canRetryCount: failedRequests.length + acceptedRequests.length, - }) + await logAnchorSummary( + batchMessage.data.bid, + batchRequests, + filteredBatchRequests, + this.requestRepository, + groupedRequests, + candidates, + { + failedRequestsCount: failedRequests.length, + // NOTE: We will retry all of the above requests that were updated back to PENDING. + // We also may retry all failed requests other than requests rejected from conflict resolution. + // A failed request will not be retried if it has expired when the next anchor runs. + canRetryCount: failedRequests.length + acceptedRequests.length, + } + ) throw err }