Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: exit with error when an anchor worker fails to anchor a batch #1187

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions src/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -302,10 +302,12 @@ export class CeramicAnchorApp {
const success = await anchorService
.anchorRequests({ signal: controller.signal })
.catch((error: Error) => {
logger.err(`Error when anchoring: ${error}`)
Metrics.count(METRIC_NAMES.ERROR_WHEN_ANCHORING, 1, {
message: error.message.substring(0, 50),
})
if (!controller.signal.aborted) {
logger.err(`Error when anchoring: ${error}`)
Metrics.count(METRIC_NAMES.ERROR_WHEN_ANCHORING, 1, {
message: error.message.substring(0, 50),
})
}
throw error
})

Expand Down
70 changes: 26 additions & 44 deletions src/services/__tests__/task-scheduler-service.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,91 +2,73 @@ import 'reflect-metadata'
import { jest, describe, test, expect } from '@jest/globals'
import { TaskSchedulerService } from '../task-scheduler-service.js'
import { Utils } from '../../utils.js'
import { TestUtils } from '@ceramicnetwork/common'

describe('scheduler service', () => {
jest.setTimeout(20000)

test('will run the task repeatedly', (done) => {
test('will run the task repeatedly', async () => {
const numberOfRunsBeforeDone = 3

const task = jest.fn()
const testScheduler = new TaskSchedulerService()

const runChecks = () => {
// the task runs once right at the start before running every X seconds
expect(task.mock.calls.length).toEqual(numberOfRunsBeforeDone + 1)
done()
}

let count = 0
task.mockImplementation(async () => {
if (count === numberOfRunsBeforeDone) {
testScheduler.stop()
runChecks()
}

count = count + 1
return Promise.resolve()
})

testScheduler.start(task as any, 1000)
// test doesn't complete until 'done()' is called
await TestUtils.delay(1000 * numberOfRunsBeforeDone)
await testScheduler.stop()
expect(task.mock.calls.length).toBeGreaterThanOrEqual(numberOfRunsBeforeDone)
})

test('will continue if the task fails', (done) => {
const numberOfRunsBeforeDone = 5
test('will stop if the task fails', async () => {
const mockExit = jest.spyOn(process, 'exit').mockImplementation(() => {
return
})

const task = jest.fn()
const testScheduler = new TaskSchedulerService()

const runChecks = () => {
// the task runs once right at the start before running every X seconds
expect(task.mock.calls.length).toEqual(numberOfRunsBeforeDone + 1)
Utils.delay(3000).then(() => {
done()
})
}

let count = 0
task.mockImplementation(async () => {
if (count === numberOfRunsBeforeDone) {
testScheduler.stop()
runChecks()
}

count = count + 1

// the last two runs will be rejected
if (count > numberOfRunsBeforeDone - 2) {
return Promise.reject('test error')
if (count === 2) {
throw Error('test error')
}

return Promise.resolve()
return
})

testScheduler.start(task as any, 1000)
// test doesn't complete until 'done()' is called
await TestUtils.waitForConditionOrTimeout(async () => {
// @ts-ignore
return testScheduler._subscription?.closed || false
})
expect(mockExit).toHaveBeenCalled()
})

test('Will complete current task if stop is called', (done) => {
test('Will complete current task if stop is called', async () => {
let calls = 0
const task = async () => {
await Utils.delay(2000)
calls = calls + 1
}

const testScheduler = new TaskSchedulerService()

testScheduler.start(task as any, 1000)
await Utils.delay(500)
// stop is called during the task
// stop should only return once the task completes
Utils.delay(1000).then(async () => {
await testScheduler.stop()
await Utils.delay(3000)
// task should have compelted once
expect(calls).toEqual(1)
done()
})

testScheduler.start(task as any, 1000)
// test doesn't complete until 'done()' is called
await testScheduler.stop()
await Utils.delay(3000)
// task should have completed once
expect(calls).toEqual(1)
})

test('Will run cbAfterNoOp after failure if set', async () => {
Expand Down
40 changes: 22 additions & 18 deletions src/services/task-scheduler-service.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,15 @@
import { logger } from '../logger/index.js'
import { catchError, Observable, Subscription, defer, share, timer, expand, concatMap, EMPTY, retry } from 'rxjs'
import { ServiceMetrics as Metrics } from '@ceramicnetwork/observability'
import { METRIC_NAMES } from '../settings.js'
import {
catchError,
Observable,
Subscription,
defer,
share,
timer,
expand,
concatMap,
EMPTY,
} from 'rxjs'

/**
* Repeatedly triggers a task to be run after a configured amount of ms
Expand All @@ -11,20 +19,22 @@ export class TaskSchedulerService {
private _controller: AbortController
private _subscription?: Subscription


constructor() {
this._controller = new AbortController()
}


/**
* Starts the scheduler which will run the provided task at regular intervals
* @param task task to perform regularly with a delay of intervalMS between runs
* @param intervalMS default: 60000, delay between task runs
* @param cbAfterNoOp default undefined. cbAfterNoOp is the callback to run if the task returns false. A task returning false indicates that it did not do anything (no op)
* cbAfterNoOp will not be called if the task errors. If cbAfterNoOp is not set then the scheduler will continue to run the task regardless if the task was a no op or not
*/
start(task: () => Promise<boolean | void>, intervalMS = 60000, cbAfterNoOp?: () => Promise<void>): void {
start(
task: () => Promise<boolean | void>,
intervalMS = 60000,
cbAfterNoOp?: () => Promise<void>
): void {
if (this._scheduledTask$) {
throw new Error('Task already scheduled')
}
Expand All @@ -34,22 +44,14 @@ export class TaskSchedulerService {
return false
}

return await task().then(result => result === undefined || result)
return await task().then((result) => result === undefined || result)
}).pipe(
catchError((err: Error) => {
Metrics.count(METRIC_NAMES.SCHEDULER_TASK_UNCAUGHT_ERROR, 1)
logger.err(`Scheduler task failed: ${err}`)

if (this._controller.signal.aborted) {
return EMPTY
}

throw err
}),
retry({
delay: intervalMS,
count: 3,
resetOnSuccess: true,
})
)

Expand All @@ -63,17 +65,19 @@ export class TaskSchedulerService {
logger.imp(`Last run of the task was not successful. Stopping the task scheduler`)
return EMPTY
}

return timer(intervalMS).pipe(concatMap(() => taskWithRetryOnError$))
}),
share()
)


this._subscription = this._scheduledTask$.subscribe({
complete: async () => {
if (cbAfterNoOp) await cbAfterNoOp()
}
},
error: (err) => {
logger.err(`Task scheduler exiting because of error: ${err}`)
process.exit(1)
},
})
}

Expand Down
5 changes: 0 additions & 5 deletions src/settings.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,6 @@ export enum METRIC_NAMES {
WITNESS_CAR_CACHE_HIT = 'witness_car_cache_hit',
WITNESS_CAR_CACHE_MISS = 'witness_car_cache_miss',

// *******************************************************************//
// Scheduler Service
SCHEDULER_TASK_UNCAUGHT_ERROR = 'scheduler_task_uncaught_error',

// *******************************************************************//
// Ceramic Service
PIN_SUCCEEDED = 'pin_succeeded',
Expand All @@ -80,4 +76,3 @@ export enum METRIC_NAMES {
REQUEST_NOT_CREATED = 'request_not_created',
REQUEST_NOT_FOUND = 'request_not_found',
}

Loading