diff --git a/src/controllers/threading/taskHandler.js b/src/controllers/threading/taskHandler.js index 7b78cfd..4c54a6e 100644 --- a/src/controllers/threading/taskHandler.js +++ b/src/controllers/threading/taskHandler.js @@ -1,9 +1,10 @@ const { isMainThread, threadId } = require("worker_threads"); +const workerData = require("piscina").workerData; const debug = require("debug")(`bte:biothings-explorer-trapi:worker${threadId}`); if (!isMainThread) { // Log thread start before BioLink model loads - debug(`Worker thread ${threadId} is ready to accept tasks.`); + debug(`Worker thread ${threadId} is ready to accept ${workerData.queue} tasks.`); } const { tasks } = require("../../routes/index"); @@ -34,7 +35,7 @@ Sentry.init({ }); const runTask = async ({ req, route, port, job: { jobId, queueName } = {} }) => { - debug(`Worker thread ${threadId} beginning task.`); + debug(`Worker thread ${threadId} beginning ${workerData.queue} task.`); global.SCHEMA_VERSION = "1.4.0"; @@ -63,7 +64,7 @@ const runTask = async ({ req, route, port, job: { jobId, queueName } = {} }) => transaction.finish(); - debug(`Worker thread ${threadId} completed task.`); + debug(`Worker thread ${threadId} completed ${workerData.queue} task.`); return completedTask; }; diff --git a/src/controllers/threading/threadHandler.js b/src/controllers/threading/threadHandler.js index 09834db..88ecb72 100644 --- a/src/controllers/threading/threadHandler.js +++ b/src/controllers/threading/threadHandler.js @@ -17,28 +17,36 @@ const ErrorHandler = require("../../middlewares/error.js"); const SYNC_MIN_CONCURRENCY = 2; const ASYNC_MIN_CONCURRENCY = 3; -// On Prod: 0.25 ratio * 16 cores * 4 instances = 16 threads -const SYNC_CONCURRENCY_RATIO = 0.25; -/** On Prod: 0.25 ratio * 16 cores * 4 instances = 16 threads - * Async has 3 queues: - * - general - * - by api - * - by team - * - * Distribution between those is seen below. - * */ -const ASYNC_CONCURRENCY_RATIO = 0.25; +// On most instances, there are two nodes, one for Service Provider endpoints and one for everything else +// On Dev and local instances, this isn't the case, so a lower concurrency is needed +const CORE_CONCURRENCY_RATIO = parseInt( + process.env.CORE_CONCURRENCY_RATIO ?? ((process.env.INSTANCE_ENV ?? "dev") === "dev" ? 1 : 2), +); +const MEM_CONCURRENCY_RATIO = parseFloat(process.env.MEM_CONCURRENCY_RATIO ?? 0.5); -let SYNC_CONCURRENCY = Math.ceil(os.cpus().length * SYNC_CONCURRENCY_RATIO); +const CORE_LIMIT = Math.ceil(os.cpus().length * CORE_CONCURRENCY_RATIO); + +const MEM_LIMIT = Math.ceil((os.totalmem() / 2e9) * MEM_CONCURRENCY_RATIO); + +// Ex. Prod: 16 cores / 64GB mem = min(16 * 2, 32) = 32 allowed concurrently +// Divided by 4 because each instance uses 4 sub-instances for reliability +let SYNC_CONCURRENCY = Math.ceil(Math.min(CORE_LIMIT, MEM_LIMIT) / 4); if (SYNC_CONCURRENCY < SYNC_MIN_CONCURRENCY) SYNC_CONCURRENCY = SYNC_MIN_CONCURRENCY; -let ASYNC_CONCURRENCY = Math.ceil(os.cpus().length * ASYNC_CONCURRENCY_RATIO); + +let ASYNC_CONCURRENCY = SYNC_CONCURRENCY; if (ASYNC_CONCURRENCY < ASYNC_MIN_CONCURRENCY) ASYNC_CONCURRENCY = ASYNC_MIN_CONCURRENCY; -const ASYNC_MAIN_CONCURRENCY = ASYNC_CONCURRENCY <= 9 ? Math.ceil(ASYNC_CONCURRENCY / 3) : ASYNC_CONCURRENCY - 6; -const ASYNC_BY_API_CONCURRENCY = ASYNC_CONCURRENCY <= 9 ? Math.floor(ASYNC_CONCURRENCY / 3) : 3; -const ASYNC_BY_TEAM_CONCURRENCY = ASYNC_CONCURRENCY <= 9 ? Math.floor(ASYNC_CONCURRENCY / 3) : 3; +// Async has 3 separate queues, concurrency is distributed between them as such: +const ASYNC_MAIN_CONCURRENCY = ASYNC_CONCURRENCY <= 9 ? Math.ceil(ASYNC_CONCURRENCY / 3) : ASYNC_CONCURRENCY - 4; +const ASYNC_BY_API_CONCURRENCY = ASYNC_CONCURRENCY <= 9 ? Math.floor(ASYNC_CONCURRENCY / 3) : 2; +const ASYNC_BY_TEAM_CONCURRENCY = ASYNC_CONCURRENCY <= 9 ? Math.floor(ASYNC_CONCURRENCY / 3) : 2; if (!global.threadpool && !isWorkerThread && !(process.env.USE_THREADING === "false")) { + // Give user a little report of resource availability + debug(`Computed core limit: ${CORE_LIMIT}`); + debug(`Computed mem limit: ${MEM_LIMIT}`); + debug(`Sync concurrency limit: ${SYNC_CONCURRENCY}`); + debug(`Async concurrency limit: ${ASYNC_CONCURRENCY}`); const env = { ...process.env, DEBUG_COLORS: true, @@ -56,6 +64,7 @@ if (!global.threadpool && !isWorkerThread && !(process.env.USE_THREADING === "fa maxThreads: SYNC_CONCURRENCY, maxQueue: 600, idleTimeout: 10 * 60 * 1000, // 10 minutes + workerData: { queue: "sync" }, env, }), /**Low-volume, high-intensity requests @@ -68,6 +77,7 @@ if (!global.threadpool && !isWorkerThread && !(process.env.USE_THREADING === "fa maxThreads: ASYNC_CONCURRENCY, minThreads: 1, idleTimeout: 60 * 60 * 1000, // 1 hour + workerData: { queue: "async" }, env, }), /**High-volume, low-intensity requests @@ -78,6 +88,7 @@ if (!global.threadpool && !isWorkerThread && !(process.env.USE_THREADING === "fa idleTimeout: 60 * 60 * 1000, // 1 hour minThreads: 2, maxQueue: 600, + workerData: { queue: "misc" }, env, }), }; diff --git a/src/server.js b/src/server.js index c4699a2..219ddf1 100644 --- a/src/server.js +++ b/src/server.js @@ -7,7 +7,7 @@ async function main() { const PORT = Number.parseInt(process.env.PORT) || 3000; cron(); app.listen(PORT, () => { - // console.log(`App listening at http://localhost:${PORT}`); + debug(`Instance Env: ${process.env.INSTANCE_ENV ?? "local"}`); console.log(`⭐⭐⭐ BioThings Explorer is ready! ⭐ Try it now @ http://localhost:${PORT} ✨`); }); process.env.DEBUG_COLORS = "true";