From fb006e32fab733e45759459861a5e6974921f376 Mon Sep 17 00:00:00 2001 From: elrrrrrrr Date: Mon, 22 Jan 2024 21:00:57 +0800 Subject: [PATCH 1/2] fix: hooks --- app/core/service/TaskService.ts | 6 ++-- test/core/service/HookTriggerService.test.ts | 36 ++++++++++++++++---- 2 files changed, 33 insertions(+), 9 deletions(-) diff --git a/app/core/service/TaskService.ts b/app/core/service/TaskService.ts index 08fa34f3..dd626b0c 100644 --- a/app/core/service/TaskService.ts +++ b/app/core/service/TaskService.ts @@ -27,8 +27,10 @@ export class TaskService extends AbstractService { public async createTask(task: Task, addTaskQueueOnExists: boolean) { const existsTask = await this.taskRepository.findTaskByTargetName(task.targetName, task.type); - if (existsTask) { - // 如果任务还未被触发,就不继续重复创建 + + // 只在包同步场景下做任务合并,其余场景通过 bizId 来进行任务幂等 + if (existsTask && [ TaskType.SyncPackage, TaskType.SyncBinary ].includes(task.type)) { + // 在包同步场景,如果任务还未被触发,就不继续重复创建 // 如果任务正在执行,可能任务状态已更新,这种情况需要继续创建 if (existsTask.state === TaskState.Waiting) { if (task.type === TaskType.SyncPackage) { diff --git a/test/core/service/HookTriggerService.test.ts b/test/core/service/HookTriggerService.test.ts index 83570501..19b1f650 100644 --- a/test/core/service/HookTriggerService.test.ts +++ b/test/core/service/HookTriggerService.test.ts @@ -5,7 +5,7 @@ import { TestUtil } from '../../../test/TestUtil'; import { HookManageService } from '../../../app/core/service/HookManageService'; import { HookType } from '../../../app/common/enum/Hook'; import { UserRepository } from '../../../app/repository/UserRepository'; -import { PACKAGE_VERSION_ADDED } from '../../../app/core/event'; +import { PACKAGE_TAG_ADDED, PACKAGE_VERSION_ADDED } from '../../../app/core/event'; import { Change } from '../../../app/core/entity/Change'; import { ChangeRepository } from '../../../app/repository/ChangeRepository'; import { Task, TriggerHookTask } from '../../../app/core/entity/Task'; @@ -42,20 +42,32 @@ describe('test/core/service/HookTriggerService.test.ts', () => { }); describe('executeTask', () => { - let change: Change; + let versionChange: Change; + let tagChange: Change; let hook: Hook; let callEndpoint: string; let callOptions: HttpClientRequestOptions; beforeEach(async () => { - change = Change.create({ + versionChange = Change.create({ + type: PACKAGE_TAG_ADDED, + targetName: pkgName, + data: { + tag: 'latest', + }, + }); + tagChange = Change.create({ type: PACKAGE_VERSION_ADDED, targetName: pkgName, data: { version: '1.0.0', }, }); - await changeRepository.addChange(change); + await Promise.all([ + changeRepository.addChange(versionChange), + changeRepository.addChange(tagChange), + ]); + hook = await hookManageService.createHook({ type: HookType.Package, ownerId: userId, @@ -63,8 +75,13 @@ describe('test/core/service/HookTriggerService.test.ts', () => { endpoint: 'http://foo.com', secret: 'mock_secret', }); - const task = Task.createCreateHookTask(HookEvent.createPublishEvent(pkgName, change.changeId, '1.0.0', 'latest')); - await createHookTriggerService.executeTask(task); + const versionTask = Task.createCreateHookTask(HookEvent.createPublishEvent(pkgName, versionChange.changeId, '1.0.0', 'latest')); + const tagTask = Task.createCreateHookTask(HookEvent.createPublishEvent(pkgName, tagChange.changeId, '1.0.0', 'latest')); + + await Promise.all([ + createHookTriggerService.executeTask(versionTask), + createHookTriggerService.executeTask(tagTask), + ]); mock(app.httpclient, 'request', async (url, options) => { callEndpoint = url; @@ -76,7 +93,7 @@ describe('test/core/service/HookTriggerService.test.ts', () => { }); it('should execute trigger', async () => { - const pushTask = await taskRepository.findTaskByBizId(`TriggerHook:${change.changeId}:${hook.hookId}`) as TriggerHookTask; + const pushTask = await taskRepository.findTaskByBizId(`TriggerHook:${versionChange.changeId}:${hook.hookId}`) as TriggerHookTask; await hookTriggerService.executeTask(pushTask); assert(callEndpoint === hook.endpoint); assert(callOptions); @@ -97,5 +114,10 @@ describe('test/core/service/HookTriggerService.test.ts', () => { }); assert(data.time === pushTask.data.hookEvent.time); }); + + it('should create each event', async () => { + const tasks = await Promise.all([ taskRepository.findTaskByBizId(`TriggerHook:${versionChange.changeId}:${hook.hookId}`), taskRepository.findTaskByBizId(`TriggerHook:${tagChange.changeId}:${hook.hookId}`) ]); + assert.equal(tasks.filter(Boolean).length, 2); + }); }); }); From 351d856af173e278ef21c6fcd490cb32afe7393f Mon Sep 17 00:00:00 2001 From: elrrrrrrr Date: Mon, 22 Jan 2024 21:52:49 +0800 Subject: [PATCH 2/2] feat: clean binary merge logic --- app/core/entity/Task.ts | 4 ++++ app/core/service/BinarySyncerService.ts | 9 --------- app/core/service/TaskService.ts | 2 +- 3 files changed, 5 insertions(+), 10 deletions(-) diff --git a/app/core/entity/Task.ts b/app/core/entity/Task.ts index 242e6998..43280513 100644 --- a/app/core/entity/Task.ts +++ b/app/core/entity/Task.ts @@ -231,6 +231,10 @@ export class Task extends Entity { return task; } + public static needMergeWhenWaiting(type: TaskType) { + return [ TaskType.SyncBinary, TaskType.SyncPackage ].includes(type); + } + start(): TaskUpdateCondition { const condition = { taskId: this.taskId, diff --git a/app/core/service/BinarySyncerService.ts b/app/core/service/BinarySyncerService.ts index c2f492d9..00d79a49 100644 --- a/app/core/service/BinarySyncerService.ts +++ b/app/core/service/BinarySyncerService.ts @@ -19,7 +19,6 @@ import { Binary } from '../entity/Binary'; import { TaskService } from './TaskService'; import { AbstractBinary, BinaryItem } from '../../common/adapter/binary/AbstractBinary'; import { AbstractService } from '../../common/AbstractService'; -import { TaskRepository } from '../../repository/TaskRepository'; import { BinaryType } from '../../common/enum/Binary'; import { sortBy } from 'lodash'; @@ -36,8 +35,6 @@ export class BinarySyncerService extends AbstractService { @Inject() private readonly taskService: TaskService; @Inject() - private readonly taskRepository: TaskRepository; - @Inject() private readonly httpclient: EggHttpClient; @Inject() private readonly nfsAdapter: NFSAdapter; @@ -89,13 +86,7 @@ export class BinarySyncerService extends AbstractService { return await this.nfsAdapter.getDownloadUrlOrStream(binary.storePath); } - // SyncBinary 由定时任务每台单机定时触发,手动去重 - // 添加 bizId 在 db 防止重复,记录 id 错误 public async createTask(binaryName: BinaryName, lastData?: any) { - const existsTask = await this.taskRepository.findTaskByTargetName(binaryName, TaskType.SyncBinary); - if (existsTask) { - return existsTask; - } try { return await this.taskService.createTask(Task.createSyncBinary(binaryName, lastData), false); } catch (e) { diff --git a/app/core/service/TaskService.ts b/app/core/service/TaskService.ts index dd626b0c..e7b81eb9 100644 --- a/app/core/service/TaskService.ts +++ b/app/core/service/TaskService.ts @@ -29,7 +29,7 @@ export class TaskService extends AbstractService { const existsTask = await this.taskRepository.findTaskByTargetName(task.targetName, task.type); // 只在包同步场景下做任务合并,其余场景通过 bizId 来进行任务幂等 - if (existsTask && [ TaskType.SyncPackage, TaskType.SyncBinary ].includes(task.type)) { + if (existsTask && Task.needMergeWhenWaiting(task.type)) { // 在包同步场景,如果任务还未被触发,就不继续重复创建 // 如果任务正在执行,可能任务状态已更新,这种情况需要继续创建 if (existsTask.state === TaskState.Waiting) {