Skip to content

Commit

Permalink
feat: include batch details in anchor results log (#1229)
Browse files Browse the repository at this point in the history
  • Loading branch information
smrz2001 authored Jun 19, 2024
1 parent 3b5040a commit 9356f3a
Showing 1 changed file with 41 additions and 17 deletions.
58 changes: 41 additions & 17 deletions src/services/anchor-service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,11 @@ type RequestGroups = {
}

type AnchorSummary = {
// id of the batch
batchId: string
// all requests included in this batch
requests: number
// requests included in this batch after filtering out REPLACED requests
acceptedRequestsCount: number
// number of accepted requests that were anchored in a previous batch and were not included in the current batch.
alreadyAnchoredRequestsCount: number
Expand All @@ -75,6 +79,8 @@ type AnchorSummary = {
}

const logAnchorSummary = async (
batchId: string,
requests: Request[],
requestRepository: RequestRepository,
groupedRequests: RequestGroups,
candidates: Candidate[],
Expand All @@ -84,6 +90,8 @@ const logAnchorSummary = async (

const anchorSummary: AnchorSummary = Object.assign(
{
batchId,
requests: requests.length,
acceptedRequestsCount: groupedRequests.acceptedRequests.length,
alreadyAnchoredRequestsCount: groupedRequests.alreadyAnchoredRequests.length,
anchoredRequestsCount: 0,
Expand Down Expand Up @@ -224,16 +232,12 @@ export class AnchorService {
)
const requests = await this.requestRepository.findByIds(batchMessage.data.rids)

const requestsNotReplaced = requests.filter(
(request) => request.status !== RequestStatus.REPLACED
)

if (abortOptions?.signal?.aborted) {
throw new Error('User aborted before the batch could begin the anchoring process')
}

logger.imp('Anchoring requests')
await this._anchorRequests(requestsNotReplaced)
await this._anchorRequests(requests, batchMessage.data.bid)

// Sleep 5 seconds before exiting the process to give time for the logs to flush.
await Utils.delay(5000)
Expand Down Expand Up @@ -262,23 +266,36 @@ export class AnchorService {
await Utils.delay(5000)
}

private async _anchorRequests(requests: Request[]): Promise<void> {
if (requests.length === 0) {
private async _anchorRequests(requests: Request[], batchId: string = 'unknown'): Promise<void> {
// Only anchor requests that have not been marked REPLACED by the time the batch is processed
const filteredRequests = requests.filter((request) => request.status !== RequestStatus.REPLACED)

if (filteredRequests.length === 0) {
logger.imp('No pending CID requests found. Skipping anchor.')
return
}

const [candidates, groupedRequests] = await this._findCandidates(requests, this.maxStreamLimit)
const [candidates, groupedRequests] = await this._findCandidates(
filteredRequests,
this.maxStreamLimit
)

if (candidates.length === 0) {
logger.imp('No candidates found. Skipping anchor.')
await logAnchorSummary(this.requestRepository, groupedRequests, candidates)
await logAnchorSummary(batchId, requests, this.requestRepository, groupedRequests, candidates)
return
}

try {
const results = await this._anchorCandidates(candidates)
await logAnchorSummary(this.requestRepository, groupedRequests, candidates, results)
await logAnchorSummary(
batchId,
requests,
this.requestRepository,
groupedRequests,
candidates,
results
)
return
} catch (err) {
const acceptedRequests = candidates.map((candidate) => candidate.request).flat()
Expand All @@ -295,13 +312,20 @@ export class AnchorService {

// groupRequests.failedRequests does not include all the newly failed requests so we recount here
const failedRequests = []
await logAnchorSummary(this.requestRepository, groupedRequests, candidates, {
failedRequestsCount: failedRequests.length,
// NOTE: We will retry all of the above requests that were updated back to PENDING.
// We also may retry all failed requests other than requests rejected from conflict resolution.
// A failed request will not be retried if it has expired when the next anchor runs.
canRetryCount: failedRequests.length + acceptedRequests.length,
})
await logAnchorSummary(
batchId,
requests,
this.requestRepository,
groupedRequests,
candidates,
{
failedRequestsCount: failedRequests.length,
// NOTE: We will retry all of the above requests that were updated back to PENDING.
// We also may retry all failed requests other than requests rejected from conflict resolution.
// A failed request will not be retried if it has expired when the next anchor runs.
canRetryCount: failedRequests.length + acceptedRequests.length,
}
)

throw err
}
Expand Down

0 comments on commit 9356f3a

Please sign in to comment.