Skip to content

Commit

Permalink
feat(McpAsyncServer): Add non-blocking execution for tools and resources
Browse files Browse the repository at this point in the history
- Execute tool calls, resource reads and prompt handling in a non-blocking
  manner using Schedulers.boundedElastic(). This prevents blocking operations
  from impacting server responsiveness.
- Added integration tests to verify non-blocking behavior with tools that
  make HTTP calls to external services.

Related to #48
This is  a temp patch until #48 is resolved properly.
  • Loading branch information
tzolov committed Jan 4, 2025
1 parent b2082a0 commit 580f955
Show file tree
Hide file tree
Showing 8 changed files with 132 additions and 15 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,14 @@ Add the following dependencies to your Maven project:
<dependency>
<groupId>org.springframework.experimental</groupId>
<artifactId>mcp</artifactId>
<version>0.4.0-SNAPSHOT</version>
<version>0.5.0-SNAPSHOT</version>
</dependency>

<!-- For Spring AI integration -->
<dependency>
<groupId>org.springframework.experimental</groupId>
<artifactId>spring-ai-mcp</artifactId>
<version>0.4.0-SNAPSHOT</version>
<version>0.5.0-SNAPSHOT</version>
</dependency>
```

Expand Down
2 changes: 1 addition & 1 deletion mcp-docs/src/main/antora/modules/ROOT/pages/mcp.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ Add the following dependency to your Maven project:
<dependency>
<groupId>org.springframework.experimental</groupId>
<artifactId>mcp</artifactId>
<version>0.4.0-SNAPSHOT</version>
<version>0.5.0-SNAPSHOT</version>
</dependency>
----

Expand Down
4 changes: 2 additions & 2 deletions mcp-docs/src/main/antora/modules/ROOT/pages/overview.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ Maven::
<dependency>
<groupId>org.springframework.experimental</groupId>
<artifactId>mcp</artifactId>
<version>0.4.0-SNAPSHOT</version>
<version>0.5.0-SNAPSHOT</version>
</dependency>
----
+
Expand All @@ -50,7 +50,7 @@ Maven::
<dependency>
<groupId>org.springframework.experimental</groupId>
<artifactId>spring-ai-mcp</artifactId>
<version>0.4.0-SNAPSHOT</version>
<version>0.5.0-SNAPSHOT</version>
</dependency>
----
+
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,6 @@ To use this module, add the following dependency to your Maven project:
<dependency>
<groupId>org.springframework.experimental</groupId>
<artifactId>spring-ai-mcp</artifactId>
<version>0.4.0-SNAPSHOT</version>
<version>0.5.0-SNAPSHOT</version>
</dependency>
----
Original file line number Diff line number Diff line change
Expand Up @@ -374,9 +374,9 @@ private DefaultMcpSession.RequestHandler toolsCallRequestHandler() {
return Mono.<Object>error(new McpError("Tool not found: " + callToolRequest.name()));
}

CallToolResult callResponse = toolRegistration.get().call().apply(callToolRequest.arguments());

return Mono.just(callResponse);
return Mono.fromCallable(() -> toolRegistration.get().call().apply(callToolRequest.arguments()))
.map(result -> (Object) result)
.subscribeOn(Schedulers.boundedElastic());
};
}

Expand Down Expand Up @@ -462,7 +462,9 @@ private DefaultMcpSession.RequestHandler resourcesReadRequestHandler() {
});
var resourceUri = resourceRequest.uri();
if (this.resources.containsKey(resourceUri)) {
return Mono.just(this.resources.get(resourceUri).readHandler().apply(resourceRequest));
return Mono.fromCallable(() -> this.resources.get(resourceUri).readHandler().apply(resourceRequest))
.map(result -> (Object) result)
.subscribeOn(Schedulers.boundedElastic());
}
return Mono.error(new McpError("Resource not found: " + resourceUri));
};
Expand Down Expand Up @@ -558,7 +560,10 @@ private DefaultMcpSession.RequestHandler promptsGetRequestHandler() {

// Implement prompt retrieval logic here
if (this.prompts.containsKey(promptRequest.name())) {
return Mono.just(this.prompts.get(promptRequest.name()).promptHandler().apply(promptRequest));
return Mono
.fromCallable(() -> this.prompts.get(promptRequest.name()).promptHandler().apply(promptRequest))
.map(result -> (Object) result)
.subscribeOn(Schedulers.boundedElastic());
}

return Mono.error(new McpError("Prompt not found: " + promptRequest.name()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,17 +31,22 @@

import org.springframework.ai.mcp.client.McpClient;
import org.springframework.ai.mcp.client.transport.SseClientTransport;
import org.springframework.ai.mcp.server.McpServer.ToolRegistration;
import org.springframework.ai.mcp.server.transport.SseServerTransport;
import org.springframework.ai.mcp.spec.McpError;
import org.springframework.ai.mcp.spec.McpSchema;
import org.springframework.ai.mcp.spec.McpSchema.CallToolResult;
import org.springframework.ai.mcp.spec.McpSchema.ClientCapabilities;
import org.springframework.ai.mcp.spec.McpSchema.CreateMessageRequest;
import org.springframework.ai.mcp.spec.McpSchema.CreateMessageResult;
import org.springframework.ai.mcp.spec.McpSchema.InitializeResult;
import org.springframework.ai.mcp.spec.McpSchema.Role;
import org.springframework.ai.mcp.spec.McpSchema.Root;
import org.springframework.ai.mcp.spec.McpSchema.ServerCapabilities;
import org.springframework.ai.mcp.spec.McpSchema.Tool;
import org.springframework.http.server.reactive.HttpHandler;
import org.springframework.http.server.reactive.ReactorHttpHandlerAdapter;
import org.springframework.web.client.RestClient;
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.reactive.function.server.RouterFunctions;

Expand Down Expand Up @@ -314,4 +319,111 @@ void testRootsServerCloseWithActiveSubscription() {
mcpClient.close();
}

// ---------------------------------------
// Tools Tests
// ---------------------------------------
@Test
void testToolCallSuccess() {

var callResponse = new McpSchema.CallToolResult(List.of(new McpSchema.TextContent("CALL RESPONSE")), null);
ToolRegistration tool1 = new ToolRegistration(
new McpSchema.Tool("tool1", "tool1 description", Map.of("city", "String")), request -> {
// perform a blocking call to a remote service
String response = RestClient.create()
.get()
.uri("https://github.com/spring-projects-experimental/spring-ai-mcp/blob/main/README.md")
.retrieve()
.body(String.class);
assertThat(response).isNotBlank();
return callResponse;
});

var mcpServer = McpServer.using(mcpServerTransport)
.capabilities(ServerCapabilities.builder().tools(true).build())
.tools(tool1)
.sync();

var mcpClient = clientBuilder.sync();

InitializeResult initResult = mcpClient.initialize();
assertThat(initResult).isNotNull();

assertThat(mcpClient.listTools().tools()).contains(tool1.tool());

CallToolResult response = mcpClient.callTool(new McpSchema.CallToolRequest("tool1", Map.of()));

assertThat(response).isNotNull();
assertThat(response).isEqualTo(callResponse);

mcpClient.close();
mcpServer.close();
}

@Test
void testToolListChangeHandlingSuccess() {

var callResponse = new McpSchema.CallToolResult(List.of(new McpSchema.TextContent("CALL RESPONSE")), null);
ToolRegistration tool1 = new ToolRegistration(
new McpSchema.Tool("tool1", "tool1 description", Map.of("city", "String")), request -> {
// perform a blocking call to a remote service
String response = RestClient.create()
.get()
.uri("https://github.com/spring-projects-experimental/spring-ai-mcp/blob/main/README.md")
.retrieve()
.body(String.class);
assertThat(response).isNotBlank();
return callResponse;
});

var mcpServer = McpServer.using(mcpServerTransport)
.capabilities(ServerCapabilities.builder().tools(true).build())
.tools(tool1)
.sync();

AtomicReference<List<Tool>> rootsRef = new AtomicReference<>();
var mcpClient = clientBuilder.toolsChangeConsumer(toolsUpdate -> {
// perform a blocking call to a remote service
String response = RestClient.create()
.get()
.uri("https://github.com/spring-projects-experimental/spring-ai-mcp/blob/main/README.md")
.retrieve()
.body(String.class);
assertThat(response).isNotBlank();
rootsRef.set(toolsUpdate);
}).sync();

InitializeResult initResult = mcpClient.initialize();
assertThat(initResult).isNotNull();

assertThat(rootsRef.get()).isNull();

assertThat(mcpClient.listTools().tools()).contains(tool1.tool());

mcpServer.notifyToolsListChanged();

await().atMost(Duration.ofSeconds(5)).untilAsserted(() -> {
assertThat(rootsRef.get()).containsAll(List.of(tool1.tool()));
});

// Remove a tool
mcpServer.removeTool("tool1");

await().atMost(Duration.ofSeconds(5)).untilAsserted(() -> {
assertThat(rootsRef.get()).isEmpty();
});

// Add a new tool
ToolRegistration tool2 = new ToolRegistration(
new McpSchema.Tool("tool2", "tool2 description", Map.of("city", "String")), request -> callResponse);

mcpServer.addTool(tool2);

await().atMost(Duration.ofSeconds(5)).untilAsserted(() -> {
assertThat(rootsRef.get()).containsAll(List.of(tool2.tool()));
});

mcpClient.close();
mcpServer.close();
}

}
6 changes: 3 additions & 3 deletions spring-ai-mcp-sample/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ The server can be started in two transport modes, controlled by the `transport.m
### Stdio Mode (Default)

