From beef06b12dd6bd3fec12da91297bae635cc8318c Mon Sep 17 00:00:00 2001 From: elrrrrrrr Date: Tue, 4 Jul 2023 16:55:26 +0800 Subject: [PATCH] feat: add testcase --- app.ts | 6 ++ app/common/typing.ts | 7 +- app/core/service/CreateHookTriggerService.ts | 2 +- app/core/service/HookTriggerService.ts | 2 +- app/core/service/TaskService.ts | 2 +- app/core/woker/AbstractWorker.ts | 83 +++++++++++--------- app/core/woker/HookTriggerWorker.ts | 10 +-- app/infra/MQAdapter.ts | 8 +- test/core/worker/HookTriggerWorker.test.ts | 68 +++++++++++++--- test/infra/MQAdapter.test.ts | 61 ++++++++++++++ 10 files changed, 186 insertions(+), 63 deletions(-) create mode 100644 test/infra/MQAdapter.test.ts diff --git a/app.ts b/app.ts index fbacfb46..11173224 100644 --- a/app.ts +++ b/app.ts @@ -2,6 +2,7 @@ import path from 'path'; import { readFile } from 'fs/promises'; import { Application } from 'egg'; import { ChangesStreamService } from './app/core/service/ChangesStreamService'; +import { HookTriggerWorker } from './app/core/woker/HookTriggerWorker'; declare module 'egg' { interface Application { binaryHTML: string; @@ -26,6 +27,7 @@ export default class CnpmcoreAppHook { app.getLogger('sqlLogger').info('[%s] %s', duration, sql); }, }; + } // https://eggjs.org/zh-cn/basics/app-start.html @@ -34,6 +36,10 @@ export default class CnpmcoreAppHook { const filepath = path.join(this.app.baseDir, 'app/port/binary.html'); const text = await readFile(filepath, 'utf-8'); this.app.binaryHTML = text.replace('{{registry}}', this.app.config.cnpmcore.registry); + + // 由于 bullmq 内使用了异步 Worker 消费任务,脱离了 egg ctx 生命周期 + // 需要手动初始化,内部使用 getEggObject 获取 egg 对象 + new HookTriggerWorker(this.app); } // 应用退出时执行 diff --git a/app/common/typing.ts b/app/common/typing.ts index 4b696384..dc2c4520 100644 --- a/app/common/typing.ts +++ b/app/common/typing.ts @@ -50,8 +50,13 @@ export interface QueueAdapter { length(key: string): Promise; } +export type JobData = { + taskId: string; + targetName: string; +}; + export interface MQAdapterType { - addJobs(key: string, taskId: string): Promise; + addJobs(key: string, data: JobData): Promise; pause(key: string): void; resume(key: string): void; } diff --git a/app/core/service/CreateHookTriggerService.ts b/app/core/service/CreateHookTriggerService.ts index f06ac867..11ad41fb 100644 --- a/app/core/service/CreateHookTriggerService.ts +++ b/app/core/service/CreateHookTriggerService.ts @@ -55,7 +55,7 @@ export class CreateHookTriggerService extends AbstractService { } catch (e) { e.message = 'create trigger failed: ' + e.message; await this.taskService.finishTask(task, TaskState.Fail, `[${isoNow()}][Hooks] ${e.stack} \n`); - return; + throw e; } } diff --git a/app/core/service/HookTriggerService.ts b/app/core/service/HookTriggerService.ts index c6d88571..2f7475ac 100644 --- a/app/core/service/HookTriggerService.ts +++ b/app/core/service/HookTriggerService.ts @@ -56,7 +56,7 @@ export class HookTriggerService { e.message = 'trigger hook failed: ' + e.message; task.error = e.message; await this.taskService.finishTask(task, TaskState.Fail, `[${isoNow()}][TriggerHooks] ${e.stack} \n`); - return; + throw e; } } diff --git a/app/core/service/TaskService.ts b/app/core/service/TaskService.ts index fb302b51..c7e3a488 100644 --- a/app/core/service/TaskService.ts +++ b/app/core/service/TaskService.ts @@ -66,7 +66,7 @@ export class TaskService extends AbstractService { await this.taskRepository.saveTask(task); if (useMQ) { - await this.mqAdapter.addJobs(task.type, task.taskId); + await this.mqAdapter.addJobs(task.type, task); } else { await this.queueAdapter.push(task.type, task.taskId); const queueLength = await this.getTaskQueueLength(task.type); diff --git a/app/core/woker/AbstractWorker.ts b/app/core/woker/AbstractWorker.ts index 3d826dff..b5c703ec 100644 --- a/app/core/woker/AbstractWorker.ts +++ b/app/core/woker/AbstractWorker.ts @@ -1,63 +1,73 @@ -import { Inject, LifecycleDestroy, LifecycleInit } from '@eggjs/tegg'; import { MQAdapter } from '../../infra/MQAdapter'; import { Job, UnrecoverableError, Worker } from 'bullmq'; -import { EggAppConfig, EggLogger } from 'egg'; +import { Application } from 'egg'; import { TaskService } from '../service/TaskService'; export abstract class AbstractWorker { - @Inject() - private readonly queueAdapter: MQAdapter; - - @Inject() - private readonly config: EggAppConfig; - - @Inject() - private readonly taskService: TaskService; - - @Inject() - private readonly logger: EggLogger; + constructor(app: Application) { + this.app = app; + this.registerWorker(); + } protected worker: Worker; + app: Application; queueKey: string; configKey: string; - service; + + queueAdapter: MQAdapter; + taskService: TaskService; + serviceClass: any; + service: any; async initWorker() { - throw new Error('should implements in subclass'); + await this.initWorkerInfo(); + this.queueAdapter = await this.app.getEggObject(MQAdapter); + this.taskService = await this.app.getEggObject(TaskService); + this.service = await this.app.getEggObject(this.serviceClass); } - @LifecycleInit() - protected async init() { + async initWorkerInfo() { + throw new Error('not implement'); + } + + async registerWorker() { await this.initWorker(); const queue = this.queueAdapter.initQueue(this.queueKey); this.worker = new Worker( queue.name, async (job: Job) => { - const startTime = Date.now(); - const task = await this.taskService.findTask(job.data.taskId); - if (!task) { - throw new UnrecoverableError('task not found'); - } + await this.app.runInAnonymousContextScope(async ctx => { + await ctx.beginModuleScope(async () => { + console.log('开始干活了'); + const startTime = Date.now(); + const task = await this.taskService.findTask(job.data.taskId); + if (!task) { + throw new UnrecoverableError('task not found'); + } - this.logger.info(`[${this.queueKey}Worker:subscribe:executeTask:start][%s] taskId: %s, targetName: %s, attempts: %s, params: %j, updatedAt: %s, delay %sms`, - this.worker.concurrency, task.taskId, task.targetName, task.attempts, task.data, task.updatedAt, - startTime - task.updatedAt.getTime()); - if (this.worker.concurrency !== this.config.cnpmcore[this.configKey]) { - this.worker.concurrency = this.config.cnpmcore[this.configKey]; - } + if (this.worker.concurrency !== this.app.config.cnpmcore[this.configKey]) { + this.worker.concurrency = this.app.config.cnpmcore[this.configKey]; + } + this.app.logger.info(`[${this.queueKey}_worker:subscribe:executeTask:start] taskId: %s, targetName: %s, attempts: %s, params: %j, updatedAt: %s, delay %sms`, + task.taskId, task.targetName, task.attempts, task.data, task.updatedAt, + startTime - task.updatedAt.getTime()); - // TODO bullmq 移除了 timeout 配置,需要自己实现一个 promise.race 执行 timeout - await this.service.executeTask(job.data); + // TODO bullmq 移除了 timeout 配置,需要自己实现一个 promise.race 执行 timeout + console.log('搞'); + await this.service.executeTask(task as any); + console.log('搞好'); + }); + }); }, { - concurrency: this.config.cnpmcore[this.configKey], - autorun: true, + concurrency: this.app.config.cnpmcore[this.configKey], }, ); this.worker.on('completed', (job: Job) => { - this.logger.info(`[${this.queueKey}Worker:subscribe:executeTask:success][%s] taskId: %s, targetName: %s, use %sms`, + console.log('干好了'); + this.app.logger.info(`[${this.queueKey}_worker:subscribe:executeTask:success] taskId: %s, targetName: %s, use %sms`, job.data.taskId, job.data.targetName, Date.now() - job.timestamp); }); @@ -65,15 +75,10 @@ export abstract class AbstractWorker { if (!job) { return; } - this.logger.info(`[${this.queueKey}Worker:subscribe:executeTask:failed][%s] taskId: %s, targetName: %s, attemptsMade %s`, + this.app.logger.info(`[${this.queueKey}_worker:subscribe:executeTask:failed] taskId: %s, targetName: %s, attemptsMade %s`, job.data.taskId, job.data.targetName, job.attemptsMade); }); } - @LifecycleDestroy() - protected async destroy() { - await this.worker.close(); - } - } diff --git a/app/core/woker/HookTriggerWorker.ts b/app/core/woker/HookTriggerWorker.ts index 7b0fb43d..440b72f3 100644 --- a/app/core/woker/HookTriggerWorker.ts +++ b/app/core/woker/HookTriggerWorker.ts @@ -1,18 +1,12 @@ -import { Inject, SingletonProto } from '@eggjs/tegg'; import { TaskType } from '../../common/enum/Task'; import { HookTriggerService } from '../service/HookTriggerService'; import { AbstractWorker } from './AbstractWorker'; -@SingletonProto() export class HookTriggerWorker extends AbstractWorker { - @Inject() - private readonly hookTriggerService: HookTriggerService; - - async initWorker(): Promise { + async initWorkerInfo(): Promise { this.queueKey = TaskType.TriggerHook; - this.service = this.hookTriggerService; + this.serviceClass = HookTriggerService; this.configKey = 'triggerHookWorkerMaxConcurrentTasks'; } - } diff --git a/app/infra/MQAdapter.ts b/app/infra/MQAdapter.ts index 31ff8cd4..a869a1b8 100644 --- a/app/infra/MQAdapter.ts +++ b/app/infra/MQAdapter.ts @@ -5,7 +5,7 @@ import { } from '@eggjs/tegg'; import { Redis } from 'ioredis'; import { JobsOptions, Queue } from 'bullmq'; -import { MQAdapterType } from '../common/typing'; +import { JobData, MQAdapterType } from '../common/typing'; /** * Use sort set to keep queue in order and keep same value only insert once @@ -39,10 +39,10 @@ export class MQAdapter implements MQAdapterType { * If queue has the same item, return false * If queue not has the same item, return true */ - async addJobs(key: string, taskId: string, options?: JobsOptions): Promise { + async addJobs(key: string, {taskId, targetName} : JobData, options?: JobsOptions): Promise { try { const queue = this.initQueue(key); - await queue.add(key, { jobId: taskId }, + await queue.add(key, { taskId, targetName }, { removeOnComplete: true, removeOnFail: true, @@ -51,6 +51,8 @@ export class MQAdapter implements MQAdapterType { type: 'exponential', delay: 1000, }, + // remove duplicate job + jobId: taskId, ...options, }, ); diff --git a/test/core/worker/HookTriggerWorker.test.ts b/test/core/worker/HookTriggerWorker.test.ts index 1218aa32..b6791b0b 100644 --- a/test/core/worker/HookTriggerWorker.test.ts +++ b/test/core/worker/HookTriggerWorker.test.ts @@ -1,19 +1,69 @@ import { app } from 'egg-mock/bootstrap'; +import { Change } from '../../../app/core/entity/Change'; import assert from 'assert'; -import { HookTriggerWorker } from '../../../app/core/woker/HookTriggerWorker'; +import { PACKAGE_VERSION_ADDED } from '../../../app/core/event'; +import { ChangeRepository } from '../../../app/repository/ChangeRepository'; +import { HookManageService } from '../../../app/core/service/HookManageService'; +import { HookType } from '../../../app/common/enum/Hook'; +import { HookEvent } from '../../../app/core/entity/HookEvent'; +import { Task } from '../../../app/core/entity/Task'; +import { CreateHookTriggerService } from '../../../app/core/service/CreateHookTriggerService'; +import { TestUtil } from '../../TestUtil'; +import { UserRepository } from '../../../app/repository/UserRepository'; +import { TaskState } from '../../../app/common/enum/Task'; describe('test/core/worker/HookTriggerWorker.test.ts', () => { - let hookTriggerWorker: HookTriggerWorker; - beforeEach(async () => { - hookTriggerWorker = await app.getEggObject(HookTriggerWorker); - }); + describe('trigger hook', () => { + + let change: Change; + let hookManageService: HookManageService; + let taskId: string; + const pkgName = '@cnpmcore/foo'; + beforeEach(async () => { + app.mockLog(); + const { name: username } = await TestUtil.createUser(); + await TestUtil.createPackage({ name: pkgName }); + change = Change.create({ + type: PACKAGE_VERSION_ADDED, + targetName: pkgName, + data: { + version: '1.0.0', + }, + }); + app.mockHttpclient('http://foo.com', 'POST', { + status: 200, + }); + const changeRepository = await app.getEggObject(ChangeRepository); + await changeRepository.addChange(change); + const userRepository = await app.getEggObject(UserRepository); + const user = await userRepository.findUserByName(username); + const userId = user!.userId; + hookManageService = await app.getEggObject(HookManageService); + await hookManageService.createHook({ + type: HookType.Package, + ownerId: userId, + name: pkgName, + endpoint: 'http://foo.com', + secret: 'mock_secret', + }); + + }); + + it('should work', async () => { - describe('initWorker', () => { - it('should init worker', async () => { await app.ready(); - assert.equal(hookTriggerWorker.configKey, 'triggerHookWorkerMaxConcurrentTasks'); - assert.equal(hookTriggerWorker.queueKey, 'trigger_hook'); + const task = Task.createCreateHookTask(HookEvent.createPublishEvent(pkgName, change.changeId, '1.0.0', 'latest')); + taskId = task.taskId; + const createHookTriggerService = await app.getEggObject(CreateHookTriggerService); + await createHookTriggerService.executeTask(task); + assert.equal(task?.state, TaskState.Success); + assert(taskId); + + app.expectLog('trigger_hook_worker:subscribe:executeTask:start'); + app.expectLog('trigger_hook_worker:subscribe:executeTask:success'); + }); }); + }); diff --git a/test/infra/MQAdapter.test.ts b/test/infra/MQAdapter.test.ts new file mode 100644 index 00000000..c9a49240 --- /dev/null +++ b/test/infra/MQAdapter.test.ts @@ -0,0 +1,61 @@ +import assert from 'assert'; +import { setTimeout } from 'node:timers/promises'; +import { Job, Worker } from 'bullmq'; +import { app } from 'egg-mock/bootstrap'; +import { MQAdapter } from '../../app/infra/MQAdapter'; + +describe('test/infra/MQAdapter.test.ts', () => { + let mqAdapter: MQAdapter; + + beforeEach(async () => { + mqAdapter = await app.getEggObject(MQAdapter); + }); + + it('should remove duplicate task', async () => { + const queue = mqAdapter.initQueue('banana'); + + await mqAdapter.addJobs('banana', {taskId: '1', targetName: 'okk'}); + await mqAdapter.addJobs('banana', {taskId: '2', targetName: 'okk'}); + await mqAdapter.addJobs('banana', {taskId: '1', targetName: 'okk'}); + + const len = await queue.count(); + assert.equal(len, 2); + + }); + + it('should retry failed task', async () => { + + const queue = mqAdapter.initQueue('apple'); + + // retry 1 time; + await mqAdapter.addJobs('apple', {taskId: '3', targetName: 'apple'}, { + attempts: 2, + backoff: { + type: 'exponential', + delay: 1, + }, + }); + + let failed = 0; + + const worker = new Worker(queue.name, async (job: Job) => { + // console.log(job.data); + throw new Error(`${job.data.taskId} error`); + }); + + worker.on('failed', job => { + // console.log('failed', job?.data?.taskId); + job && failed++; + }); + + let len = await queue.count(); + assert.equal(len, 1); + + // retry triggered + await setTimeout(50); + + assert.equal(failed, 2); + len = await queue.count(); + assert.equal(len, 0); + }); +});