Skip to content

Commit

Permalink
feat: add proxy service.
Browse files Browse the repository at this point in the history
  • Loading branch information
hezhengxu2018 committed Jun 22, 2023
1 parent ff8a81c commit 9a71d3d
Show file tree
Hide file tree
Showing 3 changed files with 203 additions and 21 deletions.
80 changes: 59 additions & 21 deletions app/common/adapter/NPMRegistry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,32 +40,51 @@ export class NPMRegistry {
this.registryHost = registryHost;
}

public async getFullManifests(fullname: string, optionalConfig?: {retries?:number, remoteAuthToken?:string}): Promise<RegistryResponse> {
let retries = optionalConfig?.retries || 3;
public async getFullManifests(fullname: string, retries = 3): Promise<RegistryResponse> {
// set query t=timestamp, make sure CDN cache disable
// cache=0 is sync worker request flag
const url = `${this.registry}/${encodeURIComponent(fullname)}?t=${Date.now()}&cache=0`;
let lastError: any;
while (retries > 0) {
try {
// large package: https://r.cnpmjs.org/%40procore%2Fcore-icons
// https://r.cnpmjs.org/intraactive-sdk-ui 44s
const authorization = this.genAuthorizationHeader(optionalConfig?.remoteAuthToken);
return await this.request('GET', url, undefined, { timeout: 120000, headers: { authorization } });
} catch (err: any) {
if (err.name === 'ResponseTimeoutError') throw err;
lastError = err;
}
retries--;
if (retries > 0) {
// sleep 1s ~ 4s in random
const delay = process.env.NODE_ENV === 'test' ? 1 : 1000 + Math.random() * 4000;
await setTimeout(delay);
}
}
throw lastError;
return await this.getManifest(url, {}, retries);
}

public async getAbbreviatedManifests(fullname: string, retries = 3): Promise<RegistryResponse> {
const url = `${this.registry}/${encodeURIComponent(fullname)}?t=${Date.now()}&cache=0`;
const headers = { Accept: 'application/vnd.npm.install-v1+json' };
return await this.getManifest(url, headers, retries);
}

public async getPackageVersionManifest(fullname: string, versionOrTag: string, retries = 3) {
const url = `${this.registry}/${encodeURIComponent(fullname)}/${versionOrTag}`;
return await this.getManifest(url, {}, retries);
}


// public async getFullManifests(fullname: string, optionalConfig?: {retries?:number, remoteAuthToken?:string}): Promise<RegistryResponse> {
// let retries = optionalConfig?.retries || 3;
// // set query t=timestamp, make sure CDN cache disable
// // cache=0 is sync worker request flag
// const url = `${this.registry}/${encodeURIComponent(fullname)}?t=${Date.now()}&cache=0`;
// let lastError: any;
// while (retries > 0) {
// try {
// // large package: https://r.cnpmjs.org/%40procore%2Fcore-icons
// // https://r.cnpmjs.org/intraactive-sdk-ui 44s
// const authorization = this.genAuthorizationHeader(optionalConfig?.remoteAuthToken);
// return await this.request('GET', url, undefined, { timeout: 120000, headers: { authorization } });
// } catch (err: any) {
// if (err.name === 'ResponseTimeoutError') throw err;
// lastError = err;
// }
// retries--;
// if (retries > 0) {
// // sleep 1s ~ 4s in random
// const delay = process.env.NODE_ENV === 'test' ? 1 : 1000 + Math.random() * 4000;
// await setTimeout(delay);
// }
// }
// throw lastError;
// }

// app.put('/:name/sync', sync.sync);
public async createSyncTask(fullname: string, optionalConfig?: { remoteAuthToken?:string}): Promise<RegistryResponse> {
const authorization = this.genAuthorizationHeader(optionalConfig?.remoteAuthToken);
Expand Down Expand Up @@ -112,4 +131,23 @@ export class NPMRegistry {
private genAuthorizationHeader(remoteAuthToken?:string) {
return remoteAuthToken ? `Bearer ${remoteAuthToken}` : '';
}

private async getManifest(url: string, headers = {}, retries = 3) {
let lastError: any;
while (retries > 0) {
try {
return await this.request('GET', url, undefined, { timeout: 120000, headers });
} catch (err: any) {
if (err.name === 'ResponseTimeoutError') throw err;
lastError = err;
}
retries--;
if (retries > 0) {
// sleep 1s ~ 4s in random
const delay = process.env.NODE_ENV === 'test' ? 1 : 1000 + Math.random() * 4000;
await setTimeout(delay);
}
}
throw lastError;
}
}
2 changes: 2 additions & 0 deletions app/common/constants.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
export const BUG_VERSIONS = 'bug-versions';
export const LATEST_TAG = 'latest';
export const GLOBAL_WORKER = 'GLOBAL_WORKER';
export const PROXY_MODE_CACHED_PACKAGE_DIR_NAME = 'proxy-mode-cached-packages';
export enum SyncMode {
none = 'none',
admin = 'admin',
proxy = 'proxy',
exist = 'exist',
all = 'all',
}
Expand Down
142 changes: 142 additions & 0 deletions app/core/service/ProxyModeService.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
import { InternalServerError, ForbiddenError, HttpError } from 'egg-errors';
import { SingletonProto, AccessLevel, Inject } from '@eggjs/tegg';
import { EggHttpClient } from 'egg';
import { calculateIntegrity } from '../../common/PackageUtil';
import { downloadToTempfile } from '../../common/FileUtil';
import { NPMRegistry, RegistryResponse } from '../../common/adapter/NPMRegistry';
import { AbstractService } from '../../common/AbstractService';
import { readFile, rm } from 'node:fs/promises';
import { NFSAdapter } from '../../common/adapter/NFSAdapter';
import { PROXY_MODE_CACHED_PACKAGE_DIR_NAME } from '../../common/constants';
import { DIST_NAMES } from '../entity/Package';

@SingletonProto({
accessLevel: AccessLevel.PUBLIC,
})
export class ProxyModeService extends AbstractService {
@Inject()
private readonly httpclient: EggHttpClient;
@Inject()
private readonly npmRegistry: NPMRegistry;
@Inject()
private readonly nfsAdapter: NFSAdapter;

async getPackageVersionTarAndTempFilePath(fullname: string, url: string): Promise<{ tgzBuffer:Buffer| null }> {
if (this.config.cnpmcore.syncPackageBlockList.includes(fullname)) {
throw new ForbiddenError(`stop proxy by block list: ${JSON.stringify(this.config.cnpmcore.syncPackageBlockList)}`);
}
const requestTgzURL = `${this.npmRegistry.registry}/${url}`;
const { tmpfile } = await downloadToTempfile(this.httpclient, this.config.dataDir, requestTgzURL);
const tgzBuffer = await readFile(tmpfile);
await rm(tmpfile, { force: true });
return { tgzBuffer };
}

// used by GET /:fullname
async getPackageFullManifests(fullname: string) {
return await this._getPackageFullOrAbbreviatedManifest(fullname, true);
}

// used by GET /:fullname | GET /:fullname/:versionOrTag | GET /-/package/:fullname/dist-tags
async getPackageAbbreviatedManifests(fullname: string) {
return await this._getPackageFullOrAbbreviatedManifest(fullname, false);
}

// used by GET /:fullname/:versionOrTag
async getPackageVersionOrTagManifest(fullname: string, versionOrTag: string) {
const { data: manifest } = await this.getPackageAbbreviatedManifests(fullname);
const distTags = manifest['dist-tags'] || {};
const version = distTags[versionOrTag] ? distTags[versionOrTag] : versionOrTag;
const storeKey = `/${PROXY_MODE_CACHED_PACKAGE_DIR_NAME}/${fullname}/${version}/${DIST_NAMES.MANIFEST}`;
const nfsBytes = await this.nfsAdapter.getBytes(storeKey);
if (nfsBytes) {
let nfsPkgVersionManifgest = {};
try {
nfsPkgVersionManifgest = JSON.parse(Buffer.from(nfsBytes).toString('utf8'));
} catch {
// JSON parse error
await this.nfsAdapter.remove(storeKey);
throw new InternalServerError('manifest in NFS JSON parse error');
}
return nfsPkgVersionManifgest;
}

// not in NFS
const responseResult = await this.npmRegistry.getPackageVersionManifest(fullname, version);
if (responseResult.status !== 200) {
throw new HttpError({
status: responseResult.status,
message: responseResult.data || responseResult.statusText,
});
}

// get version manifest success
const pkgVerisonManifest = responseResult.data;
const { sourceRegistry, registry } = this.config.cnpmcore;
const pkgVerisonManifestDist = pkgVerisonManifest.dist;
if (pkgVerisonManifestDist && pkgVerisonManifestDist.tarball) {
pkgVerisonManifestDist.tarball = pkgVerisonManifestDist.tarball.replace(sourceRegistry, registry);
}
const proxyBytes = Buffer.from(JSON.stringify(pkgVerisonManifest));
await this.nfsAdapter.uploadBytes(storeKey, proxyBytes);
return pkgVerisonManifest;
}

private async _getPackageFullOrAbbreviatedManifest(fullname: string, isFullManifests: boolean) {
// check package is blocked
if (this.config.cnpmcore.syncPackageBlockList.includes(fullname)) {
const error = `stop cache by block list: ${JSON.stringify(this.config.cnpmcore.syncPackageBlockList)}`;
this.logger.info('[ProxyPackageAndPublishService.cacheManifests:fail-block-list] targetName: %s, %s',
fullname, error);
throw new ForbiddenError('this package is in block list');
}

const storeKey = isFullManifests ?
`/${PROXY_MODE_CACHED_PACKAGE_DIR_NAME}/${fullname}/${DIST_NAMES.FULL_MANIFESTS}` : `/${PROXY_MODE_CACHED_PACKAGE_DIR_NAME}/${fullname}/${DIST_NAMES.ABBREVIATED_MANIFESTS}`;
const nfsBytes = await this.nfsAdapter.getBytes(storeKey);
if (nfsBytes) {
let nfsPkgManifgest = {};
try {
const decoder = new TextDecoder();
const nfsString = decoder.decode(nfsBytes);
nfsPkgManifgest = JSON.parse(nfsString);
} catch {
// JSON parse error
await this.nfsAdapter.remove(storeKey);
throw new InternalServerError('manifest in NFS JSON parse error');
}
const { shasum: etag } = await calculateIntegrity(nfsBytes);
return { data: nfsPkgManifgest, etag, blockReason: '' };
}

// not in NFS
let responseResult: RegistryResponse;
if (isFullManifests) {
responseResult = await this.npmRegistry.getFullManifests(fullname);
} else {
responseResult = await this.npmRegistry.getAbbreviatedManifests(fullname);
}
if (responseResult.status !== 200) {
throw new HttpError({
status: responseResult.status,
message: responseResult.data?.error || responseResult.statusText,
});
}

// get manifest success
const pkgManifest = responseResult.data;
const { sourceRegistry, registry } = this.config.cnpmcore;
const versionMap = pkgManifest.versions || {};
for (const key in versionMap) {
const versionItem = versionMap[key];
if (versionItem.dist && versionItem.dist.tarball && typeof versionItem.dist.tarball === 'string') {
versionItem.dist.tarball = versionItem.dist.tarball.replace(sourceRegistry, registry);
}
}
const proxyBytes = Buffer.from(JSON.stringify(pkgManifest));
await this.nfsAdapter.uploadBytes(storeKey, proxyBytes);
const { shasum: etag } = await calculateIntegrity(proxyBytes);
return { data: pkgManifest, etag, blockReason: '' };
}

}

0 comments on commit 9a71d3d

Please sign in to comment.