Skip to content

Commit

Permalink
Merge pull request #5 from biothings/concurrency
Browse files Browse the repository at this point in the history
Concurrency improvements
  • Loading branch information
tokebe authored Dec 7, 2023
2 parents baf2bc2 + ad1279f commit 082be5d
Show file tree
Hide file tree
Showing 8 changed files with 164 additions and 111 deletions.
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
"dependencies": {
"@biothings-explorer/query_graph_handler": "workspace:../query_graph_handler",
"@biothings-explorer/smartapi-kg": "workspace:../smartapi-kg",
"@biothings-explorer/utils": "workspace:../utils",
"@bull-board/api": "^5.9.1",
"@bull-board/express": "^5.9.1",
"@sentry/node": "^7.74.1",
Expand Down
60 changes: 34 additions & 26 deletions src/config/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -70,48 +70,56 @@ module.exports = class Config {
setLimiter() {
const slowLimiter = rateLimit({
windowMs: 1 * 60 * 1000, //1min
max: process.env.MAX_QUERIES_PER_MIN || 15,
max: process.env.MAX_QUERIES_PER_MIN || 20,
});
const medLimiter = rateLimit({
windowMs: 1 * 60 * 1000, //1min
max: process.env.MAX_QUERIES_PER_MIN || 30,
});
const fastLimiter = rateLimit({
windowMs: 1 * 60 * 1000, //1min
max: process.env.MAX_QUERIES_PER_MIN || 60,
max: process.env.MAX_QUERIES_PER_MIN || 6000,
});
this.app.use("/", fastLimiter);
this.app.use("/v1/query", slowLimiter);
this.app.use("/v1/team/:team_name/query", slowLimiter);
this.app.use("/v1/team/:team_name/query", slowLimiter);
this.app.use("/v1/team/:team_name/query", medLimiter);
this.app.use("/v1/team/:smartapiID/query", medLimiter);
this.app.use("/v1/meta_knowledge_graph", medLimiter);
this.app.use("/v1/team/:teamName/meta_knowledge_graph", medLimiter);
this.app.use("/v1/smartapi/:smartapiID/meta_knowledge_graph", medLimiter);
this.app.use("/v1/asyncquery", fastLimiter);
this.app.use("/v1/team/:teamName/asyncquery", fastLimiter);
this.app.use("/v1/smartapi/:smartapiID/asyncquery", fastLimiter);
this.app.use("/queues", fastLimiter);
}

setSentry() {
// use SENTRY_DSN environment variable
Sentry.init({
// dsn: "https://5297933ef0f6487c9fd66532bb1fcefe@o4505444772806656.ingest.sentry.io/4505449737420800",
integrations: [
// enable HTTP calls tracing
new Sentry.Integrations.Http({ tracing: true }),
// enable Express.js middleware tracing
new Sentry.Integrations.Express({ app: this.app }),
// Automatically instrument Node.js libraries and frameworks
...Sentry.autoDiscoverNodePerformanceMonitoringIntegrations(),
],
try {
Sentry.init({
integrations: [
// enable HTTP calls tracing
new Sentry.Integrations.Http({ tracing: true }),
// enable Express.js middleware tracing
new Sentry.Integrations.Express({ app: this.app }),
// Automatically instrument Node.js libraries and frameworks
...Sentry.autoDiscoverNodePerformanceMonitoringIntegrations(),
],

// Set tracesSampleRate to 1.0 to capture 100%
// of transactions for performance monitoring.
// We recommend adjusting this value in production
tracesSampleRate: process.env.EXPRESS_SAMPLE_RATE ? parseFloat(process.env.EXPRESS_SAMPLE_RATE) : 1.0,
environment: process.env.INSTANCE_ENV,
});

// RequestHandler creates a separate execution context, so that all
// transactions/spans/breadcrumbs are isolated across requests
this.app.use(Sentry.Handlers.requestHandler({ user: false }));
// TracingHandler creates a trace for every incoming request
this.app.use(Sentry.Handlers.tracingHandler());
// Set tracesSampleRate to 1.0 to capture 100%
// of transactions for performance monitoring.
// We recommend adjusting this value in production
tracesSampleRate: process.env.EXPRESS_SAMPLE_RATE ? parseFloat(process.env.EXPRESS_SAMPLE_RATE) : 1.0,
environment: process.env.INSTANCE_ENV,
});
// RequestHandler creates a separate execution context, so that all
// transactions/spans/breadcrumbs are isolated across requests
this.app.use(Sentry.Handlers.requestHandler({ user: false }));
// TracingHandler creates a trace for every incoming request
this.app.use(Sentry.Handlers.tracingHandler());
} catch (error) {
debug("Sentry init error. This does not affect execution.");
debug(error);
}
}
};
6 changes: 3 additions & 3 deletions src/controllers/async/asyncquery.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@ const axios = require("axios");
const { customAlphabet } = require("nanoid");
const utils = require("../../utils/common");
const { redisClient } = require("@biothings-explorer/query_graph_handler");
const { LogEntry } = require("@biothings-explorer/query_graph_handler");
const { LogEntry } = require("@biothings-explorer/utils");
const lz4 = require("lz4");
const { Readable } = require("stream");
const chunker = require("stream-chunker");
const { parser } = require("stream-json");
const Assembler = require("stream-json/Assembler");
const Sentry = require("@sentry/node");
const { Telemetry } = require("@biothings-explorer/utils");
const ErrorHandler = require("../../middlewares/error.js");

exports.asyncquery = async (req, res, next, queueData, queryQueue) => {
Expand Down Expand Up @@ -157,7 +157,7 @@ exports.asyncqueryResponse = async (handler, callback_url, jobID = null, jobURL
console.error(e);

if (ErrorHandler.shouldHandleError(e)) {
Sentry.captureException(e);
Telemetry.captureException(e);
}

//shape error > will be handled below
Expand Down
87 changes: 57 additions & 30 deletions src/controllers/threading/taskHandler.js
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
const { isMainThread, threadId } = require("worker_threads");
const workerData = require("piscina").workerData;
const debug = require("debug")(`bte:biothings-explorer-trapi:worker${threadId}`);

if (!isMainThread) {
// Log thread start before BioLink model loads
debug(`Worker thread ${threadId} is ready to accept tasks.`);
debug(`Worker thread ${threadId} is ready to accept ${workerData.queue} tasks.`);
}

const { tasks } = require("../../routes/index");
Expand All @@ -12,29 +13,35 @@ const Sentry = require("@sentry/node");
const { ProfilingIntegration } = require("@sentry/profiling-node");

// use SENTRY_DSN environment variable
Sentry.init({
// dsn: "https://5297933ef0f6487c9fd66532bb1fcefe@o4505444772806656.ingest.sentry.io/4505449737420800",
integrations: [
// Automatically instrument Node.js libraries and frameworks
...Sentry.autoDiscoverNodePerformanceMonitoringIntegrations(),
new ProfilingIntegration(),
],
environment: process.env.INSTANCE_ENV,
debug: true,
normalizeDepth: 6,
maxBreadcrumbs: 500,
// Set tracesSampleRate to 1.0 to capture 100%
// of transactions for performance monitoring.
// We recommend adjusting this value in production
tracesSampleRate: process.env.THREAD_SAMPLE_RATE ? parseFloat(process.env.THREAD_SAMPLE_RATE) : 1.0,
profilesSampleRate: process.env.THREAD_PROFILE_RATE ? parseFloat(process.env.THREAD_PROFILE_RATE) : 1.0, // Profiling sample rate is relative to tracesSampleRate,
_experiments: {
maxProfileDurationMs: 6 * 60 * 1000, // max profiling duration of 6 minutes (technically "beta" feature)
},
});
try {
Sentry.init({
integrations: [
// Automatically instrument Node.js libraries and frameworks
...Sentry.autoDiscoverNodePerformanceMonitoringIntegrations(),
new ProfilingIntegration(),
// enable HTTP calls tracing
new Sentry.Integrations.Http({ tracing: true }),
],
environment: process.env.INSTANCE_ENV,
debug: true,
normalizeDepth: 6,
maxBreadcrumbs: 500,
// Set tracesSampleRate to 1.0 to capture 100%
// of transactions for performance monitoring.
// We recommend adjusting this value in production
tracesSampleRate: process.env.THREAD_SAMPLE_RATE ? parseFloat(process.env.THREAD_SAMPLE_RATE) : 1.0,
profilesSampleRate: process.env.THREAD_PROFILE_RATE ? parseFloat(process.env.THREAD_PROFILE_RATE) : 1.0, // Profiling sample rate is relative to tracesSampleRate,
_experiments: {
maxProfileDurationMs: 6 * 60 * 1000, // max profiling duration of 6 minutes (technically "beta" feature)
},
});
} catch (error) {
debug("Sentry init error. This does not affect execution.");
debug(error);
}

const runTask = async ({ req, route, port, job: { jobId, queueName } = {} }) => {
debug(`Worker thread ${threadId} beginning task.`);
debug(`Worker thread ${threadId} beginning ${workerData.queue} task.`);

global.SCHEMA_VERSION = "1.4.0";

Expand All @@ -51,19 +58,39 @@ const runTask = async ({ req, route, port, job: { jobId, queueName } = {} }) =>
global.job = await queue.getJob(jobId);
}

const transaction = Sentry.startTransaction({ name: route });
transaction.setData("request", req.data.queryGraph);
Sentry.getCurrentHub().configureScope(scope => {
scope.clearBreadcrumbs();
scope.setSpan(transaction);
});
let transaction;
try {
const routeNames = {
query_v1: "EXEC /v1/query",
query_v1_by_api: "EXEC /v1/smartapi/:/query",
query_v1_by_team: "EXEC /v1/team/:/query",
asyncquery_status: "EXEC /v1/asyncquery_status",
asyncquery_v1: "EXEC /v1/asyncquery",
asyncquery_v1_by_api: "EXEC /v1/smartapi/:/asyncquery",
asyncquery_v1_by_team: "EXEC /v1/team/:/asyncquery",
};
transaction = Sentry.startTransaction({ name: routeNames[route] });
transaction.setData("request", req.data.queryGraph);
Sentry.getCurrentHub().configureScope(scope => {
scope.clearBreadcrumbs();
scope.setSpan(transaction);
});
} catch (error) {
debug("Sentry transaction start error. This does not affect execution.");
debug(error);
}

const completedTask = await tasks[route](req);
await Promise.all(global.cachingTasks);

transaction.finish();
try {
transaction.finish();
} catch (error) {
debug("Sentry transaction finish error. This does not affect execution.");
debug(error);
}

debug(`Worker thread ${threadId} completed task.`);
debug(`Worker thread ${threadId} completed ${workerData.queue} task.`);

return completedTask;
};
Expand Down
46 changes: 28 additions & 18 deletions src/controllers/threading/threadHandler.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,34 +11,40 @@ const ServerOverloadedError = require("../../utils/errors/server_overloaded_erro
const { customAlphabet } = require("nanoid");
const { getQueryQueue } = require("../async/asyncquery_queue");

const Sentry = require("@sentry/node");
const { Telemetry } = require("@biothings-explorer/utils");
const ErrorHandler = require("../../middlewares/error.js");

const SYNC_MIN_CONCURRENCY = 2;
const ASYNC_MIN_CONCURRENCY = 3;

// On Prod: 0.25 ratio * 16 cores * 4 instances = 16 threads
const SYNC_CONCURRENCY_RATIO = 0.25;
/** On Prod: 0.25 ratio * 16 cores * 4 instances = 16 threads
* Async has 3 queues:
* - general
* - by api
* - by team
*
* Distribution between those is seen below.
* */
const ASYNC_CONCURRENCY_RATIO = 0.25;
// On most instances, there are two nodes, one for Service Provider endpoints and one for everything else
// On Dev and local instances, this isn't the case, so a lower concurrency is needed
const CORE_CONCURRENCY_RATIO = parseInt(process.env.CORE_CONCURRENCY_RATIO ?? 2.5);
const MEM_CONCURRENCY_RATIO = parseFloat(process.env.MEM_CONCURRENCY_RATIO ?? 0.6);

let SYNC_CONCURRENCY = Math.ceil(os.cpus().length * SYNC_CONCURRENCY_RATIO);
const CORE_LIMIT = Math.ceil(os.cpus().length * CORE_CONCURRENCY_RATIO);

const MEM_LIMIT = Math.ceil((os.totalmem() / 1e9) * MEM_CONCURRENCY_RATIO);

// Ex. Prod: 16 cores / 64GB mem = min(16 * 2, 32) = 32 allowed concurrently
// Divided by 4 because each instance uses 4 sub-instances for reliability
let SYNC_CONCURRENCY = Math.ceil(Math.min(CORE_LIMIT, MEM_LIMIT) / 4);
if (SYNC_CONCURRENCY < SYNC_MIN_CONCURRENCY) SYNC_CONCURRENCY = SYNC_MIN_CONCURRENCY;
let ASYNC_CONCURRENCY = Math.ceil(os.cpus().length * ASYNC_CONCURRENCY_RATIO);

let ASYNC_CONCURRENCY = SYNC_CONCURRENCY;
if (ASYNC_CONCURRENCY < ASYNC_MIN_CONCURRENCY) ASYNC_CONCURRENCY = ASYNC_MIN_CONCURRENCY;

const ASYNC_MAIN_CONCURRENCY = ASYNC_CONCURRENCY <= 9 ? Math.ceil(ASYNC_CONCURRENCY / 3) : ASYNC_CONCURRENCY - 6;
const ASYNC_BY_API_CONCURRENCY = ASYNC_CONCURRENCY <= 9 ? Math.floor(ASYNC_CONCURRENCY / 3) : 3;
const ASYNC_BY_TEAM_CONCURRENCY = ASYNC_CONCURRENCY <= 9 ? Math.floor(ASYNC_CONCURRENCY / 3) : 3;
// Async has 3 separate queues, concurrency is distributed between them as such:
const ASYNC_MAIN_CONCURRENCY = ASYNC_CONCURRENCY;
const ASYNC_BY_API_CONCURRENCY = Math.ceil(ASYNC_CONCURRENCY / 2);
const ASYNC_BY_TEAM_CONCURRENCY = Math.ceil(ASYNC_CONCURRENCY / 2);

if (!global.threadpool && !isWorkerThread && !(process.env.USE_THREADING === "false")) {
// Give user a little report of resource availability
debug(`Computed core limit: ${CORE_LIMIT}`);
debug(`Computed mem limit: ${MEM_LIMIT}`);
debug(`Sync concurrency limit: ${SYNC_CONCURRENCY}`);
debug(`Async concurrency limit: ${ASYNC_CONCURRENCY}`);
const env = {
...process.env,
DEBUG_COLORS: true,
Expand All @@ -56,6 +62,7 @@ if (!global.threadpool && !isWorkerThread && !(process.env.USE_THREADING === "fa
maxThreads: SYNC_CONCURRENCY,
maxQueue: 600,
idleTimeout: 10 * 60 * 1000, // 10 minutes
workerData: { queue: "sync" },
env,
}),
/**Low-volume, high-intensity requests
Expand All @@ -68,6 +75,7 @@ if (!global.threadpool && !isWorkerThread && !(process.env.USE_THREADING === "fa
maxThreads: ASYNC_CONCURRENCY,
minThreads: 1,
idleTimeout: 60 * 60 * 1000, // 1 hour
workerData: { queue: "async" },
env,
}),
/**High-volume, low-intensity requests
Expand All @@ -78,6 +86,7 @@ if (!global.threadpool && !isWorkerThread && !(process.env.USE_THREADING === "fa
idleTimeout: 60 * 60 * 1000, // 1 hour
minThreads: 2,
maxQueue: 600,
workerData: { queue: "misc" },
env,
}),
};
Expand Down Expand Up @@ -308,7 +317,7 @@ function taskResponse(response, status = undefined) {
function taskError(error) {
if (global.parentPort) {
if (ErrorHandler.shouldHandleError(error)) {
Sentry.captureException(error);
Telemetry.captureException(error);
}
global.parentPort.postMessage({ threadId, err: error });
return undefined;
Expand All @@ -330,6 +339,7 @@ if (!global.queryQueue.bte_sync_query_queue && !isWorkerThread) {
}
}

// TODO merge async into one queue
if (!global.queryQueue.bte_query_queue && !isWorkerThread) {
getQueryQueue("bte_query_queue");
if (global.queryQueue.bte_query_queue) {
Expand Down
53 changes: 29 additions & 24 deletions src/middlewares/error.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,30 +27,35 @@ class ErrorHandler {

setRoutes(app) {
// first pass through sentry
app.use(
Sentry.Handlers.errorHandler({
shouldHandleError(error) {
// Do not capture non-server errors
if (error.status && error.status < 500) {
return false;
}
if (error instanceof swaggerValidation.InputValidationError || error.name === "InputValidationError") {
return false;
}
if (
error instanceof QueryGraphHandler.InvalidQueryGraphError ||
error.stack.includes("InvalidQueryGraphError") ||
error.name === "InvalidQueryGraphError"
) {
return false;
}
if (error.name === "QueryAborted") {
return false;
}
return true;
},
}),
);
try {
app.use(
Sentry.Handlers.errorHandler({
shouldHandleError(error) {
// Do not capture non-server errors
if (error.status && error.status < 500) {
return false;
}
if (error instanceof swaggerValidation.InputValidationError || error.name === "InputValidationError") {
return false;
}
if (
error instanceof QueryGraphHandler.InvalidQueryGraphError ||
error.stack.includes("InvalidQueryGraphError") ||
error.name === "InvalidQueryGraphError"
) {
return false;
}
if (error.name === "QueryAborted") {
return false;
}
return true;
},
}),
);
} catch (error) {
debug("Sentry express config error. This does not affect execution.");
debug(error);
}

app.use((error, req, res, next) => {
const json = {
Expand Down
Loading

0 comments on commit 082be5d

Please sign in to comment.