From 64cbbd9c7daa8002d45fa5cc83cbe111f4e9385c Mon Sep 17 00:00:00 2001 From: rjawesome Date: Mon, 12 Aug 2024 14:24:53 -0700 Subject: [PATCH] fix 400 error tests --- src/controllers/threading/threadHandler.ts | 62 +++++++++++++--------- 1 file changed, 36 insertions(+), 26 deletions(-) diff --git a/src/controllers/threading/threadHandler.ts b/src/controllers/threading/threadHandler.ts index c725c98..3f50448 100644 --- a/src/controllers/threading/threadHandler.ts +++ b/src/controllers/threading/threadHandler.ts @@ -241,33 +241,42 @@ export async function runTask(req: Request, res: Response, route: string, useBul if (process.env.USE_THREADING === "false") { // Set up "inter thread messaging" const { port1: workerSide, port2: parentSide } = new MessageChannel(); - parentSide.on("message", async (msg: ThreadMessage) => { - switch (msg.type) { - case "subqueryRequest": - const { queries, options } = msg.value as { - queries: FrozenSubquery[]; - options: QueryHandlerOptions; - }; - debug(`Main thread receives ${queries.length} subqueries from worker.`); - subqueryRelay.subscribe( - await Promise.all(queries.map(async query => await Subquery.unfreeze(query))), - options, - ({ hash, records, logs, apiUnavailable }) => { - parentSide.postMessage({ - threadId: 0, - type: "subQueryResult", - value: { hash, records, logs, apiUnavailable }, - } satisfies ThreadMessage); - }, - ); - break; - } - }); global.workerSide = workerSide; - // Threading disabled, just use the provided function in main event loop - const response = (await tasks[route](taskInfo)) as TrapiResponse; - parentSide.close(); - return response; + + // start task + tasks[route](taskInfo); + return new Promise((resolve, reject) => { + parentSide.on("message", async (msg: ThreadMessage) => { + switch (msg.type) { + case "subqueryRequest": + const { queries, options } = msg.value as { + queries: FrozenSubquery[]; + options: QueryHandlerOptions; + }; + debug(`Main thread receives ${queries.length} subqueries from worker.`); + subqueryRelay.subscribe( + await Promise.all(queries.map(async query => await Subquery.unfreeze(query))), + options, + ({ hash, records, logs, apiUnavailable }) => { + parentSide.postMessage({ + threadId: 0, + type: "subQueryResult", + value: { hash, records, logs, apiUnavailable }, + } satisfies ThreadMessage); + }, + ); + break; + case "result": + resolve(msg.value as TrapiResponse); + parentSide.close(); + break; + case "error": + reject(msg.value as Error); + parentSide.close(); + break; + } + }); + }); } else if (!(queryQueue && useBullSync)) { // Redis unavailable or query not to sync queue such as asyncquery_status const response = await queueTaskToWorkers( @@ -361,6 +370,7 @@ export async function runBullJob(job: BullJob, route: string, useAsync = true) { export function taskResponse(response: T, status: number = undefined): T { if (global.workerSide) { global.workerSide.postMessage({ threadId, type: 'result', value: response, status } satisfies ThreadMessage); + return undefined; } return response; }