Skip to content

Commit 4b8db29

Browse files
authored
perf: init token worker (#4726)
* perf: init token worker * init worker * preload worker * preload worker * remove invalid code
1 parent 5e3ec4d commit 4b8db29

File tree

3 files changed

+44
-15
lines changed

3 files changed

+44
-15
lines changed

packages/service/worker/preload.ts

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
import { getWorkerController, WorkerNameEnum } from './utils';
2+
3+
export const preLoadWorker = async () => {
4+
const max = Number(global.systemEnv?.tokenWorkers || 30);
5+
const workerController = getWorkerController({
6+
name: WorkerNameEnum.countGptMessagesTokens,
7+
maxReservedThreads: max
8+
});
9+
10+
for await (const item of new Array(max).fill(0)) {
11+
const worker = workerController.createWorker();
12+
await workerController.run({
13+
workerId: worker.id,
14+
messages: [
15+
{
16+
role: 'user',
17+
content: '1'
18+
}
19+
]
20+
});
21+
console.log(`Preload worker ${workerController.workerQueue.length}`);
22+
}
23+
console.log('Preload worker success');
24+
};

packages/service/worker/utils.ts

Lines changed: 12 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -87,9 +87,15 @@ export class WorkerPool<Props = Record<string, any>, Response = any> {
8787
this.maxReservedThreads = maxReservedThreads;
8888
}
8989

90-
runTask({ data, resolve, reject }: WorkerRunTaskType<Props>) {
90+
private runTask({ data, resolve, reject }: WorkerRunTaskType<Props>) {
9191
// Get idle worker or create a new worker
9292
const runningWorker = (() => {
93+
// @ts-ignore
94+
if (data.workerId) {
95+
// @ts-ignore
96+
const worker = this.workerQueue.find((item) => item.id === data.workerId);
97+
if (worker) return worker;
98+
}
9399
const worker = this.workerQueue.find((item) => item.status === 'idle');
94100
if (worker) return worker;
95101

@@ -157,23 +163,15 @@ export class WorkerPool<Props = Record<string, any>, Response = any> {
157163

158164
// watch response
159165
worker.on('message', ({ id, type, data }: WorkerResponse<Response>) => {
160-
// Run callback
161-
const workerItem = this.workerQueue.find((item) => item.id === id);
162-
163-
if (!workerItem) {
164-
addLog.warn('Invalid worker', { id, type, data });
165-
return;
166-
}
167-
168166
if (type === 'success') {
169-
workerItem.resolve(data);
167+
item.resolve(data);
170168
} else if (type === 'error') {
171-
workerItem.reject(data);
169+
item.reject(data);
172170
}
173171

174172
// Clear timeout timer and update worker status
175-
clearTimeout(workerItem.timeoutId);
176-
workerItem.status = 'idle';
173+
clearTimeout(item.timeoutId);
174+
item.status = 'idle';
177175
});
178176

179177
// Worker error, terminate and delete it.(Un catch error)
@@ -191,7 +189,7 @@ export class WorkerPool<Props = Record<string, any>, Response = any> {
191189
return item;
192190
}
193191

194-
deleteWorker(workerId: string) {
192+
private deleteWorker(workerId: string) {
195193
const item = this.workerQueue.find((item) => item.id === workerId);
196194
if (item) {
197195
item.reject?.('error');

projects/app/src/service/common/system/index.ts

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ import {
2424
getProApiDatasetFilePreviewUrlRequest
2525
} from '@/service/core/dataset/apiDataset/controller';
2626
import { isProVersion } from './constants';
27+
import { countPromptTokens } from '@fastgpt/service/common/string/tiktoken';
28+
import { preLoadWorker } from '../../../../../../packages/service/worker/preload';
2729

2830
export const readConfigData = async (name: string) => {
2931
const splitName = name.split('.');
@@ -89,7 +91,12 @@ export function initGlobalVariables() {
8991

9092
/* Init system data(Need to connected db). It only needs to run once */
9193
export async function getInitConfig() {
92-
return Promise.all([initSystemConfig(), getSystemVersion(), loadSystemModels()]);
94+
await Promise.all([initSystemConfig(), getSystemVersion(), loadSystemModels()]);
95+
try {
96+
await preLoadWorker();
97+
} catch (error) {
98+
console.error('Preload worker error', error);
99+
}
93100
}
94101

95102
const defaultFeConfigs: FastGPTFeConfigsType = {

0 commit comments

Comments
 (0)