-
Notifications
You must be signed in to change notification settings - Fork 80
Commit
- Loading branch information
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,79 +1,84 @@ | ||
import { Inject, LifecycleDestroy, LifecycleInit } from '@eggjs/tegg'; | ||
import { MQAdapter } from '../../infra/MQAdapter'; | ||
import { Job, UnrecoverableError, Worker } from 'bullmq'; | ||
import { EggAppConfig, EggLogger } from 'egg'; | ||
import { Application } from 'egg'; | ||
import { TaskService } from '../service/TaskService'; | ||
|
||
export abstract class AbstractWorker { | ||
@Inject() | ||
private readonly queueAdapter: MQAdapter; | ||
|
||
@Inject() | ||
private readonly config: EggAppConfig; | ||
|
||
@Inject() | ||
private readonly taskService: TaskService; | ||
|
||
@Inject() | ||
private readonly logger: EggLogger; | ||
constructor(app: Application) { | ||
this.app = app; | ||
this.registerWorker(); | ||
} | ||
|
||
protected worker: Worker; | ||
|
||
app: Application; | ||
queueKey: string; | ||
configKey: string; | ||
service; | ||
|
||
queueAdapter: MQAdapter; | ||
taskService: TaskService; | ||
serviceClass: any; | ||
service: any; | ||
|
||
async initWorker() { | ||
throw new Error('should implements in subclass'); | ||
await this.initWorkerInfo(); | ||
this.queueAdapter = await this.app.getEggObject(MQAdapter); | ||
this.taskService = await this.app.getEggObject(TaskService); | ||
this.service = await this.app.getEggObject(this.serviceClass); | ||
} | ||
|
||
@LifecycleInit() | ||
protected async init() { | ||
async initWorkerInfo() { | ||
throw new Error('not implement'); | ||
} | ||
|
||
async registerWorker() { | ||
await this.initWorker(); | ||
const queue = this.queueAdapter.initQueue(this.queueKey); | ||
this.worker = new Worker( | ||
queue.name, | ||
async (job: Job) => { | ||
const startTime = Date.now(); | ||
const task = await this.taskService.findTask(job.data.taskId); | ||
if (!task) { | ||
throw new UnrecoverableError('task not found'); | ||
} | ||
await this.app.runInAnonymousContextScope(async ctx => { | ||
await ctx.beginModuleScope(async () => { | ||
console.log('开始干活了'); | ||
const startTime = Date.now(); | ||
const task = await this.taskService.findTask(job.data.taskId); | ||
if (!task) { | ||
throw new UnrecoverableError('task not found'); | ||
} | ||
|
||
this.logger.info(`[${this.queueKey}Worker:subscribe:executeTask:start][%s] taskId: %s, targetName: %s, attempts: %s, params: %j, updatedAt: %s, delay %sms`, | ||
this.worker.concurrency, task.taskId, task.targetName, task.attempts, task.data, task.updatedAt, | ||
startTime - task.updatedAt.getTime()); | ||
if (this.worker.concurrency !== this.config.cnpmcore[this.configKey]) { | ||
this.worker.concurrency = this.config.cnpmcore[this.configKey]; | ||
} | ||
if (this.worker.concurrency !== this.app.config.cnpmcore[this.configKey]) { | ||
this.worker.concurrency = this.app.config.cnpmcore[this.configKey]; | ||
} | ||
this.app.logger.info(`[${this.queueKey}_worker:subscribe:executeTask:start] taskId: %s, targetName: %s, attempts: %s, params: %j, updatedAt: %s, delay %sms`, | ||
task.taskId, task.targetName, task.attempts, task.data, task.updatedAt, | ||
startTime - task.updatedAt.getTime()); | ||
|
||
// TODO bullmq 移除了 timeout 配置,需要自己实现一个 promise.race 执行 timeout | ||
await this.service.executeTask(job.data); | ||
// TODO bullmq 移除了 timeout 配置,需要自己实现一个 promise.race 执行 timeout | ||
console.log('搞'); | ||
await this.service.executeTask(task as any); | ||
console.log('搞好'); | ||
}); | ||
}); | ||
}, | ||
{ | ||
concurrency: this.config.cnpmcore[this.configKey], | ||
autorun: true, | ||
concurrency: this.app.config.cnpmcore[this.configKey], | ||
}, | ||
); | ||
|
||
this.worker.on('completed', (job: Job) => { | ||
this.logger.info(`[${this.queueKey}Worker:subscribe:executeTask:success][%s] taskId: %s, targetName: %s, use %sms`, | ||
console.log('干好了'); | ||
this.app.logger.info(`[${this.queueKey}_worker:subscribe:executeTask:success] taskId: %s, targetName: %s, use %sms`, | ||
job.data.taskId, job.data.targetName, Date.now() - job.timestamp); | ||
}); | ||
|
||
this.worker.on('failed', (job?: Job) => { | ||
if (!job) { | ||
return; | ||
} | ||
this.logger.info(`[${this.queueKey}Worker:subscribe:executeTask:failed][%s] taskId: %s, targetName: %s, attemptsMade %s`, | ||
this.app.logger.info(`[${this.queueKey}_worker:subscribe:executeTask:failed] taskId: %s, targetName: %s, attemptsMade %s`, | ||
job.data.taskId, job.data.targetName, job.attemptsMade); | ||
}); | ||
|
||
} | ||
|
||
@LifecycleDestroy() | ||
protected async destroy() { | ||
await this.worker.close(); | ||
} | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,18 +1,12 @@ | ||
import { Inject, SingletonProto } from '@eggjs/tegg'; | ||
import { TaskType } from '../../common/enum/Task'; | ||
import { HookTriggerService } from '../service/HookTriggerService'; | ||
import { AbstractWorker } from './AbstractWorker'; | ||
|
||
@SingletonProto() | ||
export class HookTriggerWorker extends AbstractWorker { | ||
|
||
@Inject() | ||
private readonly hookTriggerService: HookTriggerService; | ||
|
||
async initWorker(): Promise<void> { | ||
async initWorkerInfo(): Promise<void> { | ||
this.queueKey = TaskType.TriggerHook; | ||
this.service = this.hookTriggerService; | ||
this.serviceClass = HookTriggerService; | ||
this.configKey = 'triggerHookWorkerMaxConcurrentTasks'; | ||
} | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,19 +1,69 @@ | ||
import { app } from 'egg-mock/bootstrap'; | ||
import { Change } from '../../../app/core/entity/Change'; | ||
import assert from 'assert'; | ||
import { HookTriggerWorker } from '../../../app/core/woker/HookTriggerWorker'; | ||
import { PACKAGE_VERSION_ADDED } from '../../../app/core/event'; | ||
import { ChangeRepository } from '../../../app/repository/ChangeRepository'; | ||
import { HookManageService } from '../../../app/core/service/HookManageService'; | ||
import { HookType } from '../../../app/common/enum/Hook'; | ||
import { HookEvent } from '../../../app/core/entity/HookEvent'; | ||
import { Task } from '../../../app/core/entity/Task'; | ||
import { CreateHookTriggerService } from '../../../app/core/service/CreateHookTriggerService'; | ||
import { TestUtil } from '../../TestUtil'; | ||
import { UserRepository } from '../../../app/repository/UserRepository'; | ||
import { TaskState } from '../../../app/common/enum/Task'; | ||
|
||
describe('test/core/worker/HookTriggerWorker.test.ts', () => { | ||
let hookTriggerWorker: HookTriggerWorker; | ||
|
||
beforeEach(async () => { | ||
hookTriggerWorker = await app.getEggObject(HookTriggerWorker); | ||
}); | ||
describe('trigger hook', () => { | ||
|
||
let change: Change; | ||
let hookManageService: HookManageService; | ||
let taskId: string; | ||
const pkgName = '@cnpmcore/foo'; | ||
beforeEach(async () => { | ||
app.mockLog(); | ||
const { name: username } = await TestUtil.createUser(); | ||
await TestUtil.createPackage({ name: pkgName }); | ||
change = Change.create({ | ||
type: PACKAGE_VERSION_ADDED, | ||
targetName: pkgName, | ||
data: { | ||
version: '1.0.0', | ||
}, | ||
}); | ||
app.mockHttpclient('http://foo.com', 'POST', { | ||
status: 200, | ||
}); | ||
const changeRepository = await app.getEggObject(ChangeRepository); | ||
await changeRepository.addChange(change); | ||
const userRepository = await app.getEggObject(UserRepository); | ||
const user = await userRepository.findUserByName(username); | ||
const userId = user!.userId; | ||
hookManageService = await app.getEggObject(HookManageService); | ||
await hookManageService.createHook({ | ||
type: HookType.Package, | ||
ownerId: userId, | ||
name: pkgName, | ||
endpoint: 'http://foo.com', | ||
secret: 'mock_secret', | ||
}); | ||
|
||
}); | ||
|
||
it('should work', async () => { | ||
|
||
describe('initWorker', () => { | ||
it('should init worker', async () => { | ||
await app.ready(); | ||
assert.equal(hookTriggerWorker.configKey, 'triggerHookWorkerMaxConcurrentTasks'); | ||
assert.equal(hookTriggerWorker.queueKey, 'trigger_hook'); | ||
const task = Task.createCreateHookTask(HookEvent.createPublishEvent(pkgName, change.changeId, '1.0.0', 'latest')); | ||
taskId = task.taskId; | ||
const createHookTriggerService = await app.getEggObject(CreateHookTriggerService); | ||
await createHookTriggerService.executeTask(task); | ||
assert.equal(task?.state, TaskState.Success); | ||
assert(taskId); | ||
|
||
app.expectLog('trigger_hook_worker:subscribe:executeTask:start'); | ||
app.expectLog('trigger_hook_worker:subscribe:executeTask:success'); | ||
|
||
}); | ||
}); | ||
|
||
}); |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,61 @@ | ||
import assert from 'assert'; | ||
import { setTimeout } from 'node:timers/promises'; | ||
import { Job, Worker } from 'bullmq'; | ||
import { app } from 'egg-mock/bootstrap'; | ||
import { MQAdapter } from '../../app/infra/MQAdapter'; | ||
|
||
describe('test/infra/MQAdapter.test.ts', () => { | ||
let mqAdapter: MQAdapter; | ||
|
||
beforeEach(async () => { | ||
mqAdapter = await app.getEggObject(MQAdapter); | ||
}); | ||
|
||
it('should remove duplicate task', async () => { | ||
const queue = mqAdapter.initQueue('banana'); | ||
|
||
await mqAdapter.addJobs('banana', {taskId: '1', targetName: 'okk'}); | ||
Check failure on line 17 in test/infra/MQAdapter.test.ts GitHub Actions / test-mysql57-fs-nfs (16, ubuntu-latest)
Check failure on line 17 in test/infra/MQAdapter.test.ts GitHub Actions / test-mysql57-fs-nfs (16, ubuntu-latest)
Check failure on line 17 in test/infra/MQAdapter.test.ts GitHub Actions / test-mysql57-fs-nfs (18, ubuntu-latest)
Check failure on line 17 in test/infra/MQAdapter.test.ts GitHub Actions / test-mysql57-fs-nfs (18, ubuntu-latest)
Check failure on line 17 in test/infra/MQAdapter.test.ts GitHub Actions / test-mysql57-fs-nfs (20, ubuntu-latest)
|
||
await mqAdapter.addJobs('banana', {taskId: '2', targetName: 'okk'}); | ||
Check failure on line 18 in test/infra/MQAdapter.test.ts GitHub Actions / test-mysql57-fs-nfs (16, ubuntu-latest)
Check failure on line 18 in test/infra/MQAdapter.test.ts GitHub Actions / test-mysql57-fs-nfs (16, ubuntu-latest)
Check failure on line 18 in test/infra/MQAdapter.test.ts GitHub Actions / test-mysql57-fs-nfs (18, ubuntu-latest)
Check failure on line 18 in test/infra/MQAdapter.test.ts GitHub Actions / test-mysql57-fs-nfs (18, ubuntu-latest)
Check failure on line 18 in test/infra/MQAdapter.test.ts GitHub Actions / test-mysql57-fs-nfs (20, ubuntu-latest)
|
||
await mqAdapter.addJobs('banana', {taskId: '1', targetName: 'okk'}); | ||
Check failure on line 19 in test/infra/MQAdapter.test.ts GitHub Actions / test-mysql57-fs-nfs (16, ubuntu-latest)
Check failure on line 19 in test/infra/MQAdapter.test.ts GitHub Actions / test-mysql57-fs-nfs (16, ubuntu-latest)
Check failure on line 19 in test/infra/MQAdapter.test.ts GitHub Actions / test-mysql57-fs-nfs (18, ubuntu-latest)
Check failure on line 19 in test/infra/MQAdapter.test.ts GitHub Actions / test-mysql57-fs-nfs (18, ubuntu-latest)
Check failure on line 19 in test/infra/MQAdapter.test.ts GitHub Actions / test-mysql57-fs-nfs (20, ubuntu-latest)
|
||
|
||
const len = await queue.count(); | ||
assert.equal(len, 2); | ||
|
||
}); | ||
|
||
it('should retry failed task', async () => { | ||
|
||
const queue = mqAdapter.initQueue('apple'); | ||
|
||
// retry 1 time; | ||
await mqAdapter.addJobs('apple', {taskId: '3', targetName: 'apple'}, { | ||
Check failure on line 31 in test/infra/MQAdapter.test.ts GitHub Actions / test-mysql57-fs-nfs (16, ubuntu-latest)
Check failure on line 31 in test/infra/MQAdapter.test.ts GitHub Actions / test-mysql57-fs-nfs (16, ubuntu-latest)
Check failure on line 31 in test/infra/MQAdapter.test.ts GitHub Actions / test-mysql57-fs-nfs (18, ubuntu-latest)
Check failure on line 31 in test/infra/MQAdapter.test.ts GitHub Actions / test-mysql57-fs-nfs (18, ubuntu-latest)
Check failure on line 31 in test/infra/MQAdapter.test.ts GitHub Actions / test-mysql57-fs-nfs (20, ubuntu-latest)
|
||
attempts: 2, | ||
backoff: { | ||
type: 'exponential', | ||
delay: 1, | ||
}, | ||
}); | ||
|
||
let failed = 0; | ||
|
||
const worker = new Worker(queue.name, async (job: Job) => { | ||
// console.log(job.data); | ||
throw new Error(`${job.data.taskId} error`); | ||
}); | ||
|
||
worker.on('failed', job => { | ||
// console.log('failed', job?.data?.taskId); | ||
job && failed++; | ||
}); | ||
|
||
let len = await queue.count(); | ||
assert.equal(len, 1); | ||
|
||
// retry triggered | ||
await setTimeout(50); | ||
|
||
assert.equal(failed, 2); | ||
len = await queue.count(); | ||
assert.equal(len, 0); | ||
}); | ||
}); |