Skip to content

Commit

Permalink
fix: hooks triggers (#641)
Browse files Browse the repository at this point in the history
> Since the eventBus#cork , version & tag events are triggered at same
time, cause the abnormal triggers of different types of hooks.
* ~~🐞 Fix triggerHook type targetName to be `tagetName:changeId`~~
* 🤖 Only merge sync tasks (binary, package) which in waiting states
--------
> 由于 `eventBus#cork` 机制,版本事件同时触发,导致不同类型 hook 触发异常
* ~~🐞 修改 triggerHook 类型 targetName 为 `包名:changeId`~~
* 🤖 仅合并 waiting 状态下包同步任务
  • Loading branch information
elrrrrrrr authored Jan 23, 2024
1 parent 08678c7 commit 838eecf
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 18 deletions.
4 changes: 4 additions & 0 deletions app/core/entity/Task.ts
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,10 @@ export class Task<T extends TaskBaseData = TaskBaseData> extends Entity {
return task;
}

public static needMergeWhenWaiting(type: TaskType) {
return [ TaskType.SyncBinary, TaskType.SyncPackage ].includes(type);
}

start(): TaskUpdateCondition {
const condition = {
taskId: this.taskId,
Expand Down
9 changes: 0 additions & 9 deletions app/core/service/BinarySyncerService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import { Binary } from '../entity/Binary';
import { TaskService } from './TaskService';
import { AbstractBinary, BinaryItem } from '../../common/adapter/binary/AbstractBinary';
import { AbstractService } from '../../common/AbstractService';
import { TaskRepository } from '../../repository/TaskRepository';
import { BinaryType } from '../../common/enum/Binary';
import { sortBy } from 'lodash';

Expand All @@ -36,8 +35,6 @@ export class BinarySyncerService extends AbstractService {
@Inject()
private readonly taskService: TaskService;
@Inject()
private readonly taskRepository: TaskRepository;
@Inject()
private readonly httpclient: EggHttpClient;
@Inject()
private readonly nfsAdapter: NFSAdapter;
Expand Down Expand Up @@ -89,13 +86,7 @@ export class BinarySyncerService extends AbstractService {
return await this.nfsAdapter.getDownloadUrlOrStream(binary.storePath);
}

// SyncBinary 由定时任务每台单机定时触发,手动去重
// 添加 bizId 在 db 防止重复,记录 id 错误
public async createTask(binaryName: BinaryName, lastData?: any) {
const existsTask = await this.taskRepository.findTaskByTargetName(binaryName, TaskType.SyncBinary);
if (existsTask) {
return existsTask;
}
try {
return await this.taskService.createTask(Task.createSyncBinary(binaryName, lastData), false);
} catch (e) {
Expand Down
6 changes: 4 additions & 2 deletions app/core/service/TaskService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,10 @@ export class TaskService extends AbstractService {

public async createTask(task: Task, addTaskQueueOnExists: boolean) {
const existsTask = await this.taskRepository.findTaskByTargetName(task.targetName, task.type);
if (existsTask) {
// 如果任务还未被触发,就不继续重复创建

// 只在包同步场景下做任务合并,其余场景通过 bizId 来进行任务幂等
if (existsTask && Task.needMergeWhenWaiting(task.type)) {
// 在包同步场景,如果任务还未被触发,就不继续重复创建
// 如果任务正在执行,可能任务状态已更新,这种情况需要继续创建
if (existsTask.state === TaskState.Waiting) {
if (task.type === TaskType.SyncPackage) {
Expand Down
36 changes: 29 additions & 7 deletions test/core/service/HookTriggerService.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import { TestUtil } from '../../../test/TestUtil';
import { HookManageService } from '../../../app/core/service/HookManageService';
import { HookType } from '../../../app/common/enum/Hook';
import { UserRepository } from '../../../app/repository/UserRepository';
import { PACKAGE_VERSION_ADDED } from '../../../app/core/event';
import { PACKAGE_TAG_ADDED, PACKAGE_VERSION_ADDED } from '../../../app/core/event';
import { Change } from '../../../app/core/entity/Change';
import { ChangeRepository } from '../../../app/repository/ChangeRepository';
import { Task, TriggerHookTask } from '../../../app/core/entity/Task';
Expand Down Expand Up @@ -42,29 +42,46 @@ describe('test/core/service/HookTriggerService.test.ts', () => {
});

describe('executeTask', () => {
let change: Change;
let versionChange: Change;
let tagChange: Change;
let hook: Hook;
let callEndpoint: string;
let callOptions: HttpClientRequestOptions;

beforeEach(async () => {
change = Change.create({
versionChange = Change.create({
type: PACKAGE_TAG_ADDED,
targetName: pkgName,
data: {
tag: 'latest',
},
});
tagChange = Change.create({
type: PACKAGE_VERSION_ADDED,
targetName: pkgName,
data: {
version: '1.0.0',
},
});
await changeRepository.addChange(change);
await Promise.all([
changeRepository.addChange(versionChange),
changeRepository.addChange(tagChange),
]);

hook = await hookManageService.createHook({
type: HookType.Package,
ownerId: userId,
name: pkgName,
endpoint: 'http://foo.com',
secret: 'mock_secret',
});
const task = Task.createCreateHookTask(HookEvent.createPublishEvent(pkgName, change.changeId, '1.0.0', 'latest'));
await createHookTriggerService.executeTask(task);
const versionTask = Task.createCreateHookTask(HookEvent.createPublishEvent(pkgName, versionChange.changeId, '1.0.0', 'latest'));
const tagTask = Task.createCreateHookTask(HookEvent.createPublishEvent(pkgName, tagChange.changeId, '1.0.0', 'latest'));

await Promise.all([
createHookTriggerService.executeTask(versionTask),
createHookTriggerService.executeTask(tagTask),
]);

mock(app.httpclient, 'request', async (url, options) => {
callEndpoint = url;
Expand All @@ -76,7 +93,7 @@ describe('test/core/service/HookTriggerService.test.ts', () => {
});

it('should execute trigger', async () => {
const pushTask = await taskRepository.findTaskByBizId(`TriggerHook:${change.changeId}:${hook.hookId}`) as TriggerHookTask;
const pushTask = await taskRepository.findTaskByBizId(`TriggerHook:${versionChange.changeId}:${hook.hookId}`) as TriggerHookTask;
await hookTriggerService.executeTask(pushTask);
assert(callEndpoint === hook.endpoint);
assert(callOptions);
Expand All @@ -97,5 +114,10 @@ describe('test/core/service/HookTriggerService.test.ts', () => {
});
assert(data.time === pushTask.data.hookEvent.time);
});

it('should create each event', async () => {
const tasks = await Promise.all([ taskRepository.findTaskByBizId(`TriggerHook:${versionChange.changeId}:${hook.hookId}`), taskRepository.findTaskByBizId(`TriggerHook:${tagChange.changeId}:${hook.hookId}`) ]);
assert.equal(tasks.filter(Boolean).length, 2);
});
});
});

0 comments on commit 838eecf

Please sign in to comment.