Skip to content

Commit

Permalink
feat: support npm search command like npmio (#513)
Browse files Browse the repository at this point in the history
- [x] 找个合适的 eggjs es 插件,或者手撸个,看社区的几个版本都比较低
- [x] HTTP Server 新增 API
`/-/v1/search?text=react&size=20&from=0&quality=0.65&popularity=0.98&maintenance=0.5`,第一版不一定能
qpm 都支持,先支持现有的下载量数据,即 popularity 的参考数据
- [x] 监听相关的 metadata 变更的 event,同步写入、删除 增量的 ES 数据,ES 有较强抗压能力,这块直接做成同步就好
- [x] 考虑可能同步也会丢部分数据(stream 不稳定时),HTTP Server 再追加一个手动同步 ES 的接口,传包名同步触发写
ES 即可
- [x] 提供全量一次性的初始化同步 ES 脚本
- [x] setting/mapping 参考
https://github.com/npms-io/npms-analyzer/blob/master/config/elasticsearch/npms.json5

---------

Co-authored-by: fengmk2 <[email protected]>
Co-authored-by: elrrrrrrr <[email protected]>
  • Loading branch information
3 people authored Sep 1, 2023
1 parent 6e45ac5 commit 7f85848
Show file tree
Hide file tree
Showing 19 changed files with 1,797 additions and 5 deletions.
16 changes: 15 additions & 1 deletion app/common/PackageUtil.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ import { Readable } from 'node:stream';
import { pipeline } from 'node:stream/promises';
import * as ssri from 'ssri';
import tar from 'tar';
import { PackageJSONType } from '../repository/PackageRepository';
import { AuthorType, PackageJSONType } from '../repository/PackageRepository';


// /@cnpm%2ffoo
// /@cnpm%2Ffoo
Expand Down Expand Up @@ -104,6 +105,19 @@ export async function hasShrinkWrapInTgz(contentOrFile: Uint8Array | string): Pr
}
}

/** 写入 ES 时,格式化 author */
export function formatAuthor(author: string | AuthorType | undefined): AuthorType | undefined {
if (author === undefined) {
return author;
}

if (typeof author === 'string') {
return { name: author };
}

return author;
}

