Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: include batch details in anchor results log #1229

Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 41 additions & 17 deletions src/services/anchor-service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,11 @@ type RequestGroups = {
}

type AnchorSummary = {
// id of the batch
batchId: string
// all requests included in this batch
requests: number
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sorry I know this is already merged but there are a lot of fields in this summary that are some sort of count of requests. Just calling this requests I'm worried will be confusing to someone trying to understand what this field represents and how it's different than anchorCount, acceptedRequestsCount, etc. Can we rename this to something a bit more descriptive (and add the Count suffix for consistency with the other fields in this object)?

// requests included in this batch after filtering out REPLACED requests
acceptedRequestsCount: number
// number of accepted requests that were anchored in a previous batch and were not included in the current batch.
alreadyAnchoredRequestsCount: number
Expand All @@ -75,6 +79,8 @@ type AnchorSummary = {
}

const logAnchorSummary = async (
batchId: string,
requests: Request[],
requestRepository: RequestRepository,
groupedRequests: RequestGroups,
candidates: Candidate[],
Expand All @@ -84,6 +90,8 @@ const logAnchorSummary = async (

const anchorSummary: AnchorSummary = Object.assign(
{
batchId,
requests: requests.length,
acceptedRequestsCount: groupedRequests.acceptedRequests.length,
alreadyAnchoredRequestsCount: groupedRequests.alreadyAnchoredRequests.length,
anchoredRequestsCount: 0,
Expand Down Expand Up @@ -224,16 +232,12 @@ export class AnchorService {
)
const requests = await this.requestRepository.findByIds(batchMessage.data.rids)

const requestsNotReplaced = requests.filter(
(request) => request.status !== RequestStatus.REPLACED
)

if (abortOptions?.signal?.aborted) {
throw new Error('User aborted before the batch could begin the anchoring process')
}

logger.imp('Anchoring requests')
await this._anchorRequests(requestsNotReplaced)
await this._anchorRequests(requests, batchMessage.data.bid)

// Sleep 5 seconds before exiting the process to give time for the logs to flush.
await Utils.delay(5000)
Expand Down Expand Up @@ -262,23 +266,36 @@ export class AnchorService {
await Utils.delay(5000)
}

private async _anchorRequests(requests: Request[]): Promise<void> {
if (requests.length === 0) {
private async _anchorRequests(requests: Request[], batchId: string = 'unknown'): Promise<void> {
// Only anchor requests that have not been marked REPLACED by the time the batch is processed
const filteredRequests = requests.filter((request) => request.status !== RequestStatus.REPLACED)

if (filteredRequests.length === 0) {
logger.imp('No pending CID requests found. Skipping anchor.')
return
}

const [candidates, groupedRequests] = await this._findCandidates(requests, this.maxStreamLimit)
const [candidates, groupedRequests] = await this._findCandidates(
filteredRequests,
this.maxStreamLimit
)

if (candidates.length === 0) {
logger.imp('No candidates found. Skipping anchor.')
await logAnchorSummary(this.requestRepository, groupedRequests, candidates)
await logAnchorSummary(batchId, requests, this.requestRepository, groupedRequests, candidates)
return
}

try {
const results = await this._anchorCandidates(candidates)
await logAnchorSummary(this.requestRepository, groupedRequests, candidates, results)
await logAnchorSummary(
batchId,
requests,
this.requestRepository,
groupedRequests,
candidates,
results
)
return
} catch (err) {
const acceptedRequests = candidates.map((candidate) => candidate.request).flat()
Expand All @@ -295,13 +312,20 @@ export class AnchorService {

// groupRequests.failedRequests does not include all the newly failed requests so we recount here
const failedRequests = []
await logAnchorSummary(this.requestRepository, groupedRequests, candidates, {
failedRequestsCount: failedRequests.length,
// NOTE: We will retry all of the above requests that were updated back to PENDING.
// We also may retry all failed requests other than requests rejected from conflict resolution.
// A failed request will not be retried if it has expired when the next anchor runs.
canRetryCount: failedRequests.length + acceptedRequests.length,
})
await logAnchorSummary(
batchId,
requests,
this.requestRepository,
groupedRequests,
candidates,
{
failedRequestsCount: failedRequests.length,
// NOTE: We will retry all of the above requests that were updated back to PENDING.
// We also may retry all failed requests other than requests rejected from conflict resolution.
// A failed request will not be retried if it has expired when the next anchor runs.
canRetryCount: failedRequests.length + acceptedRequests.length,
}
)

throw err
}
Expand Down