Skip to content

Commit

Permalink
feat: sync specified versions (#487)
Browse files Browse the repository at this point in the history
允许同步指定版本
---------
Allow to sync the specified versions
  • Loading branch information
hezhengxu2018 committed Jun 11, 2023
1 parent 1dbf481 commit a9bb81a
Show file tree
Hide file tree
Showing 11 changed files with 205 additions and 7 deletions.
3 changes: 3 additions & 0 deletions app/core/entity/Task.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ export type SyncPackageTaskOptions = {
// force sync history version
forceSyncHistory?: boolean;
registryId?: string;
specificVersions?: Array<string>;
};

export interface CreateHookTaskData extends TaskBaseData {
Expand All @@ -56,6 +57,7 @@ export interface CreateSyncPackageTaskData extends TaskBaseData {
skipDependencies?: boolean;
syncDownloadData?: boolean;
forceSyncHistory?: boolean;
specificVersions?: Array<string>;
}

export interface ChangesStreamTaskData extends TaskBaseData {
Expand Down Expand Up @@ -137,6 +139,7 @@ export class Task<T extends TaskBaseData = TaskBaseData> extends Entity {
skipDependencies: options?.skipDependencies,
syncDownloadData: options?.syncDownloadData,
forceSyncHistory: options?.forceSyncHistory,
specificVersions: options?.specificVersions,
},
};
const task = this.create(data);
Expand Down
46 changes: 40 additions & 6 deletions app/core/service/PackageSyncerService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@ import {
Inject,
} from '@eggjs/tegg';
import { Pointcut } from '@eggjs/tegg/aop';
import {
EggContextHttpClient,
} from 'egg';
import { EggHttpClient } from 'egg';
import { setTimeout } from 'timers/promises';
import { rm } from 'fs/promises';
import { isEqual } from 'lodash';
import semver from 'semver';
import semverRcompare from 'semver/functions/rcompare';
import semverPrerelease from 'semver/functions/prerelease';
import { NPMRegistry, RegistryResponse } from '../../common/adapter/NPMRegistry';
import { detectInstallScript, getScopeAndName } from '../../common/PackageUtil';
import { downloadToTempfile } from '../../common/FileUtil';
Expand Down Expand Up @@ -74,7 +74,7 @@ export class PackageSyncerService extends AbstractService {
@Inject()
private readonly cacheService: CacheService;
@Inject()
private readonly httpclient: EggContextHttpClient;
private readonly httpclient: EggHttpClient;
@Inject()
private readonly registryManagerService: RegistryManagerService;
@Inject()
Expand Down Expand Up @@ -350,7 +350,7 @@ export class PackageSyncerService extends AbstractService {
public async executeTask(task: Task) {
const fullname = task.targetName;
const [ scope, name ] = getScopeAndName(fullname);
const { tips, skipDependencies: originSkipDependencies, syncDownloadData, forceSyncHistory, remoteAuthToken } = task.data as SyncPackageTaskOptions;
const { tips, skipDependencies: originSkipDependencies, syncDownloadData, forceSyncHistory, remoteAuthToken, specificVersions } = task.data as SyncPackageTaskOptions;
let pkg = await this.packageRepository.findPackage(scope, name);
const registry = await this.initSpecRegistry(task, pkg, scope);
const registryHost = this.npmRegistry.registry;
Expand All @@ -367,6 +367,9 @@ export class PackageSyncerService extends AbstractService {
this.logger.info('[PackageSyncerService.executeTask:start] taskId: %s, targetName: %s, attempts: %s, taskQueue: %s/%s, syncUpstream: %s, log: %s',
task.taskId, task.targetName, task.attempts, taskQueueLength, taskQueueHighWaterSize, syncUpstream, logUrl);
logs.push(`[${isoNow()}] 🚧🚧🚧🚧🚧 Syncing from ${registryHost}/${fullname}, skipDependencies: ${skipDependencies}, syncUpstream: ${syncUpstream}, syncDownloadData: ${!!syncDownloadData}, forceSyncHistory: ${!!forceSyncHistory} attempts: ${task.attempts}, worker: "${os.hostname()}/${process.pid}", taskQueue: ${taskQueueLength}/${taskQueueHighWaterSize} 🚧🚧🚧🚧🚧`);
if (specificVersions) {
logs.push(`[${isoNow()}] 👉 syncing specific versions: ${specificVersions.join(' | ')} 👈`);
}
logs.push(`[${isoNow()}] 🚧 log: ${logUrl}`);

if (pkg && pkg?.registryId !== registry?.registryId) {
Expand Down Expand Up @@ -545,8 +548,20 @@ export class PackageSyncerService extends AbstractService {
const existsVersionCount = Object.keys(existsVersionMap).length;
const abbreviatedVersionMap = abbreviatedManifests?.versions ?? {};
// 2. save versions
const versions = Object.values<any>(versionMap);
if (specificVersions && !this.config.cnpmcore.strictSyncSpecivicVersion && !specificVersions.includes(distTags.latest)) {
logs.push(`[${isoNow()}] 📦 Add latest tag version "${fullname}: ${distTags.latest}"`);
specificVersions.push(distTags.latest);
}
const versions = specificVersions ? Object.values<any>(versionMap).filter(verItem => specificVersions.includes(verItem.version)) : Object.values<any>(versionMap);
logs.push(`[${isoNow()}] 🚧 Syncing versions ${existsVersionCount} => ${versions.length}`);
if (specificVersions) {
const availableVersionList = versions.map(item => item.version);
let notAvailableVersionList = specificVersions.filter(i => !availableVersionList.includes(i));
if (notAvailableVersionList.length > 0) {
notAvailableVersionList = Array.from(new Set(notAvailableVersionList));
logs.push(`[${isoNow()}] 🚧 Some specific versions are not available: 👉 ${notAvailableVersionList.join(' | ')} 👈`);
}
}
const updateVersions: string[] = [];
const differentMetas: any[] = [];
let syncIndex = 0;
Expand Down Expand Up @@ -788,6 +803,24 @@ export class PackageSyncerService extends AbstractService {
}
}
}
// 3.2 shoud add latest tag
// 在同步sepcific version时如果没有同步latestTag的版本会出现latestTag丢失或指向版本不正确的情况
if (specificVersions && this.config.cnpmcore.strictSyncSpecivicVersion) {
// 不允许自动同步latest版本,从已同步版本中选出latest
let latestStabelVersion;
const sortedVersionList = specificVersions.sort(semverRcompare);
latestStabelVersion = sortedVersionList.filter(i => !semverPrerelease(i))[0];
// 所有版本都不是稳定版本则指向非稳定版本保证latest存在
if (!latestStabelVersion) {
latestStabelVersion = sortedVersionList[0];
}
if (!existsDistTags.latest || semverRcompare(existsDistTags.latest, latestStabelVersion) === 1) {
logs.push(`[${isoNow()}] 🚧 patch latest tag from specific versions 🚧`);
changedTags.push({ action: 'change', tag: 'latest', version: latestStabelVersion });
await this.packageManagerService.savePackageTag(pkg, 'latest', latestStabelVersion);
}
}

if (changedTags.length > 0) {
logs.push(`[${isoNow()}] 🟢 Synced ${changedTags.length} tags: ${JSON.stringify(changedTags)}`);
}
Expand Down Expand Up @@ -836,6 +869,7 @@ export class PackageSyncerService extends AbstractService {
authorId: task.authorId,
authorIp: task.authorIp,
tips,
remoteAuthToken,
});
logs.push(`[${isoNow()}] 📦 Add dependency "${dependencyName}" sync task: ${dependencyTask.taskId}, db id: ${dependencyTask.id}`);
}
Expand Down
17 changes: 16 additions & 1 deletion app/core/service/TaskService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import { NFSAdapter } from '../../common/adapter/NFSAdapter';
import { TaskState, TaskType } from '../../common/enum/Task';
import { AbstractService } from '../../common/AbstractService';
import { TaskRepository } from '../../repository/TaskRepository';
import { Task } from '../entity/Task';
import { Task, CreateSyncPackageTaskData } from '../entity/Task';
import { QueueAdapter } from '../../common/typing';

@SingletonProto({
Expand All @@ -31,6 +31,21 @@ export class TaskService extends AbstractService {
// 如果任务还未被触发,就不继续重复创建
// 如果任务正在执行,可能任务状态已更新,这种情况需要继续创建
if (existsTask.state === TaskState.Waiting) {
if (task.type === TaskType.SyncPackage) {
// 如果是specificVersions的任务则可能可以和存量任务进行合并
const specificVersions = (task as Task<CreateSyncPackageTaskData>).data?.specificVersions;
const existsTaskSpecificVersions = (existsTask as Task<CreateSyncPackageTaskData>).data?.specificVersions;
if (existsTaskSpecificVersions) {
if (specificVersions) {
// 存量的任务和新增任务都是同步指定版本的任务,合并两者版本至存量任务
await this.taskRepository.updateSpecificVersionsOfWaitingTask(existsTask, specificVersions);
} else {
// 新增任务是全量同步任务,移除存量任务中的指定版本使其成为全量同步任务
await this.taskRepository.updateSpecificVersionsOfWaitingTask(existsTask);
}
}
// 存量任务是全量同步任务,直接提高任务优先级
}
// 提高任务的优先级
if (addTaskQueueOnExists) {
const queueLength = await this.getTaskQueueLength(task.type);
Expand Down
5 changes: 5 additions & 0 deletions app/port/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -145,4 +145,9 @@ export type CnpmcoreConfig = {
* enable unpkg features, https://github.com/cnpm/cnpmcore/issues/452
*/
enableUnpkg: boolean,
/**
* enable this would make sync specific version task not append latest version into this task automatically,it would mark the local latest stable version as latest tag.
* in most cases, you should set to false to keep the same behavior as source registry.
*/
strictSyncSpecivicVersion: boolean,
};
2 changes: 2 additions & 0 deletions app/port/controller/PackageSyncController.ts
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ export class PackageSyncController extends AbstractController {
force: !!data.force,
// only admin allow to sync history version
forceSyncHistory: !!data.forceSyncHistory && isAdmin,
specificVersions: data.specificVersions,
};
ctx.tValidate(SyncPackageTaskRule, params);
const [ scope, name ] = getScopeAndName(params.fullname);
Expand Down Expand Up @@ -102,6 +103,7 @@ export class PackageSyncController extends AbstractController {
syncDownloadData: params.syncDownloadData,
forceSyncHistory: params.forceSyncHistory,
registryId: registry?.registryId,
specificVersions: params.specificVersions && JSON.parse(params.specificVersions),
});
ctx.logger.info('[PackageSyncController.createSyncTask:success] taskId: %s, fullname: %s',
task.taskId, fullname);
Expand Down
21 changes: 21 additions & 0 deletions app/port/typebox.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,11 @@ export const Version = Type.String({
maxLength: 256,
});

export const VersionStringArray = Type.String({
format: 'semver-version-array',
transform: [ 'trim' ],
});

export const Spec = Type.String({
format: 'semver-spec',
minLength: 1,
Expand Down Expand Up @@ -85,6 +90,7 @@ export const SyncPackageTaskRule = Type.Object({
maxLength: 1024,
}),
skipDependencies: Type.Boolean(),
specificVersions: Type.Optional(VersionStringArray),
syncDownloadData: Type.Boolean(),
// force sync immediately, only allow by admin
force: Type.Boolean(),
Expand Down Expand Up @@ -149,6 +155,21 @@ export function patchAjv(ajv: any) {
return !!binaryConfig[binaryName];
},
});
ajv.addFormat('semver-version-array', {
type: 'string',
validate: (versionStringList: string) => {
let versionList;
try {
versionList = JSON.parse(versionStringList);
} catch (error) {
return false;
}
if (versionList instanceof Array) {
return versionList.every(version => !!semver.valid(version));
}
return false;
},
});
}

export const QueryPageOptions = Type.Object({
Expand Down
15 changes: 15 additions & 0 deletions app/repository/TaskRepository.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,21 @@ export class TaskRepository extends AbstractRepository {
await model.remove();
}

async updateSpecificVersionsOfWaitingTask(task: TaskEntity, specificVersions?: Array<string>): Promise<void> {
const model = await this.Task.findOne({ id: task.id });
if (!model || !model.data.specificVersions) return;
if (specificVersions) {
const data = model.data;
const combinedVersions = Array.from(new Set(data.specificVersions.concat(specificVersions)));
data.specificVersions = combinedVersions;
await model.update({ data });
} else {
const data = model.data;
Reflect.deleteProperty(data, 'specificVersions');
await model.update({ data });
}
}

async findTask(taskId: string) {
const task = await this.Task.findOne({ taskId });
if (task) {
Expand Down
1 change: 1 addition & 0 deletions config/config.default.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ export const cnpmcoreConfig: CnpmcoreConfig = {
syncNotFound: false,
redirectNotFound: true,
enableUnpkg: true,
strictSyncSpecivicVersion: false,
};

export default (appInfo: EggAppConfig) => {
Expand Down
21 changes: 21 additions & 0 deletions test/core/service/PackageSyncerService/createTask.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,27 @@ describe('test/core/service/PackageSyncerService/createTask.test.ts', () => {
assert(res[1].taskId === task.taskId);
});

it('should append specific version to waiting task.', async () => {
const name = '@cnpmcore/test-sync-package-has-two-versions';
await packageSyncerService.createTask(name, { specificVersions: [ '1.0.0' ] });
await packageSyncerService.createTask(name, { specificVersions: [ '2.0.0' ] });
const task = await packageSyncerService.findExecuteTask();
assert(task);
assert.equal(task.targetName, name);
assert(task.data.specificVersions);
assert(task.data.specificVersions.length === 2);
});

it('should remove specific version, switch waiting task to sync all versions.', async () => {
const name = '@cnpmcore/test-sync-package-has-two-versions';
await packageSyncerService.createTask(name, { specificVersions: [ '1.0.0' ] });
await packageSyncerService.createTask(name);
const task = await packageSyncerService.findExecuteTask();
assert(task);
assert.equal(task.targetName, name);
assert(task.data.specificVersions === undefined);
});

it('should not duplicate task when waiting', async () => {
const task = await packageSyncerService.createTask(pkgName);
const newTask = await packageSyncerService.createTask(pkgName);
Expand Down
Loading

0 comments on commit a9bb81a

Please sign in to comment.