Skip to content

Commit

Permalink
feat: exit with error when an anchor worker fails to anchor a batch (#…
Browse files Browse the repository at this point in the history
…1187)

* feat: exit with error when an anchor worker fails to anchor a batch

* chore: remove unused import

* fix: tests don't trample over each other

* chore: lint
  • Loading branch information
stephhuynh18 authored Mar 12, 2024
1 parent e39a853 commit 14771a4
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 71 deletions.
10 changes: 6 additions & 4 deletions src/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -302,10 +302,12 @@ export class CeramicAnchorApp {
const success = await anchorService
.anchorRequests({ signal: controller.signal })
.catch((error: Error) => {
logger.err(`Error when anchoring: ${error}`)
Metrics.count(METRIC_NAMES.ERROR_WHEN_ANCHORING, 1, {
message: error.message.substring(0, 50),
})
if (!controller.signal.aborted) {
logger.err(`Error when anchoring: ${error}`)
Metrics.count(METRIC_NAMES.ERROR_WHEN_ANCHORING, 1, {
message: error.message.substring(0, 50),
})
}
throw error
})

Expand Down
70 changes: 26 additions & 44 deletions src/services/__tests__/task-scheduler-service.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,91 +2,73 @@ import 'reflect-metadata'
import { jest, describe, test, expect } from '@jest/globals'
import { TaskSchedulerService } from '../task-scheduler-service.js'
import { Utils } from '../../utils.js'
import { TestUtils } from '@ceramicnetwork/common'

describe('scheduler service', () => {
jest.setTimeout(20000)

test('will run the task repeatedly', (done) => {
test('will run the task repeatedly', async () => {
const numberOfRunsBeforeDone = 3

const task = jest.fn()
const testScheduler = new TaskSchedulerService()

const runChecks = () => {
// the task runs once right at the start before running every X seconds
expect(task.mock.calls.length).toEqual(numberOfRunsBeforeDone + 1)
done()
}

let count = 0
task.mockImplementation(async () => {
if (count === numberOfRunsBeforeDone) {
testScheduler.stop()
runChecks()
}

count = count + 1
return Promise.resolve()
})

testScheduler.start(task as any, 1000)
// test doesn't complete until 'done()' is called
await TestUtils.delay(1000 * numberOfRunsBeforeDone)
await testScheduler.stop()
expect(task.mock.calls.length).toBeGreaterThanOrEqual(numberOfRunsBeforeDone)
})

test('will continue if the task fails', (done) => {
const numberOfRunsBeforeDone = 5
test('will stop if the task fails', async () => {
const mockExit = jest.spyOn(process, 'exit').mockImplementation(() => {
return
})

const task = jest.fn()
const testScheduler = new TaskSchedulerService()

const runChecks = () => {
// the task runs once right at the start before running every X seconds
expect(task.mock.calls.length).toEqual(numberOfRunsBeforeDone + 1)
Utils.delay(3000).then(() => {
done()
})
}

let count = 0
task.mockImplementation(async () => {
if (count === numberOfRunsBeforeDone) {
testScheduler.stop()
runChecks()
}

count = count + 1

// the last two runs will be rejected
if (count > numberOfRunsBeforeDone - 2) {
return Promise.reject('test error')
if (count === 2) {
throw Error('test error')
}

return Promise.resolve()
return
})

testScheduler.start(task as any, 1000)
// test doesn't complete until 'done()' is called
await TestUtils.waitForConditionOrTimeout(async () => {
// @ts-ignore
return testScheduler._subscription?.closed || false
})
expect(mockExit).toHaveBeenCalled()
})

test('Will complete current task if stop is called', (done) => {
test('Will complete current task if stop is called', async () => {
let calls = 0
const task = async () => {
await Utils.delay(2000)
calls = calls + 1
}

const testScheduler = new TaskSchedulerService()

testScheduler.start(task as any, 1000)
await Utils.delay(500)
// stop is called during the task
// stop should only return once the task completes
Utils.delay(1000).then(async () => {
await testScheduler.stop()
await Utils.delay(3000)
// task should have compelted once
expect(calls).toEqual(1)
done()
})

testScheduler.start(task as any, 1000)
// test doesn't complete until 'done()' is called
await testScheduler.stop()
await Utils.delay(3000)
// task should have completed once
expect(calls).toEqual(1)
})

test('Will run cbAfterNoOp after failure if set', async () => {
Expand Down
40 changes: 22 additions & 18 deletions src/services/task-scheduler-service.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,15 @@
import { logger } from '../logger/index.js'
import { catchError, Observable, Subscription, defer, share, timer, expand, concatMap, EMPTY, retry } from 'rxjs'
import { ServiceMetrics as Metrics } from '@ceramicnetwork/observability'
import { METRIC_NAMES } from '../settings.js'
import {
catchError,
Observable,
Subscription,
defer,
share,
timer,
expand,
concatMap,
EMPTY,
} from 'rxjs'

/**
* Repeatedly triggers a task to be run after a configured amount of ms
Expand All @@ -11,20 +19,22 @@ export class TaskSchedulerService {
private _controller: AbortController
private _subscription?: Subscription


constructor() {
this._controller = new AbortController()
}


/**
* Starts the scheduler which will run the provided task at regular intervals
* @param task task to perform regularly with a delay of intervalMS between runs
* @param intervalMS default: 60000, delay between task runs
* @param cbAfterNoOp default undefined. cbAfterNoOp is the callback to run if the task returns false. A task returning false indicates that it did not do anything (no op)
* cbAfterNoOp will not be called if the task errors. If cbAfterNoOp is not set then the scheduler will continue to run the task regardless if the task was a no op or not
*/
start(task: () => Promise<boolean | void>, intervalMS = 60000, cbAfterNoOp?: () => Promise<void>): void {
start(
task: () => Promise<boolean | void>,
intervalMS = 60000,
cbAfterNoOp?: () => Promise<void>
): void {
if (this._scheduledTask$) {
throw new Error('Task already scheduled')
}
Expand All @@ -34,22 +44,14 @@ export class TaskSchedulerService {
return false
}

return await task().then(result => result === undefined || result)
return await task().then((result) => result === undefined || result)
}).pipe(
catchError((err: Error) => {
Metrics.count(METRIC_NAMES.SCHEDULER_TASK_UNCAUGHT_ERROR, 1)
logger.err(`Scheduler task failed: ${err}`)

if (this._controller.signal.aborted) {
return EMPTY
}

throw err
}),
retry({
delay: intervalMS,
count: 3,
resetOnSuccess: true,
})
)

Expand All @@ -63,17 +65,19 @@ export class TaskSchedulerService {
logger.imp(`Last run of the task was not successful. Stopping the task scheduler`)
return EMPTY
}

return timer(intervalMS).pipe(concatMap(() => taskWithRetryOnError$))
}),
share()
)


this._subscription = this._scheduledTask$.subscribe({
complete: async () => {
if (cbAfterNoOp) await cbAfterNoOp()
}
},
error: (err) => {
logger.err(`Task scheduler exiting because of error: ${err}`)
process.exit(1)
},
})
}

Expand Down
5 changes: 0 additions & 5 deletions src/settings.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,6 @@ export enum METRIC_NAMES {
WITNESS_CAR_CACHE_HIT = 'witness_car_cache_hit',
WITNESS_CAR_CACHE_MISS = 'witness_car_cache_miss',

// *******************************************************************//
// Scheduler Service
SCHEDULER_TASK_UNCAUGHT_ERROR = 'scheduler_task_uncaught_error',

// *******************************************************************//
// Ceramic Service
PIN_SUCCEEDED = 'pin_succeeded',
Expand All @@ -80,4 +76,3 @@ export enum METRIC_NAMES {
REQUEST_NOT_CREATED = 'request_not_created',
REQUEST_NOT_FOUND = 'request_not_found',
}

0 comments on commit 14771a4

Please sign in to comment.