diff --git a/app/common/typing.ts b/app/common/typing.ts index 3d65c2aa..4b696384 100644 --- a/app/common/typing.ts +++ b/app/common/typing.ts @@ -50,6 +50,12 @@ export interface QueueAdapter { length(key: string): Promise; } +export interface MQAdapterType { + addJobs(key: string, taskId: string): Promise; + pause(key: string): void; + resume(key: string): void; +} + export interface AuthUrlResult { loginUrl: string; doneUrl: string; diff --git a/app/core/service/CreateHookTriggerService.ts b/app/core/service/CreateHookTriggerService.ts index 6762d0c0..f06ac867 100644 --- a/app/core/service/CreateHookTriggerService.ts +++ b/app/core/service/CreateHookTriggerService.ts @@ -72,7 +72,7 @@ export class CreateHookTriggerService extends AbstractService { private async createTriggerTasks(hooks: Array, hookEvent: HookEvent) { await pMap(hooks, async hook => { const triggerHookTask = Task.createTriggerHookTask(hookEvent, hook.hookId); - await this.taskService.createTask(triggerHookTask, true); + await this.taskService.createTask(triggerHookTask, false, true); }, { concurrency: 5 }); } } diff --git a/app/core/service/TaskService.ts b/app/core/service/TaskService.ts index 08fa34f3..fb302b51 100644 --- a/app/core/service/TaskService.ts +++ b/app/core/service/TaskService.ts @@ -9,6 +9,7 @@ import { AbstractService } from '../../common/AbstractService'; import { TaskRepository } from '../../repository/TaskRepository'; import { Task, CreateSyncPackageTaskData } from '../entity/Task'; import { QueueAdapter } from '../../common/typing'; +import { MQAdapter } from '../../infra/MQAdapter'; @SingletonProto({ accessLevel: AccessLevel.PUBLIC, @@ -21,11 +22,14 @@ export class TaskService extends AbstractService { @Inject() private readonly queueAdapter: QueueAdapter; + @Inject() + private readonly mqAdapter: MQAdapter; + public async getTaskQueueLength(taskType: TaskType) { return await this.queueAdapter.length(taskType); } - public async createTask(task: Task, addTaskQueueOnExists: boolean) { + public async createTask(task: Task, addTaskQueueOnExists: boolean, useMQ = false) { const existsTask = await this.taskRepository.findTaskByTargetName(task.targetName, task.type); if (existsTask) { // 如果任务还未被触发,就不继续重复创建 @@ -60,10 +64,15 @@ export class TaskService extends AbstractService { return existsTask; } await this.taskRepository.saveTask(task); - await this.queueAdapter.push(task.type, task.taskId); - const queueLength = await this.getTaskQueueLength(task.type); - this.logger.info('[TaskService.createTask:new] taskType: %s, targetName: %s, taskId: %s, queue size: %s', - task.type, task.targetName, task.taskId, queueLength); + + if (useMQ) { + await this.mqAdapter.addJobs(task.type, task.taskId); + } else { + await this.queueAdapter.push(task.type, task.taskId); + const queueLength = await this.getTaskQueueLength(task.type); + this.logger.info('[TaskService.createTask:new] taskType: %s, targetName: %s, taskId: %s, queue size: %s', + task.type, task.targetName, task.taskId, queueLength); + } return task; } diff --git a/app/core/woker/AbstractWorker.ts b/app/core/woker/AbstractWorker.ts new file mode 100644 index 00000000..16154e1a --- /dev/null +++ b/app/core/woker/AbstractWorker.ts @@ -0,0 +1,79 @@ +import { Inject, LifecycleDestroy, LifecycleInit } from '@eggjs/tegg'; +import { MQAdapter } from '../../infra/MQAdapter'; +import { Job, UnrecoverableError, Worker } from 'bullmq'; +import { EggAppConfig, EggLogger } from 'egg'; +import { TaskService } from '../service/TaskService'; + +export abstract class AbstractWorker { + @Inject() + private readonly queueAdapter: MQAdapter; + + @Inject() + private readonly config: EggAppConfig; + + @Inject() + private readonly taskService: TaskService; + + @Inject() + private readonly logger: EggLogger; + + @Inject() + protected worker: Worker; + + queueKey: string; + configKey: string; + service; + + async initWorker() { + throw new Error('should implements in subclass'); + } + + @LifecycleInit() + protected async init() { + this.initWorker(); + const queue = this.queueAdapter.initQueue(this.queueKey); + this.worker = new Worker( + queue.name, + async (job: Job) => { + const startTime = Date.now(); + const task = await this.taskService.findTask(job.data.taskId); + if (!task) { + throw new UnrecoverableError('task not found'); + } + + this.logger.info(`[${this.queueKey}Worker:subscribe:executeTask:start][%s] taskId: %s, targetName: %s, attempts: %s, params: %j, updatedAt: %s, delay %sms`, + this.worker.concurrency, task.taskId, task.targetName, task.attempts, task.data, task.updatedAt, + startTime - task.updatedAt.getTime()); + if (this.worker.concurrency !== this.config.cnpmcore[this.configKey]) { + this.worker.concurrency = this.config.cnpmcore[this.configKey]; + } + + // TODO bullmq 移除了 timeout 配置,需要自己实现一个 promise.race 执行 timeout + await this.service.executeTask(job.data); + }, + { + concurrency: this.config.cnpmcore[this.configKey], + }, + ); + + this.worker.on('completed', (job: Job) => { + this.logger.info(`[${this.queueKey}Worker:subscribe:executeTask:success][%s] taskId: %s, targetName: %s, use %sms`, + job.data.taskId, job.data.targetName, Date.now() - job.timestamp); + }); + + this.worker.on('failed', (job?: Job) => { + if (!job) { + return; + } + this.logger.info(`[${this.queueKey}Worker:subscribe:executeTask:failed][%s] taskId: %s, targetName: %s, attemptsMade %s`, + job.data.taskId, job.data.targetName, job.attemptsMade); + }); + + } + + @LifecycleDestroy() + protected async destroy() { + await this.worker.close(); + } + +} diff --git a/app/core/woker/HookTriggerWorker.ts b/app/core/woker/HookTriggerWorker.ts new file mode 100644 index 00000000..8e340ed0 --- /dev/null +++ b/app/core/woker/HookTriggerWorker.ts @@ -0,0 +1,17 @@ +import { Inject } from '@eggjs/tegg'; +import { TaskType } from '../../common/enum/Task'; +import { HookTriggerService } from '../service/HookTriggerService'; +import { AbstractWorker } from './AbstractWorker'; + +export class HookTriggerWorker extends AbstractWorker { + + @Inject() + private readonly hookTriggerService: HookTriggerService; + + async initWoker(): Promise { + this.queueKey = TaskType.TriggerHook; + this.service = this.hookTriggerService; + this.configKey = 'triggerHookWorkerMaxConcurrentTasks'; + } + +} diff --git a/app/infra/MQAdapter.ts b/app/infra/MQAdapter.ts new file mode 100644 index 00000000..be93d0db --- /dev/null +++ b/app/infra/MQAdapter.ts @@ -0,0 +1,71 @@ +import { + AccessLevel, + Inject, + SingletonProto, +} from '@eggjs/tegg'; +import { Redis } from 'ioredis'; +import { JobsOptions, Queue } from 'bullmq'; +import { MQAdapterType } from '../common/typing'; + +/** + * Use sort set to keep queue in order and keep same value only insert once + */ +@SingletonProto({ + accessLevel: AccessLevel.PUBLIC, + name: 'mqAdapter', +}) +export class MQAdapter implements MQAdapterType { + @Inject() + private readonly redis: Redis; // 由 redis 插件引入 + + private queueMap: Record; + + private getQueueName(key: string) { + return `CNPMCORE_MQ_V1_${key}`; + } + + initQueue(key: string) { + const queueName = this.getQueueName(key); + if (!this.queueMap[key]) { + this.queueMap[key] = new Queue(queueName, { + connection: this.redis, + }); + } + + return this.queueMap[key]; + } + + /** + * If queue has the same item, return false + * If queue not has the same item, return true + */ + async addJobs(key: string, taskId: string, options?: JobsOptions): Promise { + try { + const queue = this.initQueue(key); + await queue.add(key, { jobId: taskId }, + { + removeOnComplete: true, + removeOnFail: true, + attempts: 3, + backoff: { + type: 'exponential', + delay: 1000, + }, + ...options, + }, + ); + return true; + } catch (e) { + return false; + } + } + + async pause(key: string) { + await this.initQueue(key).pause(); + } + + async resume(key: string) { + await this.initQueue(key).pause(); + } + +} diff --git a/app/port/schedule/TriggerHookWorker.ts b/app/port/schedule/TriggerHookWorker.ts deleted file mode 100644 index 2e594c4f..00000000 --- a/app/port/schedule/TriggerHookWorker.ts +++ /dev/null @@ -1,58 +0,0 @@ -import { EggAppConfig, EggLogger } from 'egg'; -import { IntervalParams, Schedule, ScheduleType } from '@eggjs/tegg/schedule'; -import { Inject } from '@eggjs/tegg'; -import { HookTriggerService } from '../../core/service/HookTriggerService'; -import { TaskService } from '../../core/service/TaskService'; -import { TaskType } from '../../common/enum/Task'; -import { TriggerHookTask } from '../../core/entity/Task'; - -let executingCount = 0; -@Schedule({ - type: ScheduleType.ALL, - scheduleData: { - interval: 1000, - }, -}) -export class TriggerHookWorker { - @Inject() - private readonly config: EggAppConfig; - - @Inject() - private readonly logger: EggLogger; - - @Inject() - private readonly hookTriggerService: HookTriggerService; - - @Inject() - private readonly taskService: TaskService; - - async subscribe() { - if (executingCount >= this.config.cnpmcore.triggerHookWorkerMaxConcurrentTasks) return; - - executingCount++; - try { - let task = await this.taskService.findExecuteTask(TaskType.TriggerHook) as TriggerHookTask; - while (task) { - const startTime = Date.now(); - this.logger.info('[TriggerHookWorker:subscribe:executeTask:start][%s] taskId: %s, targetName: %s, attempts: %s, params: %j, updatedAt: %s, delay %sms', - executingCount, task.taskId, task.targetName, task.attempts, task.data, task.updatedAt, - startTime - task.updatedAt.getTime()); - await this.hookTriggerService.executeTask(task); - const use = Date.now() - startTime; - this.logger.info('[TriggerHookWorker:subscribe:executeTask:success][%s] taskId: %s, targetName: %s, use %sms', - executingCount, task.taskId, task.targetName, use); - if (executingCount >= this.config.cnpmcore.triggerHookWorkerMaxConcurrentTasks) { - this.logger.info('[TriggerHookWorker:subscribe:executeTask] current sync task count %s, exceed max concurrent tasks %s', - executingCount, this.config.cnpmcore.triggerHookWorkerMaxConcurrentTasks); - break; - } - // try next task - task = await this.taskService.findExecuteTask(TaskType.TriggerHook) as TriggerHookTask; - } - } catch (err) { - this.logger.error('[TriggerHookWorker:subscribe:executeTask:error][%s] %s', executingCount, err); - } finally { - executingCount--; - } - } -} diff --git a/package.json b/package.json index 94b338ad..5ee5e734 100644 --- a/package.json +++ b/package.json @@ -83,6 +83,7 @@ "base-x": "^3.0.9", "base64url": "^3.0.1", "bson-objectid": "^2.0.1", + "bullmq": "^4.2.0", "dayjs": "^1.10.7", "egg": "^3.9.2", "egg-cors": "^2.2.3", diff --git a/test/core/worker/HookTriggerWorker.test.ts b/test/core/worker/HookTriggerWorker.test.ts new file mode 100644 index 00000000..e69de29b