Skip to content

Commit

Permalink
feat: memory-aware concurrency, better threading messages
Browse files Browse the repository at this point in the history
  • Loading branch information
tokebe committed Nov 9, 2023
1 parent c9a2e4e commit 8c19c3a
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 20 deletions.
7 changes: 4 additions & 3 deletions src/controllers/threading/taskHandler.js
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
const { isMainThread, threadId } = require("worker_threads");
const workerData = require("piscina").workerData;
const debug = require("debug")(`bte:biothings-explorer-trapi:worker${threadId}`);

if (!isMainThread) {
// Log thread start before BioLink model loads
debug(`Worker thread ${threadId} is ready to accept tasks.`);
debug(`Worker thread ${threadId} is ready to accept ${workerData.queue} tasks.`);
}

const { tasks } = require("../../routes/index");
Expand Down Expand Up @@ -34,7 +35,7 @@ Sentry.init({
});

const runTask = async ({ req, route, port, job: { jobId, queueName } = {} }) => {
debug(`Worker thread ${threadId} beginning task.`);
debug(`Worker thread ${threadId} beginning ${workerData.queue} task.`);

global.SCHEMA_VERSION = "1.4.0";

Expand Down Expand Up @@ -63,7 +64,7 @@ const runTask = async ({ req, route, port, job: { jobId, queueName } = {} }) =>

transaction.finish();

debug(`Worker thread ${threadId} completed task.`);
debug(`Worker thread ${threadId} completed ${workerData.queue} task.`);

return completedTask;
};
Expand Down
43 changes: 27 additions & 16 deletions src/controllers/threading/threadHandler.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,28 +17,36 @@ const ErrorHandler = require("../../middlewares/error.js");
const SYNC_MIN_CONCURRENCY = 2;
const ASYNC_MIN_CONCURRENCY = 3;

// On Prod: 0.25 ratio * 16 cores * 4 instances = 16 threads
const SYNC_CONCURRENCY_RATIO = 0.25;
/** On Prod: 0.25 ratio * 16 cores * 4 instances = 16 threads
* Async has 3 queues:
* - general
* - by api
* - by team
*
* Distribution between those is seen below.
* */
const ASYNC_CONCURRENCY_RATIO = 0.25;
// On most instances, there are two nodes, one for Service Provider endpoints and one for everything else
// On Dev and local instances, this isn't the case, so a lower concurrency is needed
const CORE_CONCURRENCY_RATIO = parseInt(
process.env.CORE_CONCURRENCY_RATIO ?? ((process.env.INSTANCE_ENV ?? "dev") === "dev" ? 1 : 2),
);
const MEM_CONCURRENCY_RATIO = parseFloat(process.env.MEM_CONCURRENCY_RATIO ?? 0.5);

let SYNC_CONCURRENCY = Math.ceil(os.cpus().length * SYNC_CONCURRENCY_RATIO);
const CORE_LIMIT = Math.ceil(os.cpus().length * CORE_CONCURRENCY_RATIO);

const MEM_LIMIT = Math.ceil((os.totalmem() / 2e9) * MEM_CONCURRENCY_RATIO);

// Ex. Prod: 16 cores / 64GB mem => CORE_LIMIT = 16 * 2 = 32, MEM_LIMIT = ceil((64 / 2) * 0.5) = 16, so min(32, 16) = 16 allowed concurrently
// Divided by 4 because each instance uses 4 sub-instances for reliability
let SYNC_CONCURRENCY = Math.ceil(Math.min(CORE_LIMIT, MEM_LIMIT) / 4);
if (SYNC_CONCURRENCY < SYNC_MIN_CONCURRENCY) SYNC_CONCURRENCY = SYNC_MIN_CONCURRENCY;
let ASYNC_CONCURRENCY = Math.ceil(os.cpus().length * ASYNC_CONCURRENCY_RATIO);

let ASYNC_CONCURRENCY = SYNC_CONCURRENCY;
if (ASYNC_CONCURRENCY < ASYNC_MIN_CONCURRENCY) ASYNC_CONCURRENCY = ASYNC_MIN_CONCURRENCY;

const ASYNC_MAIN_CONCURRENCY = ASYNC_CONCURRENCY <= 9 ? Math.ceil(ASYNC_CONCURRENCY / 3) : ASYNC_CONCURRENCY - 6;
const ASYNC_BY_API_CONCURRENCY = ASYNC_CONCURRENCY <= 9 ? Math.floor(ASYNC_CONCURRENCY / 3) : 3;
const ASYNC_BY_TEAM_CONCURRENCY = ASYNC_CONCURRENCY <= 9 ? Math.floor(ASYNC_CONCURRENCY / 3) : 3;
// Async has 3 separate queues, concurrency is distributed between them as such:
const ASYNC_MAIN_CONCURRENCY = ASYNC_CONCURRENCY <= 9 ? Math.ceil(ASYNC_CONCURRENCY / 3) : ASYNC_CONCURRENCY - 4;
const ASYNC_BY_API_CONCURRENCY = ASYNC_CONCURRENCY <= 9 ? Math.floor(ASYNC_CONCURRENCY / 3) : 2;
const ASYNC_BY_TEAM_CONCURRENCY = ASYNC_CONCURRENCY <= 9 ? Math.floor(ASYNC_CONCURRENCY / 3) : 2;

if (!global.threadpool && !isWorkerThread && !(process.env.USE_THREADING === "false")) {
// Give user a little report of resource availability
debug(`Computed core limit: ${CORE_LIMIT}`);
debug(`Computed mem limit: ${MEM_LIMIT}`);
debug(`Sync concurrency limit: ${SYNC_CONCURRENCY}`);
debug(`Async concurrency limit: ${ASYNC_CONCURRENCY}`);
const env = {
...process.env,
DEBUG_COLORS: true,
Expand All @@ -56,6 +64,7 @@ if (!global.threadpool && !isWorkerThread && !(process.env.USE_THREADING === "fa
maxThreads: SYNC_CONCURRENCY,
maxQueue: 600,
idleTimeout: 10 * 60 * 1000, // 10 minutes
workerData: { queue: "sync" },
env,
}),
/**Low-volume, high-intensity requests
Expand All @@ -68,6 +77,7 @@ if (!global.threadpool && !isWorkerThread && !(process.env.USE_THREADING === "fa
maxThreads: ASYNC_CONCURRENCY,
minThreads: 1,
idleTimeout: 60 * 60 * 1000, // 1 hour
workerData: { queue: "async" },
env,
}),
/**High-volume, low-intensity requests
Expand All @@ -78,6 +88,7 @@ if (!global.threadpool && !isWorkerThread && !(process.env.USE_THREADING === "fa
idleTimeout: 60 * 60 * 1000, // 1 hour
minThreads: 2,
maxQueue: 600,
workerData: { queue: "misc" },
env,
}),
};
Expand Down
2 changes: 1 addition & 1 deletion src/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ async function main() {
const PORT = Number.parseInt(process.env.PORT) || 3000;
cron();
app.listen(PORT, () => {
// console.log(`App listening at http://localhost:${PORT}`);
debug(`Instance Env: ${process.env.INSTANCE_ENV ?? "local"}`);
console.log(`⭐⭐⭐ BioThings Explorer is ready! ⭐ Try it now @ http://localhost:${PORT} ✨`);
});
process.env.DEBUG_COLORS = "true";
Expand Down

0 comments on commit 8c19c3a

Please sign in to comment.