Skip to content

feat: implement MCP-compliant keep-alive functionality for server transports #430

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jul 30, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.modelcontextprotocol.server.transport;

import java.io.IOException;
import java.time.Duration;
import java.util.concurrent.ConcurrentHashMap;

import com.fasterxml.jackson.core.type.TypeReference;
Expand All @@ -11,6 +12,8 @@
import io.modelcontextprotocol.spec.McpServerTransport;
import io.modelcontextprotocol.spec.McpServerTransportProvider;
import io.modelcontextprotocol.util.Assert;
import io.modelcontextprotocol.util.KeepAliveScheduler;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Exceptions;
Expand Down Expand Up @@ -109,6 +112,12 @@ public class WebFluxSseServerTransportProvider implements McpServerTransportProv
*/
private volatile boolean isClosing = false;

/**
* Keep-alive scheduler for managing session pings. Activated if keepAliveInterval is
* set. Disabled by default.
*/
private KeepAliveScheduler keepAliveScheduler;

/**
* Constructs a new WebFlux SSE server transport provider instance with the default
* SSE endpoint.
Expand All @@ -118,7 +127,10 @@ public class WebFluxSseServerTransportProvider implements McpServerTransportProv
* messages. This endpoint will be communicated to clients during SSE connection
* setup. Must not be null.
* @throws IllegalArgumentException if either parameter is null
* @deprecated Use the builder {@link #builder()} instead for better configuration
* options.
*/
@Deprecated
public WebFluxSseServerTransportProvider(ObjectMapper objectMapper, String messageEndpoint) {
this(objectMapper, messageEndpoint, DEFAULT_SSE_ENDPOINT);
}
Expand All @@ -131,7 +143,10 @@ public WebFluxSseServerTransportProvider(ObjectMapper objectMapper, String messa
* messages. This endpoint will be communicated to clients during SSE connection
* setup. Must not be null.
* @throws IllegalArgumentException if either parameter is null
* @deprecated Use the builder {@link #builder()} instead for better configuration
* options.
*/
@Deprecated
public WebFluxSseServerTransportProvider(ObjectMapper objectMapper, String messageEndpoint, String sseEndpoint) {
this(objectMapper, DEFAULT_BASE_URL, messageEndpoint, sseEndpoint);
}
Expand All @@ -145,9 +160,32 @@ public WebFluxSseServerTransportProvider(ObjectMapper objectMapper, String messa
* messages. This endpoint will be communicated to clients during SSE connection
* setup. Must not be null.
* @throws IllegalArgumentException if either parameter is null
* @deprecated Use the builder {@link #builder()} instead for better configuration
* options.
*/
@Deprecated
public WebFluxSseServerTransportProvider(ObjectMapper objectMapper, String baseUrl, String messageEndpoint,
String sseEndpoint) {
this(objectMapper, baseUrl, messageEndpoint, sseEndpoint, null);
}

/**
* Constructs a new WebFlux SSE server transport provider instance.
* @param objectMapper The ObjectMapper to use for JSON serialization/deserialization
* of MCP messages. Must not be null.
* @param baseUrl webflux message base path
* @param messageEndpoint The endpoint URI where clients should send their JSON-RPC
* messages. This endpoint will be communicated to clients during SSE connection
* setup. Must not be null.
* @param sseEndpoint The SSE endpoint path. Must not be null.
* @param keepAliveInterval The interval for sending keep-alive pings to clients.
* @throws IllegalArgumentException if either parameter is null
* @deprecated Use the builder {@link #builder()} instead for better configuration
* options.
*/
@Deprecated
public WebFluxSseServerTransportProvider(ObjectMapper objectMapper, String baseUrl, String messageEndpoint,
String sseEndpoint, Duration keepAliveInterval) {
Assert.notNull(objectMapper, "ObjectMapper must not be null");
Assert.notNull(baseUrl, "Message base path must not be null");
Assert.notNull(messageEndpoint, "Message endpoint must not be null");
Expand All @@ -161,6 +199,17 @@ public WebFluxSseServerTransportProvider(ObjectMapper objectMapper, String baseU
.GET(this.sseEndpoint, this::handleSseConnection)
.POST(this.messageEndpoint, this::handleMessage)
.build();

if (keepAliveInterval != null) {

this.keepAliveScheduler = KeepAliveScheduler
.builder(() -> (isClosing) ? Flux.empty() : Flux.fromIterable(sessions.values()))
.initialDelay(keepAliveInterval)
.interval(keepAliveInterval)
.build();

this.keepAliveScheduler.start();
}
}

@Override
Expand Down Expand Up @@ -209,23 +258,21 @@ public Mono<Void> notifyClients(String method, Object params) {
/**
* Initiates a graceful shutdown of all the sessions. This method ensures all active
* sessions are properly closed and cleaned up.
*
* <p>
* The shutdown process:
* <ul>
* <li>Marks the transport as closing to prevent new connections</li>
* <li>Closes each active session</li>
* <li>Removes closed sessions from the sessions map</li>
* <li>Times out after 5 seconds if shutdown takes too long</li>
* </ul>
* @return A Mono that completes when all sessions have been closed
*/
@Override
public Mono<Void> closeGracefully() {
return Flux.fromIterable(sessions.values())
.doFirst(() -> logger.debug("Initiating graceful shutdown with {} active sessions", sessions.size()))
.flatMap(McpServerSession::closeGracefully)
.then();
.then()
.doOnSuccess(v -> {
logger.debug("Graceful shutdown completed");
sessions.clear();
if (this.keepAliveScheduler != null) {
this.keepAliveScheduler.shutdown();
}
});
}

/**
Expand Down Expand Up @@ -396,6 +443,8 @@ public static class Builder {

private String sseEndpoint = DEFAULT_SSE_ENDPOINT;

private Duration keepAliveInterval;

/**
* Sets the ObjectMapper to use for JSON serialization/deserialization of MCP
* messages.
Expand Down Expand Up @@ -446,6 +495,17 @@ public Builder sseEndpoint(String sseEndpoint) {
return this;
}

/**
* Sets the interval for sending keep-alive pings to clients.
* @param keepAliveInterval The keep-alive interval duration. If null, keep-alive
* is disabled.
* @return this builder instance
*/
public Builder keepAliveInterval(Duration keepAliveInterval) {
this.keepAliveInterval = keepAliveInterval;
return this;
}

/**
* Builds a new instance of {@link WebFluxSseServerTransportProvider} with the
* configured settings.
Expand All @@ -456,7 +516,8 @@ public WebFluxSseServerTransportProvider build() {
Assert.notNull(objectMapper, "ObjectMapper must be set");
Assert.notNull(messageEndpoint, "Message endpoint must be set");

return new WebFluxSseServerTransportProvider(objectMapper, baseUrl, messageEndpoint, sseEndpoint);
return new WebFluxSseServerTransportProvider(objectMapper, baseUrl, messageEndpoint, sseEndpoint,
keepAliveInterval);
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
import io.modelcontextprotocol.spec.McpStreamableServerTransportProvider;
import io.modelcontextprotocol.server.McpTransportContext;
import io.modelcontextprotocol.util.Assert;
import io.modelcontextprotocol.util.KeepAliveScheduler;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.HttpStatus;
Expand All @@ -28,6 +30,7 @@
import reactor.core.publisher.Mono;

import java.io.IOException;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;

Expand Down Expand Up @@ -58,8 +61,11 @@ public class WebFluxStreamableServerTransportProvider implements McpStreamableSe

private volatile boolean isClosing = false;

private KeepAliveScheduler keepAliveScheduler;

private WebFluxStreamableServerTransportProvider(ObjectMapper objectMapper, String mcpEndpoint,
McpTransportContextExtractor<ServerRequest> contextExtractor, boolean disallowDelete) {
McpTransportContextExtractor<ServerRequest> contextExtractor, boolean disallowDelete,
Duration keepAliveInterval) {
Assert.notNull(objectMapper, "ObjectMapper must not be null");
Assert.notNull(mcpEndpoint, "Message endpoint must not be null");
Assert.notNull(contextExtractor, "Context extractor must not be null");
Expand All @@ -73,6 +79,20 @@ private WebFluxStreamableServerTransportProvider(ObjectMapper objectMapper, Stri
.POST(this.mcpEndpoint, this::handlePost)
.DELETE(this.mcpEndpoint, this::handleDelete)
.build();

if (keepAliveInterval != null) {
this.keepAliveScheduler = KeepAliveScheduler
.builder(() -> (isClosing) ? Flux.empty() : Flux.fromIterable(this.sessions.values()))
.initialDelay(keepAliveInterval)
.interval(keepAliveInterval)
.build();

this.keepAliveScheduler.start();
}
else {
logger.warn("Keep-alive interval is not set or invalid. No keep-alive will be scheduled.");
}

}

@Override
Expand Down Expand Up @@ -105,6 +125,11 @@ public Mono<Void> closeGracefully() {
.doFirst(() -> logger.debug("Initiating graceful shutdown with {} active sessions", sessions.size()))
.flatMap(McpStreamableServerSession::closeGracefully)
.then();
}).then().doOnSuccess(v -> {
sessions.clear();
if (this.keepAliveScheduler != null) {
this.keepAliveScheduler.shutdown();
}
});
}

Expand Down Expand Up @@ -368,6 +393,8 @@ public static class Builder {

private boolean disallowDelete;

private Duration keepAliveInterval;

private Builder() {
// used by a static method
}
Expand Down Expand Up @@ -424,6 +451,17 @@ public Builder disallowDelete(boolean disallowDelete) {
return this;
}

/**
* Sets the keep-alive interval for the server transport.
* @param keepAliveInterval The interval for sending keep-alive messages. If null,
* no keep-alive will be scheduled.
* @return this builder instance
*/
public Builder keepAliveInterval(Duration keepAliveInterval) {
this.keepAliveInterval = keepAliveInterval;
return this;
}

/**
* Builds a new instance of {@link WebFluxStreamableServerTransportProvider} with
* the configured settings.
Expand All @@ -435,7 +473,7 @@ public WebFluxStreamableServerTransportProvider build() {
Assert.notNull(mcpEndpoint, "Message endpoint must be set");

return new WebFluxStreamableServerTransportProvider(objectMapper, mcpEndpoint, contextExtractor,
disallowDelete);
disallowDelete, keepAliveInterval);
}

}
Expand Down
Loading
Loading