Skip to content

Commit

Permalink
feat: include batch details in anchor results log
Browse files Browse the repository at this point in the history
  • Loading branch information
smrz2001 committed Jun 18, 2024
1 parent 3b5040a commit c85fed6
Showing 1 changed file with 56 additions and 61 deletions.
117 changes: 56 additions & 61 deletions src/services/anchor-service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import {
} from '@ceramicnetwork/anchor-utils'
import { Candidate } from './candidate.js'
import { MerkleCarFactory, type IMerkleTree, type MerkleCAR } from '../merkle/merkle-car-factory.js'
import { IQueueConsumerService } from './queue/queue-service.type.js'
import { IQueueConsumerService, IQueueMessage } from './queue/queue-service.type.js'
import { AnchorBatchQMessage } from '../models/queue-message.js'
import { create as createMultihash } from 'multiformats/hashes/digest'
import { CAR } from 'cartonne'
Expand All @@ -50,6 +50,12 @@ type RequestGroups = {
}

type AnchorSummary = {
// id of the batch
batchId: string
// number of requests in the batch
batchRequests: number
// number of requests after filtering for REPLACED requests
filteredBatchRequests: number
// all requests included in this batch
acceptedRequestsCount: number
// number of accepted requests that were anchored in a previous batch and were not included in the current batch.
Expand All @@ -75,6 +81,9 @@ type AnchorSummary = {
}

const logAnchorSummary = async (
batchId: string,
batchRequests: Request[],
filteredBatchRequests: Request[],
requestRepository: RequestRepository,
groupedRequests: RequestGroups,
candidates: Candidate[],
Expand All @@ -84,6 +93,9 @@ const logAnchorSummary = async (

const anchorSummary: AnchorSummary = Object.assign(
{
batchId,
batchRequests: batchRequests.length,
filteredBatchRequests: filteredBatchRequests.length,
acceptedRequestsCount: groupedRequests.acceptedRequests.length,
alreadyAnchoredRequestsCount: groupedRequests.alreadyAnchoredRequests.length,
anchoredRequestsCount: 0,
Expand Down Expand Up @@ -174,31 +186,11 @@ export class AnchorService {
this.merkleCarFactory = new MerkleCarFactory(logger, this.merkleDepthLimit)
}

/**
* Creates anchors for pending client requests
*/
// TODO: Remove for CAS V2 as we won't need to move PENDING requests to ready. Switch to using anchorReadyRequests.
async anchorRequests(abortOptions?: AbortOptions): Promise<boolean> {
if (this.useQueueBatches) {
return this.anchorNextQueuedBatch(abortOptions)
} else {
const readyRequestsCount = await this.requestRepository.countByStatus(RS.READY)

if (readyRequestsCount === 0) {
// Pull in twice as many streams as we want to anchor, since some of those streams may fail to load.
await this.requestRepository.findAndMarkReady(this.maxStreamLimit * 2, this.minStreamLimit)
}

await this.anchorReadyRequests()
return true
}
}

/**
* Retrieves a batch of requests and creates anchors for them.
   * Returns true if we anchored a batch. Returns false if there was no batch to anchor, so no anchor was completed.
*/
async anchorNextQueuedBatch(abortOptions?: AbortOptions): Promise<boolean> {
async anchorRequests(abortOptions?: AbortOptions): Promise<boolean> {
if (abortOptions?.signal?.aborted) {
throw new Error('User aborted before the next batch has been retrieved')
}
Expand All @@ -222,18 +214,12 @@ export class AnchorService {
logger.imp(
`Anchoring ${batchMessage.data.rids.length} requests from batch ${batchMessage.data.bid}`
)
const requests = await this.requestRepository.findByIds(batchMessage.data.rids)

const requestsNotReplaced = requests.filter(
(request) => request.status !== RequestStatus.REPLACED
)

if (abortOptions?.signal?.aborted) {
throw new Error('User aborted before the batch could begin the anchoring process')
}

logger.imp('Anchoring requests')
await this._anchorRequests(requestsNotReplaced)
await this._anchorRequests(batchMessage)

// Sleep 5 seconds before exiting the process to give time for the logs to flush.
await Utils.delay(5000)
Expand All @@ -250,58 +236,67 @@ export class AnchorService {
}
}

/**
* Creates anchors for client requests that have been marked as READY
*/
async anchorReadyRequests(): Promise<void> {
logger.imp('Anchoring ready requests...')
const requests = await this.requestRepository.batchProcessing(this.maxStreamLimit)
await this._anchorRequests(requests)
private async _anchorRequests(batchMessage: IQueueMessage<AnchorBatchQMessage>): Promise<void> {
const batchRequests = await this.requestRepository.findByIds(batchMessage.data.rids)

// Sleep 5 seconds before exiting the process to give time for the logs to flush.
await Utils.delay(5000)
}
// Only anchor requests that have not been marked REPLACED by the time the batch is processed
const filteredBatchRequests = batchRequests.filter((request) => request.status !== RequestStatus.REPLACED)

private async _anchorRequests(requests: Request[]): Promise<void> {
if (requests.length === 0) {
if (filteredBatchRequests.length === 0) {
logger.imp('No pending CID requests found. Skipping anchor.')
return
}

const [candidates, groupedRequests] = await this._findCandidates(requests, this.maxStreamLimit)
const [candidates, groupedRequests] = await this._findCandidates(
filteredBatchRequests,
this.maxStreamLimit
)

if (candidates.length === 0) {
logger.imp('No candidates found. Skipping anchor.')
await logAnchorSummary(this.requestRepository, groupedRequests, candidates)
await logAnchorSummary(
batchMessage.data.bid,
batchRequests,
filteredBatchRequests,
this.requestRepository,
groupedRequests,
candidates
)
return
}

try {
const results = await this._anchorCandidates(candidates)
await logAnchorSummary(this.requestRepository, groupedRequests, candidates, results)
await logAnchorSummary(
batchMessage.data.bid,
batchRequests,
filteredBatchRequests,
this.requestRepository,
groupedRequests,
candidates,
results
)
return
} catch (err) {
const acceptedRequests = candidates.map((candidate) => candidate.request).flat()

// If we are using queued batches, the queue will retry the entire batch. Status updates are not needed for retry.
if (!this.useQueueBatches) {
logger.warn(
`Updating PROCESSING requests to PENDING so they are retried in the next batch because an error occurred while creating the anchors: ${err}`
)
await this.requestRepository.updateRequests({ status: RS.PENDING }, acceptedRequests)

Metrics.count(METRIC_NAMES.REVERT_TO_PENDING, acceptedRequests.length)
}

      // groupedRequests.failedRequests does not include all the newly failed requests, so we recount here
const failedRequests = []
await logAnchorSummary(this.requestRepository, groupedRequests, candidates, {
failedRequestsCount: failedRequests.length,
// NOTE: We will retry all of the above requests that were updated back to PENDING.
// We also may retry all failed requests other than requests rejected from conflict resolution.
// A failed request will not be retried if it has expired when the next anchor runs.
canRetryCount: failedRequests.length + acceptedRequests.length,
})
await logAnchorSummary(
batchMessage.data.bid,
batchRequests,
filteredBatchRequests,
this.requestRepository,
groupedRequests,
candidates,
{
failedRequestsCount: failedRequests.length,
// NOTE: We will retry all of the above requests that were updated back to PENDING.
// We also may retry all failed requests other than requests rejected from conflict resolution.
// A failed request will not be retried if it has expired when the next anchor runs.
canRetryCount: failedRequests.length + acceptedRequests.length,
}
)

throw err
}
Expand Down

0 comments on commit c85fed6

Please sign in to comment.