Skip to content

Commit

Permalink
Fix retries in the distribution flows: addScores, complete distribution
Browse files Browse the repository at this point in the history
  • Loading branch information
kanshi committed Sep 23, 2024
1 parent 0fac817 commit 573e780
Show file tree
Hide file tree
Showing 3 changed files with 98 additions and 56 deletions.
3 changes: 3 additions & 0 deletions src/distribution/dto/distribution-completion-data.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
import { Score } from "../interfaces/distribution"

export class DistributionCompletionData {
stamp: number
total: number
retries: number
processed: Score[]
}
118 changes: 66 additions & 52 deletions src/tasks/processors/distribution-queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -93,12 +93,13 @@ export class DistributionQueue extends WorkerHost {
})

this.tasks.distributionFlow.add(
TasksService.DISTRIBUTION_FLOW(
data.verified_at,
scores.length,
DistributionService.maxDistributionRetries,
scoreJobs,
),
TasksService.DISTRIBUTION_FLOW({
stamp: data.verified_at,
total: scores.length,
retries: DistributionService.maxDistributionRetries,
scoreJobs: scoreJobs,
processed: []
}),
)
return true
} else {
Expand Down Expand Up @@ -155,11 +156,15 @@ export class DistributionQueue extends WorkerHost {
},
TasksService.jobOpts
)
}

return {
...distributionCompletedResults,
...persistResult
this.logger.log(
`Failed persisting distribution summary [${job.data.stamp}] - ${persistResult}. Retries left ${job.data.retries}...`
)
return undefined
} else {
return {
...distributionCompletedResults,
...persistResult
}
}
} catch (err) {
this.logger.error(
Expand Down Expand Up @@ -206,11 +211,15 @@ export class DistributionQueue extends WorkerHost {
},
TasksService.jobOpts
)
}

return {
...job.data.distributionCompletedResults,
...persistResult
this.logger.log(
`Failed persisting distribution summary [${job.data.stamp}] - ${persistResult}. Retries left ${job.data.retriesLeft - 1}...`
)
return undefined
} else {
return {
...job.data.distributionCompletedResults,
...persistResult
}
}
} catch (err) {
this.logger.error(
Expand Down Expand Up @@ -276,7 +285,7 @@ export class DistributionQueue extends WorkerHost {
)

const data = job.data

if (!data) {
this.logger.error(
'Failed to complete distribution without data'
Expand All @@ -292,7 +301,7 @@ export class DistributionQueue extends WorkerHost {
if (data.retries > 0) {
this.startRecoveryDistribution(
data,
processedScores,
processedScores.concat(data.processed),
failedScores,
)
return undefined
Expand All @@ -312,25 +321,25 @@ export class DistributionQueue extends WorkerHost {
const result = await this.distribution.distribute(data.stamp)

if (!result && data.retries > 0) {
this.tasks.distributionQueue.add(
DistributionQueue.JOB_RETRY_COMPLETE_DISTRIBUTION,
{
this.tasks.distributionFlow.add(
TasksService.RETRY_COMPLETE_DISTRIBUTION_FLOW({
stamp: data.stamp,
total: data.total,
retries: data.retries - 1,
},
TasksService.jobOpts,
processed: processedScores.concat(data.processed),
})
)
}

return {
complete: result,
stamp: data.stamp,
scores: processedScores.map((score) => ({
ator_address: score.address,
fingerprint: score.fingerprint,
score: Number.parseInt(score.score),
})),
this.logger.warn(`Failed to complete distribution ${data.stamp} with ${result}. Retries left ${data.retries - 1}`)
return undefined
} else {
return {
complete: result,
stamp: data.stamp,
scores: processedScores.concat(data.processed).map((score) => ({
ator_address: score.address,
fingerprint: score.fingerprint,
score: Number.parseInt(score.score),
})),
}
}
} catch (e) {
this.logger.error('Exception while completing distribution', e.stack)
Expand Down Expand Up @@ -360,12 +369,13 @@ export class DistributionQueue extends WorkerHost {
})

this.tasks.distributionFlow.add(
TasksService.DISTRIBUTION_FLOW(
data.stamp,
failedScores.length,
data.retries - 1,
scoreJobs,
),
TasksService.DISTRIBUTION_FLOW({
stamp: data.stamp,
total: failedScores.length,
retries: data.retries - 1,
scoreJobs: scoreJobs,
processed: processedScores
}),
)
}

Expand All @@ -381,21 +391,25 @@ export class DistributionQueue extends WorkerHost {
const result = await this.distribution.distribute(data.stamp)

if (!result && data.retries > 0) {
this.tasks.distributionQueue.add(
DistributionQueue.JOB_RETRY_COMPLETE_DISTRIBUTION,
{
this.tasks.distributionFlow.add(
TasksService.RETRY_COMPLETE_DISTRIBUTION_FLOW({
stamp: data.stamp,
total: data.total,
retries: data.retries - 1,
},
TasksService.jobOpts,
processed: data.processed
})
)
}

return {
complete: result,
stamp: data.stamp,
scores: [],
this.logger.warn(`Failed to complete distribution ${data.stamp} with ${result}. Retries left ${data.retries - 1}`)
return undefined
} else {
return {
complete: result,
stamp: data.stamp,
scores: data.processed.map((score) => ({
ator_address: score.address,
fingerprint: score.fingerprint,
score: Number.parseInt(score.score),
})),
}
}
} else {
this.logger.error(
Expand Down
33 changes: 29 additions & 4 deletions src/tasks/tasks.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import { TaskServiceData } from './schemas/task-service-data'
import { InjectModel } from '@nestjs/mongoose'
import { Model, Types } from 'mongoose'
import { ClusterService } from '../cluster/cluster.service'
import { Score } from 'src/distribution/interfaces/distribution'

@Injectable()
export class TasksService implements OnApplicationBootstrap {
Expand Down Expand Up @@ -134,22 +135,25 @@ export class TasksService implements OnApplicationBootstrap {
}
}

public static DISTRIBUTION_FLOW(
public static DISTRIBUTION_FLOW({
stamp, total, retries, scoreJobs, processed
}: {
stamp: number,
total: number,
retries: number,
scoreJobs: ScoreData[][],
): FlowJob {
processed: Score[],
}): FlowJob {
return {
name: 'persist-distribution',
queueName: 'distribution-queue',
opts: TasksService.jobOpts,
data: { stamp, retries },
data: { stamp, retries: 5 },
children: [{
name: 'complete-distribution',
queueName: 'distribution-queue',
opts: TasksService.jobOpts,
data: { stamp, total, retries },
data: { stamp, total, retries, processed },
children: scoreJobs.map((scores, index, array) => ({
name: 'add-scores',
queueName: 'distribution-queue',
Expand All @@ -160,6 +164,27 @@ export class TasksService implements OnApplicationBootstrap {
}
}

public static RETRY_COMPLETE_DISTRIBUTION_FLOW({
stamp, retries, processed
}: {
stamp: number,
retries: number,
processed: Score[],
}): FlowJob {
return {
name: 'persist-distribution',
queueName: 'distribution-queue',
opts: TasksService.jobOpts,
data: { stamp, retries: 5 },
children: [{
name: 'retry-complete-distribution',
queueName: 'distribution-queue',
opts: TasksService.jobOpts,
data: { stamp, retries, processed, total: processed.length }
}]
}
}

constructor(
private readonly config: ConfigService<{
IS_LIVE: string
Expand Down

0 comments on commit 573e780

Please sign in to comment.