Skip to content

Commit

Permalink
feat: include batch details in anchor results log
Browse files Browse the repository at this point in the history
  • Loading branch information
smrz2001 committed Jun 18, 2024
1 parent 3b5040a commit 3ffbb18
Showing 1 changed file with 54 additions and 27 deletions.
81 changes: 54 additions & 27 deletions src/services/anchor-service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,12 @@ type RequestGroups = {
}

type AnchorSummary = {
// id of the batch
batchId: string
// number of requests in the batch
batchRequests: number
// number of requests after filtering for REPLACED requests
filteredBatchRequests: number
// all requests included in this batch
acceptedRequestsCount: number
// number of accepted requests that were anchored in a previous batch and were not included in the current batch.
Expand All @@ -75,6 +81,9 @@ type AnchorSummary = {
}

const logAnchorSummary = async (
batchId: string,
requests: Request[],
filteredRequests: Request[],
requestRepository: RequestRepository,
groupedRequests: RequestGroups,
candidates: Candidate[],
Expand All @@ -84,6 +93,9 @@ const logAnchorSummary = async (

const anchorSummary: AnchorSummary = Object.assign(
{
batchId,
batchRequests: requests.length,
filteredBatchRequests: filteredRequests.length,
acceptedRequestsCount: groupedRequests.acceptedRequests.length,
alreadyAnchoredRequestsCount: groupedRequests.alreadyAnchoredRequests.length,
anchoredRequestsCount: 0,
Expand Down Expand Up @@ -224,16 +236,12 @@ export class AnchorService {
)
const requests = await this.requestRepository.findByIds(batchMessage.data.rids)

const requestsNotReplaced = requests.filter(
(request) => request.status !== RequestStatus.REPLACED
)

if (abortOptions?.signal?.aborted) {
throw new Error('User aborted before the batch could begin the anchoring process')
}

logger.imp('Anchoring requests')
await this._anchorRequests(requestsNotReplaced)
await this._anchorRequests(requests, batchMessage.data.bid)

// Sleep 5 seconds before exiting the process to give time for the logs to flush.
await Utils.delay(5000)
Expand Down Expand Up @@ -262,46 +270,65 @@ export class AnchorService {
await Utils.delay(5000)
}

private async _anchorRequests(requests: Request[]): Promise<void> {
if (requests.length === 0) {
private async _anchorRequests(requests: Request[], batchId: string = 'unknown'): Promise<void> {
// Only anchor requests that have not been marked REPLACED by the time the batch is processed
const filteredRequests = requests.filter((request) => request.status !== RequestStatus.REPLACED)

if (filteredRequests.length === 0) {
logger.imp('No pending CID requests found. Skipping anchor.')
return
}

const [candidates, groupedRequests] = await this._findCandidates(requests, this.maxStreamLimit)
const [candidates, groupedRequests] = await this._findCandidates(
filteredRequests,
this.maxStreamLimit
)

if (candidates.length === 0) {
logger.imp('No candidates found. Skipping anchor.')
await logAnchorSummary(this.requestRepository, groupedRequests, candidates)
await logAnchorSummary(
batchId,
requests,
filteredRequests,
this.requestRepository,
groupedRequests,
candidates
)
return
}

try {
const results = await this._anchorCandidates(candidates)
await logAnchorSummary(this.requestRepository, groupedRequests, candidates, results)
await logAnchorSummary(
batchId,
requests,
filteredRequests,
this.requestRepository,
groupedRequests,
candidates,
results
)
return
} catch (err) {
const acceptedRequests = candidates.map((candidate) => candidate.request).flat()

// If we are using queued batches, the queue will retry the entire batch. Status updates are not needed for retry.
if (!this.useQueueBatches) {
logger.warn(
`Updating PROCESSING requests to PENDING so they are retried in the next batch because an error occurred while creating the anchors: ${err}`
)
await this.requestRepository.updateRequests({ status: RS.PENDING }, acceptedRequests)

Metrics.count(METRIC_NAMES.REVERT_TO_PENDING, acceptedRequests.length)
}

// groupRequests.failedRequests does not include all the newly failed requests so we recount here
const failedRequests = []
await logAnchorSummary(this.requestRepository, groupedRequests, candidates, {
failedRequestsCount: failedRequests.length,
// NOTE: We will retry all of the above requests that were updated back to PENDING.
// We also may retry all failed requests other than requests rejected from conflict resolution.
// A failed request will not be retried if it has expired when the next anchor runs.
canRetryCount: failedRequests.length + acceptedRequests.length,
})
await logAnchorSummary(
batchId,
requests,
filteredRequests,
this.requestRepository,
groupedRequests,
candidates,
{
failedRequestsCount: failedRequests.length,
// NOTE: We will retry all of the above requests that were updated back to PENDING.
// We also may retry all failed requests other than requests rejected from conflict resolution.
// A failed request will not be retried if it has expired when the next anchor runs.
canRetryCount: failedRequests.length + acceptedRequests.length,
}
)

throw err
}
Expand Down

0 comments on commit 3ffbb18

Please sign in to comment.