-
Notifications
You must be signed in to change notification settings - Fork 80
Commit
- Loading branch information
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,79 @@ | ||
import { Inject, LifecycleDestroy, LifecycleInit } from '@eggjs/tegg'; | ||
import { MQAdapter } from '../../infra/MQAdapter'; | ||
import { Job, UnrecoverableError, Worker } from 'bullmq'; | ||
import { EggAppConfig, EggLogger } from 'egg'; | ||
import { TaskService } from '../service/TaskService'; | ||
|
||
export abstract class AbstractWorker { | ||
@Inject() | ||
private readonly queueAdapter: MQAdapter; | ||
|
||
@Inject() | ||
private readonly config: EggAppConfig; | ||
|
||
@Inject() | ||
private readonly taskService: TaskService; | ||
|
||
@Inject() | ||
private readonly logger: EggLogger; | ||
|
||
@Inject() | ||
protected worker: Worker; | ||
|
||
queueKey: string; | ||
configKey: string; | ||
service; | ||
|
||
async initWorker() { | ||
throw new Error('should implements in subclass'); | ||
} | ||
|
||
@LifecycleInit() | ||
protected async init() { | ||
this.initWorker(); | ||
const queue = this.queueAdapter.initQueue(this.queueKey); | ||
this.worker = new Worker( | ||
queue.name, | ||
async (job: Job) => { | ||
const startTime = Date.now(); | ||
const task = await this.taskService.findTask(job.data.taskId); | ||
if (!task) { | ||
throw new UnrecoverableError('task not found'); | ||
} | ||
|
||
this.logger.info(`[${this.queueKey}Worker:subscribe:executeTask:start][%s] taskId: %s, targetName: %s, attempts: %s, params: %j, updatedAt: %s, delay %sms`, | ||
this.worker.concurrency, task.taskId, task.targetName, task.attempts, task.data, task.updatedAt, | ||
Check failure on line 45 in app/core/woker/AbstractWorker.ts GitHub Actions / test-mysql57-fs-nfs (16, ubuntu-latest)
Check failure on line 45 in app/core/woker/AbstractWorker.ts GitHub Actions / test-mysql57-fs-nfs (18, ubuntu-latest)
|
||
startTime - task.updatedAt.getTime()); | ||
Check failure on line 46 in app/core/woker/AbstractWorker.ts GitHub Actions / test-mysql57-fs-nfs (16, ubuntu-latest)
Check failure on line 46 in app/core/woker/AbstractWorker.ts GitHub Actions / test-mysql57-fs-nfs (18, ubuntu-latest)
|
||
if (this.worker.concurrency !== this.config.cnpmcore[this.configKey]) { | ||
this.worker.concurrency = this.config.cnpmcore[this.configKey]; | ||
} | ||
|
||
// TODO bullmq 移除了 timeout 配置,需要自己实现一个 promise.race 执行 timeout | ||
await this.service.executeTask(job.data); | ||
}, | ||
{ | ||
concurrency: this.config.cnpmcore[this.configKey], | ||
}, | ||
); | ||
|
||
this.worker.on('completed', (job: Job) => { | ||
this.logger.info(`[${this.queueKey}Worker:subscribe:executeTask:success][%s] taskId: %s, targetName: %s, use %sms`, | ||
job.data.taskId, job.data.targetName, Date.now() - job.timestamp); | ||
Check failure on line 61 in app/core/woker/AbstractWorker.ts GitHub Actions / test-mysql57-fs-nfs (16, ubuntu-latest)
Check failure on line 61 in app/core/woker/AbstractWorker.ts GitHub Actions / test-mysql57-fs-nfs (18, ubuntu-latest)
|
||
}); | ||
|
||
this.worker.on('failed', (job?: Job) => { | ||
if (!job) { | ||
return; | ||
} | ||
this.logger.info(`[${this.queueKey}Worker:subscribe:executeTask:failed][%s] taskId: %s, targetName: %s, attemptsMade %s`, | ||
job.data.taskId, job.data.targetName, job.attemptsMade); | ||
}); | ||
|
||
} | ||
|
||
@LifecycleDestroy() | ||
protected async destroy() { | ||
await this.worker.close(); | ||
} | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,17 @@ | ||
import { Inject } from '@eggjs/tegg'; | ||
import { TaskType } from '../../common/enum/Task'; | ||
import { HookTriggerService } from '../service/HookTriggerService'; | ||
import { AbstractWorker } from './AbstractWorker'; | ||
|
||
export class TriggerHookWorker extends AbstractWorker { | ||
|
||
@Inject() | ||
private readonly hookTriggerService: HookTriggerService; | ||
|
||
async initWoker(): Promise<void> { | ||
this.queueKey = TaskType.TriggerHook; | ||
this.service = this.hookTriggerService; | ||
this.configKey = 'triggerHookWorkerMaxConcurrentTasks'; | ||
} | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,71 @@ | ||
import { | ||
AccessLevel, | ||
Inject, | ||
SingletonProto, | ||
} from '@eggjs/tegg'; | ||
import { Redis } from 'ioredis'; | ||
import { JobsOptions, Queue } from 'bullmq'; | ||
import { MQAdapterType } from '../common/typing'; | ||
|
||
/** | ||
* Use sort set to keep queue in order and keep same value only insert once | ||
*/ | ||
@SingletonProto({ | ||
accessLevel: AccessLevel.PUBLIC, | ||
name: 'mqAdapter', | ||
}) | ||
export class MQAdapter implements MQAdapterType { | ||
@Inject() | ||
private readonly redis: Redis; // 由 redis 插件引入 | ||
|
||
private queueMap: Record<string, Queue>; | ||
|
||
private getQueueName(key: string) { | ||
return `CNPMCORE_MQ_V1_${key}`; | ||
} | ||
|
||
initQueue(key: string) { | ||
const queueName = this.getQueueName(key); | ||
if (!this.queueMap[key]) { | ||
this.queueMap[key] = new Queue(queueName, { | ||
connection: this.redis, | ||
}); | ||
} | ||
|
||
return this.queueMap[key]; | ||
} | ||
|
||
/** | ||
* If queue has the same item, return false | ||
* If queue not has the same item, return true | ||
*/ | ||
async addJobs(key: string, taskId: string, options?: JobsOptions): Promise<boolean> { | ||
try { | ||
const queue = this.initQueue(key); | ||
await queue.add(key, { jobId: taskId }, | ||
{ | ||
removeOnComplete: true, | ||
removeOnFail: true, | ||
attempts: 3, | ||
backoff: { | ||
type: 'exponential', | ||
delay: 1000, | ||
}, | ||
...options, | ||
}, | ||
); | ||
return true; | ||
} catch (e) { | ||
return false; | ||
} | ||
} | ||
|
||
async pause(key: string) { | ||
await this.initQueue(key).pause(); | ||
} | ||
|
||
async resume(key: string) { | ||
await this.initQueue(key).pause(); | ||
} | ||
|
||
} |
This file was deleted.