mcp/streamable: add resumability for the Streamable transport #133
891161c to abc4c9d
First preliminary review. With Rob out, I want to really understand this. That may take a day or two.
5a2d5aa to 9d77544
mcp/streamable.go
Outdated
@@ -594,12 +597,38 @@ type StreamableClientTransport struct {
	opts StreamableClientTransportOptions
}

// StreamableClientReconnectionOptions defines parameters for client reconnection attempts.
type StreamableClientReconnectionOptions struct {
I think we should export only MaxRetries. For the others we can pick reasonable defaults that will probably work forever, or a long time. WCAAIL.
Done. Removed the growFactor check, since that field is unexported now.
@@ -704,11 +760,19 @@ func (s *streamableClientConn) Write(ctx context.Context, msg jsonrpc.Message) e
	if sessionID == "" {
		// locked
		s._sessionID = gotSessionID
		// With the session now established, launch the persistent background listener for server-pushed events.
		go s.establishSSE(&startSSEState{})
This goroutine is not present in the current code. Why is it needed here?
More general question: is there a description of the design of resumability somewhere? Not looking for a design doc, just a short sketch.
IIUC, the previous logic only covered section 2.1, which establishes an SSE stream if the server responds to a client POST with text/event-stream.
However, section 2.2 says: "The client MAY issue an HTTP GET to the MCP endpoint. This can be used to open an SSE stream, allowing the server to communicate to the client, without the client first sending data via HTTP POST."
So, from my understanding, we need to proactively issue a client GET request after initialization, so that the server has a stream to send on before the client makes another POST.
Added a comment.
defer resp.Body.Close()

done := make(chan struct{})
go func() {
	defer close(done)
	for evt, err := range scanEvents(resp.Body) {
		if err != nil {
			// TODO: surface this error; possibly break the stream
			s.scheduleReconnect(opts)
Not every error is going to be worth reconnecting for, is it? There could be bugs or bad data. We'll probably need a way to distinguish these. For a later PR.
Good point. I'll follow up with another CL after this one.
mcp/streamable.go
Outdated

if !strings.Contains(resp.Header.Get("Content-Type"), "text/event-stream") {
	resp.Body.Close()
	s.scheduleReconnect(opts)
Why do we think reconnecting will help?
Ah, I don't think it will. Good point. Updated.
	return
}

s.handleSSE(resp, opts)
You started a goroutine with this in postMessage. Why are you calling it again here?
This is for the client GET initiated stream.
I think there are 2 flows to start an SSE stream:
- client GET request (here)
- client POST request (postMessage)
This CL implements a retry mechanism to resume SSE streams to recover from network failures.
9d77544 to 881274b
	return
case <-time.After(delay):
	opts.attempt++
	s.establishSSE(opts)
This is a recursive call to establishSSE, so in theory you could blow the stack.
Not sure what the right fix is.
This CL implements a retry mechanism to resume SSE streams to recover from network failures.
For #10
I referenced