Skip to content

Commit

Permalink
feat: exit with error when an anchor worker fails to anchor a batch
Browse files Browse the repository at this point in the history
  • Loading branch information
stephhuynh18 committed Mar 9, 2024
1 parent e39a853 commit 748831f
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 34 deletions.
10 changes: 6 additions & 4 deletions src/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -302,10 +302,12 @@ export class CeramicAnchorApp {
const success = await anchorService
.anchorRequests({ signal: controller.signal })
.catch((error: Error) => {
logger.err(`Error when anchoring: ${error}`)
Metrics.count(METRIC_NAMES.ERROR_WHEN_ANCHORING, 1, {
message: error.message.substring(0, 50),
})
if (!controller.signal.aborted) {
logger.err(`Error when anchoring: ${error}`)
Metrics.count(METRIC_NAMES.ERROR_WHEN_ANCHORING, 1, {
message: error.message.substring(0, 50),
})
}
throw error
})

Expand Down
26 changes: 9 additions & 17 deletions src/services/__tests__/task-scheduler-service.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import 'reflect-metadata'
import { jest, describe, test, expect } from '@jest/globals'
import { TaskSchedulerService } from '../task-scheduler-service.js'
import { Utils } from '../../utils.js'
import { TestUtils } from '@ceramicnetwork/common'

describe('scheduler service', () => {
jest.setTimeout(20000)
Expand Down Expand Up @@ -33,37 +34,28 @@ describe('scheduler service', () => {
// test doesn't complete until 'done()' is called
})

test('will continue if the task fails', (done) => {
const numberOfRunsBeforeDone = 5
test('will stop if the task fails', (done) => {
const task = jest.fn()
const testScheduler = new TaskSchedulerService()

const runChecks = () => {
// the task runs once right at the start before running every X seconds
expect(task.mock.calls.length).toEqual(numberOfRunsBeforeDone + 1)
Utils.delay(3000).then(() => {
done()
})
}

let count = 0
task.mockImplementation(async () => {
if (count === numberOfRunsBeforeDone) {
testScheduler.stop()
runChecks()
}

count = count + 1

// the last two runs will be rejected
if (count > numberOfRunsBeforeDone - 2) {
if (count === 2) {
return Promise.reject('test error')
}

return Promise.resolve()
})

testScheduler.start(task as any, 1000)
// @ts-ignore
testScheduler._subscription?.add(() => {
expect(task.mock.calls.length).toEqual(2)
testScheduler.stop()
done()
})
// test doesn't complete until 'done()' is called
})

Expand Down
31 changes: 18 additions & 13 deletions src/services/task-scheduler-service.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,15 @@
import { logger } from '../logger/index.js'
import { catchError, Observable, Subscription, defer, share, timer, expand, concatMap, EMPTY, retry } from 'rxjs'
import {
catchError,
Observable,
Subscription,
defer,
share,
timer,
expand,
concatMap,
EMPTY,
} from 'rxjs'
import { ServiceMetrics as Metrics } from '@ceramicnetwork/observability'
import { METRIC_NAMES } from '../settings.js'

Expand All @@ -11,20 +21,22 @@ export class TaskSchedulerService {
private _controller: AbortController
private _subscription?: Subscription


constructor() {
this._controller = new AbortController()
}


/**
* Starts the scheduler which will run the provided task at regular intervals
* @param task task to perform regularly with a delay of intervalMS between runs
* @param intervalMS default: 60000, delay between task runs
* @param cbAfterNoOp default undefined. cbAfterNoOp is the callback to run if the task returns false. A task returning false indicates that it did not do anything (no op)
* cbAfterNoOp will not be called if the task errors. If cbAfterNoOp is not set then the scheduler will continue to run the task regardless if the task was a no op or not
*/
start(task: () => Promise<boolean | void>, intervalMS = 60000, cbAfterNoOp?: () => Promise<void>): void {
start(
task: () => Promise<boolean | void>,
intervalMS = 60000,
cbAfterNoOp?: () => Promise<void>
): void {
if (this._scheduledTask$) {
throw new Error('Task already scheduled')
}
Expand All @@ -34,7 +46,7 @@ export class TaskSchedulerService {
return false
}

return await task().then(result => result === undefined || result)
return await task().then((result) => result === undefined || result)
}).pipe(
catchError((err: Error) => {
Metrics.count(METRIC_NAMES.SCHEDULER_TASK_UNCAUGHT_ERROR, 1)
Expand All @@ -45,11 +57,6 @@ export class TaskSchedulerService {
}

throw err
}),
retry({
delay: intervalMS,
count: 3,
resetOnSuccess: true,
})
)

Expand All @@ -63,17 +70,15 @@ export class TaskSchedulerService {
logger.imp(`Last run of the task was not successful. Stopping the task scheduler`)
return EMPTY
}

return timer(intervalMS).pipe(concatMap(() => taskWithRetryOnError$))
}),
share()
)


this._subscription = this._scheduledTask$.subscribe({
complete: async () => {
if (cbAfterNoOp) await cbAfterNoOp()
}
},
})
}

Expand Down

0 comments on commit 748831f

Please sign in to comment.