```bash
java -Dtransport.mode=stdio -jar target/spring-ai-mcp-sample-0.4.0-SNAPSHOT.jar
java -Dtransport.mode=stdio -jar target/spring-ai-mcp-sample-0.5.0-SNAPSHOT.jar
```

The Stdio mode server is automatically started by the client - no explicit server startup is needed.
Expand All @@ -38,7 +38,7 @@ In Stdio mode the server must not emit any messages/logs to the console (e.g. st

### SSE Mode
```bash
java -Dtransport.mode=sse -jar target/spring-ai-mcp-sample-0.4.0-SNAPSHOT.jar
java -Dtransport.mode=sse -jar target/spring-ai-mcp-sample-0.5.0-SNAPSHOT.jar
```

## Sample Clients
Expand All @@ -49,7 +49,7 @@ The project includes example clients for both transport modes:
```java
var stdioParams = ServerParameters.builder("java")
.args("-Dtransport.mode=stdio", "-jar",
"target/spring-ai-mcp-sample-0.4.0-SNAPSHOT.jar")
"target/spring-ai-mcp-sample-0.5.0-SNAPSHOT.jar")
.build();

var transport = new StdioClientTransport(stdioParams);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public static void main(String[] args) {

var stdioParams = ServerParameters.builder("java")
.args("-Dtransport.mode=stdio", "-jar",
"spring-ai-mcp-sample/target/spring-ai-mcp-sample-0.4.0-SNAPSHOT.jar")
"spring-ai-mcp-sample/target/spring-ai-mcp-sample-0.5.0-SNAPSHOT.jar")
.build();

var transport = new StdioClientTransport(stdioParams);
Expand Down

0 comments on commit 580f955

Please sign in to comment.