export async function extractPackageJSON(tarballBytes: Buffer): Promise<PackageJSONType> {
return new Promise((resolve, reject) => {
Readable.from(tarballBytes)
Expand Down
7 changes: 7 additions & 0 deletions app/common/typing.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { CnpmcoreConfig } from '../port/config';
import { Readable } from 'stream';
import { IncomingHttpHeaders } from 'http';
import { EggContext } from '@eggjs/tegg';
import { estypes } from '@elastic/elasticsearch';

export interface UploadResult {
key: string;
Expand Down Expand Up @@ -50,6 +51,12 @@ export interface QueueAdapter {
length(key: string): Promise<number>;
}

export interface SearchAdapter {
search<T>(query: any): Promise<estypes.SearchHitsMetadata<T>>;
upsert<T>(id: string, document: T): Promise<string>;
delete(id: string): Promise<string>;
}

export interface AuthUrlResult {
loginUrl: string;
doneUrl: string;
Expand Down
94 changes: 94 additions & 0 deletions app/core/event/SyncESPackage.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
// TODO sync event
/* eslint-disable @typescript-eslint/no-unused-vars */
import { EggAppConfig } from 'egg';
import { Event, Inject } from '@eggjs/tegg';
import {
PACKAGE_UNPUBLISHED,
PACKAGE_VERSION_ADDED,
PACKAGE_VERSION_REMOVED,
PACKAGE_TAG_ADDED,
PACKAGE_TAG_CHANGED,
PACKAGE_TAG_REMOVED,
PACKAGE_MAINTAINER_CHANGED,
PACKAGE_MAINTAINER_REMOVED,
PACKAGE_META_CHANGED,
} from './index';

import { PackageSearchService } from '../service/PackageSearchService';

class SyncESPackage {
@Inject()
protected readonly packageSearchService: PackageSearchService;

@Inject()
protected readonly config: EggAppConfig;

protected async syncPackage(fullname: string) {
if (!this.config.cnpmcore.enableElasticsearch) return;
await this.packageSearchService.syncPackage(fullname, true);
}
}

@Event(PACKAGE_UNPUBLISHED)
export class PackageUnpublished extends SyncESPackage {
async handle(fullname: string) {
if (!this.config.cnpmcore.enableElasticsearch) return;
await this.packageSearchService.removePackage(fullname);
}
}

@Event(PACKAGE_VERSION_ADDED)
export class PackageVersionAdded extends SyncESPackage {
async handle(fullname: string) {
await this.syncPackage(fullname);
}
}

@Event(PACKAGE_VERSION_REMOVED)
export class PackageVersionRemoved extends SyncESPackage {
async handle(fullname: string) {
await this.syncPackage(fullname);
}
}

@Event(PACKAGE_TAG_ADDED)
export class PackageTagAdded extends SyncESPackage {
async handle(fullname: string) {
await this.syncPackage(fullname);
}
}

@Event(PACKAGE_TAG_CHANGED)
export class PackageTagChanged extends SyncESPackage {
async handle(fullname: string) {
await this.syncPackage(fullname);
}
}

@Event(PACKAGE_TAG_REMOVED)
export class PackageTagRemoved extends SyncESPackage {
async handle(fullname: string) {
await this.syncPackage(fullname);
}
}

@Event(PACKAGE_MAINTAINER_CHANGED)
export class PackageMaintainerChanged extends SyncESPackage {
async handle(fullname: string) {
await this.syncPackage(fullname);
}
}

@Event(PACKAGE_MAINTAINER_REMOVED)
export class PackageMaintainerRemoved extends SyncESPackage {
async handle(fullname: string) {
await this.syncPackage(fullname);
}
}

@Event(PACKAGE_META_CHANGED)
export class PackageMetaChanged extends SyncESPackage {
async handle(fullname: string) {
await this.syncPackage(fullname);
}
}
216 changes: 216 additions & 0 deletions app/core/service/PackageSearchService.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,216 @@
import { AccessLevel, Inject, SingletonProto } from '@eggjs/tegg';
import type { estypes } from '@elastic/elasticsearch';
import dayjs from 'dayjs';

import { AbstractService } from '../../common/AbstractService';
import { formatAuthor, getScopeAndName } from '../../common/PackageUtil';
import { PackageManagerService } from './PackageManagerService';
import { SearchManifestType, SearchMappingType, SearchRepository } from '../../repository/SearchRepository';
import { PackageVersionDownloadRepository } from '../../repository/PackageVersionDownloadRepository';
import { PackageRepository } from '../../repository/PackageRepository';


@SingletonProto({
accessLevel: AccessLevel.PUBLIC,
})
export class PackageSearchService extends AbstractService {
@Inject()
private readonly packageManagerService: PackageManagerService;
@Inject()
private readonly searchRepository: SearchRepository;
@Inject()
private packageVersionDownloadRepository: PackageVersionDownloadRepository;
@Inject()
protected packageRepository: PackageRepository;

async syncPackage(fullname: string, isSync = true) {
const [ scope, name ] = getScopeAndName(fullname);
const fullManifests = await this.packageManagerService.listPackageFullManifests(scope, name, isSync);

if (!fullManifests.data) {
this.logger.warn('[PackageSearchService.syncPackage] save package:%s not found', fullname);
return;
}

const pkg = await this.packageRepository.findPackage(scope, name);
if (!pkg) {
this.logger.warn('[PackageSearchService.syncPackage] findPackage:%s not found', fullname);
return;
}

// get last year download data
const startDate = dayjs().subtract(1, 'year');
const endDate = dayjs();

const entities = await this.packageVersionDownloadRepository.query(pkg.packageId, startDate.toDate(), endDate.toDate());
let downloadsAll = 0;
for (const entity of entities) {
for (let i = 1; i <= 31; i++) {
const day = String(i).padStart(2, '0');
const field = `d${day}`;
const counter = entity[field];
if (!counter) continue;
downloadsAll += counter;
}
}

const { data: manifest } = fullManifests;

const latestVersion = manifest['dist-tags'].latest;
const latestManifest = manifest.versions[latestVersion];

const packageDoc: SearchMappingType = {
name: manifest.name,
version: latestVersion,
_rev: manifest._rev,
scope: scope ? scope.replace('@', '') : 'unscoped',
keywords: manifest.keywords || [],
versions: Object.keys(manifest.versions),
description: manifest.description,
license: manifest.license,
maintainers: manifest.maintainers,
author: formatAuthor(manifest.author),
'dist-tags': manifest['dist-tags'],
date: manifest.time[latestVersion],
created: manifest.time.created,
modified: manifest.time.modified,
// 归属 registry,keywords 枚举值
_source_registry_name: manifest._source_registry_name,
// 最新版本发布人 _npmUser:
_npmUser: latestManifest?._npmUser,
// 最新版本发布信息
publish_time: latestManifest?.publish_time,
};

const document: SearchManifestType = {
package: packageDoc,
downloads: {
all: downloadsAll,
},
};

return await this.searchRepository.upsertPackage(document);
}

async searchPackage(text: string, from: number, size: number): Promise<{ objects: (SearchManifestType | undefined)[], total: number }> {
const matchQueries = this._buildMatchQueries(text);
const scriptScore = this._buildScriptScore({
text,
scoreEffect: 0.25,
});

const res = await this.searchRepository.searchPackage({
body: {
size,
from,
query: {
function_score: {
boost_mode: 'replace',
query: {
bool: {
should: matchQueries,
minimum_should_match: matchQueries.length ? 1 : 0,
},
},
script_score: scriptScore,
},
},
},
});
const { hits, total } = res;
return {
objects: hits?.map(item => {
return item._source;
}),
total: (total as estypes.SearchTotalHits).value,
};
}

async removePackage(fullname: string) {
return await this.searchRepository.removePackage(fullname);
}

// https://github.com/npms-io/queries/blob/master/lib/search.js#L8C1-L78C2
private _buildMatchQueries(text: string) {
return [
// Standard match using cross_fields
{
multi_match: {
query: text,
operator: 'and',
fields: [
'package.name.standard^4',
'package.description.standard',
'package.keywords.standard^2',
],
type: 'cross_fields',
boost: 6,
tie_breaker: 0.5,
},
},

// Partial match using edge-ngram
{
multi_match: {
query: text,
operator: 'and',
fields: [
'package.name.edge_ngram^4',
'package.description.edge_ngram',
'package.keywords.edge_ngram^2',
],
type: 'phrase',
slop: 3,
boost: 3,
tie_breaker: 0.5,
},
},

// Normal term match with an english stemmer
{
multi_match: {
query: text,
operator: 'and',
fields: [
'package.name.english_docs^4',
'package.description.english_docs',
'package.keywords.english_docs^2',
],
type: 'cross_fields',
boost: 3,
tie_breaker: 0.5,
},
},

// Normal term match with a more aggressive english stemmer (not so important)
{
multi_match: {
query: text,
operator: 'and',
fields: [
'package.name.english_aggressive_docs^4',
'package.description.english_aggressive_docs',
'package.keywords.english_aggressive_docs^2',
],
type: 'cross_fields',
tie_breaker: 0.5,
},
},
];
}

private _buildScriptScore(params: { text: string | undefined, scoreEffect: number }) {
// keep search simple, only download(popularity)
const downloads = 'doc["downloads.all"].value';
const source = `doc["package.name.raw"].value.equals("${params.text}") ? 100000 + ${downloads} : _score * Math.pow(${downloads}, ${params.scoreEffect})`;
return {
script: {
source,
params: {
text: params.text || '',
scoreEffect: params.scoreEffect,
},
},
};
}
}
1 change: 1 addition & 0 deletions app/core/service/PackageSyncerService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ export class PackageSyncerService extends AbstractService {
if (!this.allowSyncDownloadData) {
return;
}

const fullname = pkg.fullname;
const start = '2011-01-01';
const end = this.config.cnpmcore.syncDownloadDataMaxDate;
Expand Down
Loading

0 comments on commit 7f85848

Please sign in to comment.