From 748831ff1e5b997e7b5c8af117cfd98ad3d89133 Mon Sep 17 00:00:00 2001 From: stephhuynh18 Date: Fri, 8 Mar 2024 16:20:30 -0800 Subject: [PATCH 1/4] feat: exit with error when an anchor worker fails to anchor a batch --- src/app.ts | 10 +++--- .../__tests__/task-scheduler-service.test.ts | 26 ++++++---------- src/services/task-scheduler-service.ts | 31 +++++++++++-------- 3 files changed, 33 insertions(+), 34 deletions(-) diff --git a/src/app.ts b/src/app.ts index 11be4923e..ad5445d13 100644 --- a/src/app.ts +++ b/src/app.ts @@ -302,10 +302,12 @@ export class CeramicAnchorApp { const success = await anchorService .anchorRequests({ signal: controller.signal }) .catch((error: Error) => { - logger.err(`Error when anchoring: ${error}`) - Metrics.count(METRIC_NAMES.ERROR_WHEN_ANCHORING, 1, { - message: error.message.substring(0, 50), - }) + if (!controller.signal.aborted) { + logger.err(`Error when anchoring: ${error}`) + Metrics.count(METRIC_NAMES.ERROR_WHEN_ANCHORING, 1, { + message: error.message.substring(0, 50), + }) + } throw error }) diff --git a/src/services/__tests__/task-scheduler-service.test.ts b/src/services/__tests__/task-scheduler-service.test.ts index 4f8eb93e4..773bc52d5 100644 --- a/src/services/__tests__/task-scheduler-service.test.ts +++ b/src/services/__tests__/task-scheduler-service.test.ts @@ -2,6 +2,7 @@ import 'reflect-metadata' import { jest, describe, test, expect } from '@jest/globals' import { TaskSchedulerService } from '../task-scheduler-service.js' import { Utils } from '../../utils.js' +import { TestUtils } from '@ceramicnetwork/common' describe('scheduler service', () => { jest.setTimeout(20000) @@ -33,30 +34,15 @@ describe('scheduler service', () => { // test doesn't complete until 'done()' is called }) - test('will continue if the task fails', (done) => { - const numberOfRunsBeforeDone = 5 + test('will stop if the task fails', (done) => { const task = jest.fn() const testScheduler = new TaskSchedulerService() - const runChecks = () => { - // the task runs once right at the start before running every X seconds - expect(task.mock.calls.length).toEqual(numberOfRunsBeforeDone + 1) - Utils.delay(3000).then(() => { - done() - }) - } - let count = 0 task.mockImplementation(async () => { - if (count === numberOfRunsBeforeDone) { - testScheduler.stop() - runChecks() - } - count = count + 1 - // the last two runs will be rejected - if (count > numberOfRunsBeforeDone - 2) { + if (count === 2) { return Promise.reject('test error') } @@ -64,6 +50,12 @@ describe('scheduler service', () => { }) testScheduler.start(task as any, 1000) + // @ts-ignore + testScheduler._subscription?.add(() => { + expect(task.mock.calls.length).toEqual(2) + testScheduler.stop() + done() + }) // test doesn't complete until 'done()' is called }) diff --git a/src/services/task-scheduler-service.ts b/src/services/task-scheduler-service.ts index 14f50a752..6e112e0e3 100644 --- a/src/services/task-scheduler-service.ts +++ b/src/services/task-scheduler-service.ts @@ -1,5 +1,15 @@ import { logger } from '../logger/index.js' -import { catchError, Observable, Subscription, defer, share, timer, expand, concatMap, EMPTY, retry } from 'rxjs' +import { + catchError, + Observable, + Subscription, + defer, + share, + timer, + expand, + concatMap, + EMPTY, +} from 'rxjs' import { ServiceMetrics as Metrics } from '@ceramicnetwork/observability' import { METRIC_NAMES } from '../settings.js' @@ -11,12 +21,10 @@ export class TaskSchedulerService { private _controller: AbortController private _subscription?: Subscription - constructor() { this._controller = new AbortController() } - /** * Starts the scheduler which will run the provided task at regular intervals * @param task task to perform regularly with a delay of intervalMS between runs @@ -24,7 +32,11 @@ export class TaskSchedulerService { * @param cbAfterNoOp default undefined. cbAfterNoOp is the callback to run if the task returns false. A task returning false indicates that it did not do anything (no op) * cbAfterNoOp will not be called if the task errors. If cbAfterNoOp is not set then the scheduler will continue to run the task regardless if the task was a no op or not */ - start(task: () => Promise, intervalMS = 60000, cbAfterNoOp?: () => Promise): void { + start( + task: () => Promise, + intervalMS = 60000, + cbAfterNoOp?: () => Promise + ): void { if (this._scheduledTask$) { throw new Error('Task already scheduled') } @@ -34,7 +46,7 @@ export class TaskSchedulerService { return false } - return await task().then(result => result === undefined || result) + return await task().then((result) => result === undefined || result) }).pipe( catchError((err: Error) => { Metrics.count(METRIC_NAMES.SCHEDULER_TASK_UNCAUGHT_ERROR, 1) @@ -45,11 +57,6 @@ export class TaskSchedulerService { } throw err - }), - retry({ - delay: intervalMS, - count: 3, - resetOnSuccess: true, }) ) @@ -63,17 +70,15 @@ export class TaskSchedulerService { logger.imp(`Last run of the task was not successful. Stopping the task scheduler`) return EMPTY } - return timer(intervalMS).pipe(concatMap(() => taskWithRetryOnError$)) }), share() ) - this._subscription = this._scheduledTask$.subscribe({ complete: async () => { if (cbAfterNoOp) await cbAfterNoOp() - } + }, }) } From 95e995b64789b32ab42b3e6a1692501cd848ef42 Mon Sep 17 00:00:00 2001 From: stephhuynh18 Date: Fri, 8 Mar 2024 16:22:29 -0800 Subject: [PATCH 2/4] chore: remove unused import --- src/services/__tests__/task-scheduler-service.test.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/src/services/__tests__/task-scheduler-service.test.ts b/src/services/__tests__/task-scheduler-service.test.ts index 773bc52d5..b6ab0dfc3 100644 --- a/src/services/__tests__/task-scheduler-service.test.ts +++ b/src/services/__tests__/task-scheduler-service.test.ts @@ -2,7 +2,6 @@ import 'reflect-metadata' import { jest, describe, test, expect } from '@jest/globals' import { TaskSchedulerService } from '../task-scheduler-service.js' import { Utils } from '../../utils.js' -import { TestUtils } from '@ceramicnetwork/common' describe('scheduler service', () => { jest.setTimeout(20000) From 765c936b0776f067d6bd6b3e591f69867fb6486e Mon Sep 17 00:00:00 2001 From: stephhuynh18 Date: Tue, 12 Mar 2024 12:50:07 -0700 Subject: [PATCH 3/4] Fix: tests don't trample over each other --- .../__tests__/task-scheduler-service.test.ts | 55 ++++++++----------- src/services/task-scheduler-service.ts | 9 ++- src/settings.ts | 5 -- 3 files changed, 26 insertions(+), 43 deletions(-) diff --git a/src/services/__tests__/task-scheduler-service.test.ts b/src/services/__tests__/task-scheduler-service.test.ts index b6ab0dfc3..f0b746091 100644 --- a/src/services/__tests__/task-scheduler-service.test.ts +++ b/src/services/__tests__/task-scheduler-service.test.ts @@ -2,38 +2,32 @@ import 'reflect-metadata' import { jest, describe, test, expect } from '@jest/globals' import { TaskSchedulerService } from '../task-scheduler-service.js' import { Utils } from '../../utils.js' +import { TestUtils } from '@ceramicnetwork/common' describe('scheduler service', () => { jest.setTimeout(20000) - test('will run the task repeatedly', (done) => { + test('will run the task repeatedly', async () => { const numberOfRunsBeforeDone = 3 const task = jest.fn() const testScheduler = new TaskSchedulerService() - const runChecks = () => { - // the task runs once right at the start before running every X seconds - expect(task.mock.calls.length).toEqual(numberOfRunsBeforeDone + 1) - done() - } - let count = 0 task.mockImplementation(async () => { - if (count === numberOfRunsBeforeDone) { - testScheduler.stop() - runChecks() - } - count = count + 1 return Promise.resolve() }) testScheduler.start(task as any, 1000) - // test doesn't complete until 'done()' is called + await TestUtils.delay(1000 * numberOfRunsBeforeDone) + await testScheduler.stop() + expect(task.mock.calls.length).toBeGreaterThanOrEqual(numberOfRunsBeforeDone) }) - test('will stop if the task fails', (done) => { + test('will stop if the task fails', async () => { + const mockExit = jest.spyOn(process, 'exit').mockImplementation(() => {}) + const task = jest.fn() const testScheduler = new TaskSchedulerService() @@ -42,42 +36,37 @@ describe('scheduler service', () => { count = count + 1 if (count === 2) { - return Promise.reject('test error') + throw Error('test error') } - return Promise.resolve() + return }) testScheduler.start(task as any, 1000) - // @ts-ignore - testScheduler._subscription?.add(() => { - expect(task.mock.calls.length).toEqual(2) - testScheduler.stop() - done() + await TestUtils.waitForConditionOrTimeout(async () => { + // @ts-ignore + return testScheduler._subscription?.closed || false }) - // test doesn't complete until 'done()' is called + expect(mockExit).toHaveBeenCalled() }) - test('Will complete current task if stop is called', (done) => { + test('Will complete current task if stop is called', async () => { let calls = 0 const task = async () => { await Utils.delay(2000) calls = calls + 1 } + const testScheduler = new TaskSchedulerService() + testScheduler.start(task as any, 1000) + await Utils.delay(500) // stop is called during the task // stop should only return once the task completes - Utils.delay(1000).then(async () => { - await testScheduler.stop() - await Utils.delay(3000) - // task should have compelted once - expect(calls).toEqual(1) - done() - }) - - testScheduler.start(task as any, 1000) - // test doesn't complete until 'done()' is called + await testScheduler.stop() + await Utils.delay(3000) + // task should have completed once + expect(calls).toEqual(1) }) test('Will run cbAfterNoOp after failure if set', async () => { diff --git a/src/services/task-scheduler-service.ts b/src/services/task-scheduler-service.ts index 6e112e0e3..e6ef36b3b 100644 --- a/src/services/task-scheduler-service.ts +++ b/src/services/task-scheduler-service.ts @@ -10,8 +10,6 @@ import { concatMap, EMPTY, } from 'rxjs' -import { ServiceMetrics as Metrics } from '@ceramicnetwork/observability' -import { METRIC_NAMES } from '../settings.js' /** * Repeatedly triggers a task to be run after a configured amount of ms @@ -49,9 +47,6 @@ export class TaskSchedulerService { return await task().then((result) => result === undefined || result) }).pipe( catchError((err: Error) => { - Metrics.count(METRIC_NAMES.SCHEDULER_TASK_UNCAUGHT_ERROR, 1) - logger.err(`Scheduler task failed: ${err}`) - if (this._controller.signal.aborted) { return EMPTY } @@ -79,6 +74,10 @@ export class TaskSchedulerService { complete: async () => { if (cbAfterNoOp) await cbAfterNoOp() }, + error: (err) => { + logger.err(`Task scheduler exiting because of error: ${err}`) + process.exit(1) + }, }) } diff --git a/src/settings.ts b/src/settings.ts index af217802b..0bc69fd2c 100644 --- a/src/settings.ts +++ b/src/settings.ts @@ -59,10 +59,6 @@ export enum METRIC_NAMES { WITNESS_CAR_CACHE_HIT = 'witness_car_cache_hit', WITNESS_CAR_CACHE_MISS = 'witness_car_cache_miss', - // *******************************************************************// - // Scheduler Service - SCHEDULER_TASK_UNCAUGHT_ERROR = 'scheduler_task_uncaught_error', - // *******************************************************************// // Ceramic Service PIN_SUCCEEDED = 'pin_succeeded', @@ -80,4 +76,3 @@ export enum METRIC_NAMES { REQUEST_NOT_CREATED = 'request_not_created', REQUEST_NOT_FOUND = 'request_not_found', } - From 54e638689f6d9f619310ed03b9d1eb319e53e8ae Mon Sep 17 00:00:00 2001 From: stephhuynh18 Date: Tue, 12 Mar 2024 13:09:26 -0700 Subject: [PATCH 4/4] chore: lint --- src/services/__tests__/task-scheduler-service.test.ts | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/services/__tests__/task-scheduler-service.test.ts b/src/services/__tests__/task-scheduler-service.test.ts index f0b746091..23a8e1169 100644 --- a/src/services/__tests__/task-scheduler-service.test.ts +++ b/src/services/__tests__/task-scheduler-service.test.ts @@ -26,7 +26,9 @@ describe('scheduler service', () => { }) test('will stop if the task fails', async () => { - const mockExit = jest.spyOn(process, 'exit').mockImplementation(() => {}) + const mockExit = jest.spyOn(process, 'exit').mockImplementation(() => { + return + }) const task = jest.fn() const testScheduler = new TaskSchedulerService()