-
Notifications
You must be signed in to change notification settings - Fork 78
feat: add streamable http [MCP-42] #361
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
Pull Request Test Coverage Report for Build 16422846497Details
💛 - Coveralls |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
This PR introduces streamable HTTP transport functionality to the MongoDB MCP Server, allowing it to run as an HTTP server in addition to the existing stdio transport. The implementation includes session management with timeout handling and configurable logging options.
- Adds StreamableHttpRunner with Express-based HTTP server and session management
- Refactors transport layer with base class and configurable runner selection
- Introduces comprehensive timeout management for HTTP sessions with cleanup
Reviewed Changes
Copilot reviewed 24 out of 25 changed files in this pull request and generated 3 comments.
Show a summary per file
File | Description |
---|---|
src/transports/streamableHttp.ts | New HTTP transport implementation with session management and Express server |
src/transports/stdio.ts | Refactored stdio transport into runner pattern with base class |
src/transports/base.ts | Abstract base class for transport runners |
src/common/timeoutManager.ts | New timeout management utility for session cleanup |
src/common/sessionStore.ts | HTTP session storage and lifecycle management |
src/common/config.ts | Enhanced configuration with HTTP and logging options |
src/server.ts | Refactored server initialization and validation logic |
src/index.ts | Simplified main entry point with transport runner selection |
tests/integration/transports/streamableHttp.test.ts | Integration tests for HTTP transport |
tests/integration/transports/stdio.test.ts | Integration tests for stdio transport |
tests/unit/common/timeoutManager.test.ts | Unit tests for timeout manager |
src/common/timeoutManager.ts
Outdated
if (this.callback) { | ||
try { | ||
await this.callback(); | ||
} catch (error: unknown) { | ||
this.onerror?.(error); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The runCallback method checks if 'this.callback' exists, but the callback is marked as 'readonly' and provided in the constructor, so this check is unnecessary and adds complexity.
if (this.callback) { | |
try { | |
await this.callback(); | |
} catch (error: unknown) { | |
this.onerror?.(error); | |
} | |
try { | |
await this.callback(); | |
} catch (error: unknown) { | |
this.onerror?.(error); |
Copilot uses AI. Check for mistakes.
src/common/sessionStore.ts
Outdated
const logger = new McpLogger(session.mcpServer); | ||
logger.info( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Creating a new McpLogger instance for each notification is inefficient. Consider creating the logger once and reusing it, or passing it as a dependency.
const logger = new McpLogger(session.mcpServer); | |
logger.info( | |
session.logger.info( |
Copilot uses AI. Check for mistakes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
agreed, we should create a logger and store it inside the our session map
src/common/sessionStore.ts
Outdated
const logger = new McpLogger(mcpServer); | ||
logger.info( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Creating a new McpLogger instance inside the timeout callback is inefficient and could be problematic if called multiple times. Consider creating the logger once and reusing it.
const logger = new McpLogger(mcpServer); | |
logger.info( | |
this.sessions[sessionId].logger.info( |
Copilot uses AI. Check for mistakes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
agree
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Going to be focused on some technical writing today but just some initial review on first part of the changes
src/common/sessionStore.ts
Outdated
const logger = new McpLogger(session.mcpServer); | ||
logger.info( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
agreed, we should create a logger and store it inside the our session map
src/common/sessionStore.ts
Outdated
const logger = new McpLogger(mcpServer); | ||
logger.info( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
agree
src/common/timeoutManager.ts
Outdated
* Resets the timeout. | ||
*/ | ||
reset() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
* Resets the timeout. | |
*/ | |
reset() { | |
* Restarts the timeout. | |
*/ | |
restart() { |
Just to better differentiate from clear
as it's not clear that the initial state has the timeout started already
src/common/timeoutManager.ts
Outdated
* Runs the callback function. | ||
*/ | ||
private async runCallback() { | ||
if (this.callback) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this.callback
is always defined right? this is not needed
src/common/timeoutManager.ts
Outdated
if (timeoutMS <= 0) { | ||
throw new Error("timeoutMS must be greater than 0"); | ||
} | ||
this.reset(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we should not have side-effects in constructors. I think this whole helper can be better represented as a single function.
type ManagedTimeout = {
cancel: () => void;
restart: () => void;
};
export function setManagedTimeout(
callback: () => Promise<void> | void,
timeoutMS: number,
{ onError }: { onError?: (error: unknown) => void }
) {
const wrappedCallback = () => {
try {
await this.callback();
} catch (error: unknown) {
onError?.(error);
}
};
let timeout = setTimeout(wrappedCallback, timeoutMS);
const cancel = () => {
cancelTimeout(timeout);
timeout = undefined;
};
const restart = () => {
cancel();
timeout = setTimeout(wrappedCallback, timeoutMS);
}
return {
cancel,
restart
};
}
@@ -186,6 +189,35 @@ export class Server { | |||
} | |||
|
|||
private async validateConfig(): Promise<void> { | |||
const transport = this.userConfig.transport as string; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
are these type casts needed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That is the way I've found to validate it.
The problem here is as far as we are concerned, the possible values are predefined, so the literal types make sense, but the inputs are cli args or env vars, which are basically strings that can contain any values.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ah okay I see
if (transport !== "http" && transport !== "stdio") {
throw new Error(`Invalid transport: ${String(transport)}`);
}
could prevent it but yeah honestly isn't much better.
We can also use zod
for assertions here. We can define a schema for the config and just let it assert, it'd do the error messages well. But can be done later too
@gagik I've merged the PR according to your comments, would take another look? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good, nice work. Just a couple questions and structure / style ideas.
@@ -66,15 +67,17 @@ export class Server { | |||
return existingHandler(request, extra); | |||
}); | |||
|
|||
const containerEnv = await detectContainerEnv(); | |||
|
|||
if (containerEnv) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do we not need to do this setting anymore?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, for our telemetry
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I meant I don't see it in the new code, is this being done somewhere else?
@@ -186,6 +189,35 @@ export class Server { | |||
} | |||
|
|||
private async validateConfig(): Promise<void> { | |||
const transport = this.userConfig.transport as string; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ah okay I see
if (transport !== "http" && transport !== "stdio") {
throw new Error(`Invalid transport: ${String(transport)}`);
}
could prevent it but yeah honestly isn't much better.
We can also use zod
for assertions here. We can define a schema for the config and just let it assert, it'd do the error messages well. But can be done later too
return (req: express.Request, res: express.Response, next: express.NextFunction) => { | ||
fn(req, res, next).catch((error) => { | ||
logger.error( | ||
LogId.streamableHttpTransportRequestFailure, | ||
"streamableHttpTransport", | ||
`Error handling request: ${error instanceof Error ? error.message : String(error)}` | ||
); | ||
res.status(400).json({ | ||
jsonrpc: "2.0", | ||
error: { | ||
code: JSON_RPC_ERROR_CODE_PROCESSING_REQUEST_FAILED, | ||
message: `failed to handle request`, | ||
data: error instanceof Error ? error.message : String(error), | ||
}, | ||
}); | ||
}); | ||
}; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
return (req: express.Request, res: express.Response, next: express.NextFunction) => { | |
fn(req, res, next).catch((error) => { | |
logger.error( | |
LogId.streamableHttpTransportRequestFailure, | |
"streamableHttpTransport", | |
`Error handling request: ${error instanceof Error ? error.message : String(error)}` | |
); | |
res.status(400).json({ | |
jsonrpc: "2.0", | |
error: { | |
code: JSON_RPC_ERROR_CODE_PROCESSING_REQUEST_FAILED, | |
message: `failed to handle request`, | |
data: error instanceof Error ? error.message : String(error), | |
}, | |
}); | |
}); | |
}; | |
return async (req: express.Request, res: express.Response, next: express.NextFunction) { | |
try { | |
await fn(req, res, next); | |
} catch (error) { | |
logger.error( | |
LogId.streamableHttpTransportRequestFailure, | |
"streamableHttpTransport", | |
`Error handling request: ${error instanceof Error ? error.message : String(error)}` | |
); | |
res.status(400).json({ | |
jsonrpc: "2.0", | |
error: { | |
code: JSON_RPC_ERROR_CODE_PROCESSING_REQUEST_FAILED, | |
message: `failed to handle request`, | |
data: error instanceof Error ? error.message : String(error), | |
}, | |
}); | |
}; | |
}; |
nit: just prefer async syntax wherever possible
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The problem here is that Express.js does not work well with async middlewares. This is about making sure errors are still caught in Express.js.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
but this is still an async middleware right? it returns a promise
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it does not return a promise it acts as a callback, if fn()
which returns a promise fails I use .catch
to treat it.
if I change to async function ...
there is a chance of it throwing and we never respond.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
in essence, this converts promise back to callback to make Express.js
happy
const JSON_RPC_ERROR_CODE_SESSION_NOT_FOUND = -32003; | ||
const JSON_RPC_ERROR_CODE_INVALID_REQUEST = -32004; | ||
|
||
function promiseHandler( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
function promiseHandler( | |
function withErrorHandling( |
nit: I assume that's the purpose of it?
app.enable("trust proxy"); // needed for reverse proxy support | ||
app.use(express.json()); | ||
|
||
const handleRequest = async (req: express.Request, res: express.Response) => { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
const handleRequest = async (req: express.Request, res: express.Response) => { | |
const handleSessionRequest = async (req: express.Request, res: express.Response) => { |
just to make it clearer this is for session-specific requests.
maybe we should move this to a private class method along with the general request handler?
const sessionId = req.headers["mcp-session-id"]; | ||
if (sessionId) { | ||
await handleRequest(req, res); | ||
return; | ||
} | ||
|
||
if (!isInitializeRequest(req.body)) { | ||
res.status(400).json({ | ||
jsonrpc: "2.0", | ||
error: { | ||
code: JSON_RPC_ERROR_CODE_INVALID_REQUEST, | ||
message: `invalid request`, | ||
}, | ||
}); | ||
return; | ||
} | ||
|
||
const server = this.setupServer(this.userConfig); | ||
const transport = new StreamableHTTPServerTransport({ | ||
sessionIdGenerator: () => randomUUID().toString(), | ||
onsessioninitialized: (sessionId) => { | ||
this.sessionStore.setSession(sessionId, transport, server.mcpServer); | ||
}, | ||
onsessionclosed: async (sessionId) => { | ||
try { | ||
await this.sessionStore.closeSession(sessionId, false); | ||
} catch (error) { | ||
logger.error( | ||
LogId.streamableHttpTransportSessionCloseFailure, | ||
"streamableHttpTransport", | ||
`Error closing session: ${error instanceof Error ? error.message : String(error)}` | ||
); | ||
} | ||
}, | ||
}); | ||
|
||
transport.onclose = () => { | ||
server.close().catch((error) => { | ||
logger.error( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: maybe worth moving this to a private class method? not sure what would be a good name but something to differnetiate from handleSessionRequest
for example. Just to make this start function define the paths and individual methods to handle them
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think I follow? you want to move the logger.error
or server.close
? I want the close of http transport to trigger the server to stop listening on port and gracefully shutdown if possible.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ah no this isn't about this line, I just meant we can consider moving the handler into its own named function as well to make the start
function not nest too much. purely stylistic nit. I don't mind it like this either
@gagik I'm merging as is and will address points in a follow up pr |
Proposed changes
Introduces Streamable HTTP transport; No authentication or authorization was implemented.
Note: this is a feature branch, every single individual change was approved on each PR.
Checklist