diff --git a/mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/client/transport/WebClientStreamableHttpTransport.java b/mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/client/transport/WebClientStreamableHttpTransport.java index d5ac8e95c..abe6d33eb 100644 --- a/mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/client/transport/WebClientStreamableHttpTransport.java +++ b/mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/client/transport/WebClientStreamableHttpTransport.java @@ -432,7 +432,8 @@ private Tuple2, Iterable> parse(Serve } } else { - throw new McpError("Received unrecognized SSE event type: " + event.event()); + logger.debug("Received SSE event with type: {}", event); + return Tuples.of(Optional.empty(), List.of()); } } diff --git a/mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/client/transport/WebFluxSseClientTransport.java b/mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/client/transport/WebFluxSseClientTransport.java index 128cda4c3..59385b54a 100644 --- a/mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/client/transport/WebFluxSseClientTransport.java +++ b/mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/client/transport/WebFluxSseClientTransport.java @@ -216,7 +216,8 @@ else if (MESSAGE_EVENT_TYPE.equals(event.event())) { } } else { - s.error(new McpError("Received unrecognized SSE event type: " + event.event())); + logger.debug("Received unrecognized SSE event type: {}", event); + s.complete(); } }).transform(handler)).subscribe(); diff --git a/mcp-spring/mcp-spring-webflux/src/test/java/io/modelcontextprotocol/client/transport/WebFluxSseClientTransportTests.java b/mcp-spring/mcp-spring-webflux/src/test/java/io/modelcontextprotocol/client/transport/WebFluxSseClientTransportTests.java index 42b91d14e..1cf5dffe2 100644 --- a/mcp-spring/mcp-spring-webflux/src/test/java/io/modelcontextprotocol/client/transport/WebFluxSseClientTransportTests.java +++ b/mcp-spring/mcp-spring-webflux/src/test/java/io/modelcontextprotocol/client/transport/WebFluxSseClientTransportTests.java @@ -6,6 +6,7 @@ import java.time.Duration; import java.util.Map; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; @@ -77,6 +78,11 @@ public int getInboundMessageCount() { return inboundMessageCount.get(); } + public void simulateSseComment(String comment) { + events.tryEmitNext(ServerSentEvent.builder().comment(comment).build()); + inboundMessageCount.incrementAndGet(); + } + public void simulateEndpointEvent(String jsonMessage) { events.tryEmitNext(ServerSentEvent.builder().event("endpoint").data(jsonMessage).build()); inboundMessageCount.incrementAndGet(); @@ -158,6 +164,27 @@ void testBuilderPattern() { assertThatCode(() -> transport4.closeGracefully().block()).doesNotThrowAnyException(); } + @Test + void testCommentSseMessage() { + // If the line starts with a character (:) are comment lins and should be ingored + // https://html.spec.whatwg.org/multipage/server-sent-events.html#event-stream-interpretation + + CopyOnWriteArrayList droppedErrors = new CopyOnWriteArrayList<>(); + reactor.core.publisher.Hooks.onErrorDropped(droppedErrors::add); + + try { + // Simulate receiving the SSE comment line + transport.simulateSseComment("sse comment"); + + StepVerifier.create(transport.closeGracefully()).verifyComplete(); + + assertThat(droppedErrors).hasSize(0); + } + finally { + reactor.core.publisher.Hooks.resetOnErrorDropped(); + } + } + @Test void testMessageProcessing() { // Create a test message diff --git a/mcp/src/main/java/io/modelcontextprotocol/client/transport/HttpClientSseClientTransport.java b/mcp/src/main/java/io/modelcontextprotocol/client/transport/HttpClientSseClientTransport.java index 8598e3164..b610ad93a 100644 --- a/mcp/src/main/java/io/modelcontextprotocol/client/transport/HttpClientSseClientTransport.java +++ b/mcp/src/main/java/io/modelcontextprotocol/client/transport/HttpClientSseClientTransport.java @@ -366,10 +366,8 @@ else if (MESSAGE_EVENT_TYPE.equals(responseEvent.sseEvent().event())) { return Flux.just(message); } else { - logger.error("Received unrecognized SSE event type: {}", - responseEvent.sseEvent().event()); - sink.error(new McpError( - "Received unrecognized SSE event type: " + responseEvent.sseEvent().event())); + logger.debug("Received unrecognized SSE event type: {}", responseEvent.sseEvent()); + sink.success(); } } catch (IOException e) { diff --git a/mcp/src/main/java/io/modelcontextprotocol/client/transport/HttpClientStreamableHttpTransport.java b/mcp/src/main/java/io/modelcontextprotocol/client/transport/HttpClientStreamableHttpTransport.java index 12baa1706..d8dd97f1e 100644 --- a/mcp/src/main/java/io/modelcontextprotocol/client/transport/HttpClientStreamableHttpTransport.java +++ b/mcp/src/main/java/io/modelcontextprotocol/client/transport/HttpClientStreamableHttpTransport.java @@ -264,6 +264,10 @@ private Mono reconnect(McpTransportStream stream) { "Error parsing JSON-RPC message: " + responseEvent.sseEvent().data())); } } + else { + logger.debug("Received SSE event with type: {}", responseEvent.sseEvent()); + return Flux.empty(); + } } else if (statusCode == METHOD_NOT_ALLOWED) { // NotAllowed logger.debug("The server does not support SSE streams, using request-response mode.");