Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: hooks triggers #641

Merged
merged 2 commits into from
Jan 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions app/core/entity/Task.ts
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,10 @@ export class Task<T extends TaskBaseData = TaskBaseData> extends Entity {
return task;
}

public static needMergeWhenWaiting(type: TaskType) {
return [ TaskType.SyncBinary, TaskType.SyncPackage ].includes(type);
}

start(): TaskUpdateCondition {
const condition = {
taskId: this.taskId,
Expand Down
9 changes: 0 additions & 9 deletions app/core/service/BinarySyncerService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import { TaskService } from './TaskService';
import { AbstractBinary, BinaryItem } from '../../common/adapter/binary/AbstractBinary';
import { AbstractService } from '../../common/AbstractService';
import { TaskRepository } from '../../repository/TaskRepository';
import { BinaryType } from '../../common/enum/Binary';
import { sortBy } from 'lodash';

Expand All @@ -36,8 +35,6 @@
@Inject()
private readonly taskService: TaskService;
@Inject()
private readonly taskRepository: TaskRepository;
@Inject()
private readonly httpclient: EggHttpClient;
@Inject()
private readonly nfsAdapter: NFSAdapter;
Expand Down Expand Up @@ -89,13 +86,7 @@
return await this.nfsAdapter.getDownloadUrlOrStream(binary.storePath);
}

// SyncBinary 由定时任务每台单机定时触发,手动去重
// 添加 bizId 在 db 防止重复,记录 id 错误
public async createTask(binaryName: BinaryName, lastData?: any) {
const existsTask = await this.taskRepository.findTaskByTargetName(binaryName, TaskType.SyncBinary);
if (existsTask) {
return existsTask;
}
try {
return await this.taskService.createTask(Task.createSyncBinary(binaryName, lastData), false);
} catch (e) {
Expand Down Expand Up @@ -149,9 +140,9 @@
if (err.name === 'HttpClientRequestTimeoutError'
|| err.name === 'ConnectionError'
|| err.name === 'ConnectTimeoutError') {
this.logger.warn('[BinarySyncerService.executeTask:fail] taskId: %s, targetName: %s, %s',
task.taskId, task.targetName, task.error);
this.logger.warn(err);

Check warning on line 145 in app/core/service/BinarySyncerService.ts

View check run for this annotation

Codecov / codecov/patch

app/core/service/BinarySyncerService.ts#L143-L145

Added lines #L143 - L145 were not covered by tests
} else {
this.logger.error('[BinarySyncerService.executeTask:fail] taskId: %s, targetName: %s, %s',
task.taskId, task.targetName, task.error);
Expand Down Expand Up @@ -221,8 +212,8 @@
if (err.name === 'DownloadStatusInvalidError') {
this.logger.warn('Download binary %s %s', item.sourceUrl, err);
} else {
this.logger.error('Download binary %s %s', item.sourceUrl, err);
}

Check warning on line 216 in app/core/service/BinarySyncerService.ts

View check run for this annotation

Codecov / codecov/patch

app/core/service/BinarySyncerService.ts#L215-L216

Added lines #L215 - L216 were not covered by tests
hasDownloadError = true;
logs.push(`[${isoNow()}][${dir}] ❌ [${parentIndex}${index}] Download ${item.sourceUrl} error: ${err}`);
}
Expand Down
6 changes: 4 additions & 2 deletions app/core/service/TaskService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,10 @@ export class TaskService extends AbstractService {

public async createTask(task: Task, addTaskQueueOnExists: boolean) {
const existsTask = await this.taskRepository.findTaskByTargetName(task.targetName, task.type);
if (existsTask) {
// 如果任务还未被触发,就不继续重复创建

// 只在包同步场景下做任务合并,其余场景通过 bizId 来进行任务幂等
if (existsTask && Task.needMergeWhenWaiting(task.type)) {
// 在包同步场景,如果任务还未被触发,就不继续重复创建
// 如果任务正在执行,可能任务状态已更新,这种情况需要继续创建
if (existsTask.state === TaskState.Waiting) {
if (task.type === TaskType.SyncPackage) {
Expand Down
36 changes: 29 additions & 7 deletions test/core/service/HookTriggerService.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import { TestUtil } from '../../../test/TestUtil';
import { HookManageService } from '../../../app/core/service/HookManageService';
import { HookType } from '../../../app/common/enum/Hook';
import { UserRepository } from '../../../app/repository/UserRepository';
import { PACKAGE_VERSION_ADDED } from '../../../app/core/event';
import { PACKAGE_TAG_ADDED, PACKAGE_VERSION_ADDED } from '../../../app/core/event';
import { Change } from '../../../app/core/entity/Change';
import { ChangeRepository } from '../../../app/repository/ChangeRepository';
import { Task, TriggerHookTask } from '../../../app/core/entity/Task';
Expand Down Expand Up @@ -42,29 +42,46 @@ describe('test/core/service/HookTriggerService.test.ts', () => {
});

describe('executeTask', () => {
let change: Change;
let versionChange: Change;
let tagChange: Change;
let hook: Hook;
let callEndpoint: string;
let callOptions: HttpClientRequestOptions;

beforeEach(async () => {
change = Change.create({
versionChange = Change.create({
type: PACKAGE_TAG_ADDED,
targetName: pkgName,
data: {
tag: 'latest',
},
});
tagChange = Change.create({
type: PACKAGE_VERSION_ADDED,
targetName: pkgName,
data: {
version: '1.0.0',
},
});
await changeRepository.addChange(change);
await Promise.all([
changeRepository.addChange(versionChange),
changeRepository.addChange(tagChange),
]);

hook = await hookManageService.createHook({
type: HookType.Package,
ownerId: userId,
name: pkgName,
endpoint: 'http://foo.com',
secret: 'mock_secret',
});
const task = Task.createCreateHookTask(HookEvent.createPublishEvent(pkgName, change.changeId, '1.0.0', 'latest'));
await createHookTriggerService.executeTask(task);
const versionTask = Task.createCreateHookTask(HookEvent.createPublishEvent(pkgName, versionChange.changeId, '1.0.0', 'latest'));
const tagTask = Task.createCreateHookTask(HookEvent.createPublishEvent(pkgName, tagChange.changeId, '1.0.0', 'latest'));

await Promise.all([
createHookTriggerService.executeTask(versionTask),
createHookTriggerService.executeTask(tagTask),
]);

mock(app.httpclient, 'request', async (url, options) => {
callEndpoint = url;
Expand All @@ -76,7 +93,7 @@ describe('test/core/service/HookTriggerService.test.ts', () => {
});

it('should execute trigger', async () => {
const pushTask = await taskRepository.findTaskByBizId(`TriggerHook:${change.changeId}:${hook.hookId}`) as TriggerHookTask;
const pushTask = await taskRepository.findTaskByBizId(`TriggerHook:${versionChange.changeId}:${hook.hookId}`) as TriggerHookTask;
await hookTriggerService.executeTask(pushTask);
assert(callEndpoint === hook.endpoint);
assert(callOptions);
Expand All @@ -97,5 +114,10 @@ describe('test/core/service/HookTriggerService.test.ts', () => {
});
assert(data.time === pushTask.data.hookEvent.time);
});

it('should create each event', async () => {
const tasks = await Promise.all([ taskRepository.findTaskByBizId(`TriggerHook:${versionChange.changeId}:${hook.hookId}`), taskRepository.findTaskByBizId(`TriggerHook:${tagChange.changeId}:${hook.hookId}`) ]);
assert.equal(tasks.filter(Boolean).length, 2);
});
});
});
Loading