Skip to content

Commit 491f03c

Browse files
committed
feat: add support for JSON-RPC batch requests and responses
Signed-off-by: jitokim <pigberger70@gmail.com>
1 parent 866732c commit 491f03c

File tree

6 files changed

+301
-11
lines changed

6 files changed

+301
-11
lines changed

mcp/src/main/java/io/modelcontextprotocol/spec/McpClientSession.java

+20
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
*
3535
* @author Christian Tzolov
3636
* @author Dariusz Jędrzejczyk
37+
* @author Jihoon Kim
3738
*/
3839
public class McpClientSession implements McpSession {
3940

@@ -136,6 +137,14 @@ private void handle(McpSchema.JSONRPCMessage message) {
136137
sink.success(response);
137138
}
138139
}
140+
else if (message instanceof McpSchema.JSONRPCBatchResponse batchResponse) {
141+
logger.debug("Received Batch Response: {}", batchResponse);
142+
batchResponse.responses().forEach(jsonrpcMessage -> {
143+
if (jsonrpcMessage instanceof McpSchema.JSONRPCResponse response) {
144+
this.handle(response);
145+
}
146+
});
147+
}
139148
else if (message instanceof McpSchema.JSONRPCRequest request) {
140149
logger.debug("Received request: {}", request);
141150
handleIncomingRequest(request).onErrorResume(error -> {
@@ -145,6 +154,17 @@ else if (message instanceof McpSchema.JSONRPCRequest request) {
145154
return this.transport.sendMessage(errorResponse).then(Mono.empty());
146155
}).flatMap(this.transport::sendMessage).subscribe();
147156
}
157+
else if (message instanceof McpSchema.JSONRPCBatchRequest batchRequest) {
158+
logger.debug("Received Batch Request: {}", batchRequest);
159+
batchRequest.messages().forEach(jsonrpcMessage -> {
160+
if (jsonrpcMessage instanceof McpSchema.JSONRPCRequest request) {
161+
this.handle(request);
162+
}
163+
else if (jsonrpcMessage instanceof McpSchema.JSONRPCNotification notification) {
164+
this.handle(notification);
165+
}
166+
});
167+
}
148168
else if (message instanceof McpSchema.JSONRPCNotification notification) {
149169
logger.debug("Received notification: {}", notification);
150170
handleIncomingNotification(notification)

mcp/src/main/java/io/modelcontextprotocol/spec/McpSchema.java

+69-11
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import com.fasterxml.jackson.annotation.JsonTypeInfo;
1818
import com.fasterxml.jackson.annotation.JsonTypeInfo.As;
1919
import com.fasterxml.jackson.core.type.TypeReference;
20+
import com.fasterxml.jackson.databind.JsonNode;
2021
import com.fasterxml.jackson.databind.ObjectMapper;
2122
import io.modelcontextprotocol.util.Assert;
2223
import org.slf4j.Logger;
@@ -29,6 +30,7 @@
2930
* Context Protocol Schema</a>.
3031
*
3132
* @author Christian Tzolov
33+
* @author Jihoon Kim
3234
*/
3335
public final class McpSchema {
3436

@@ -140,42 +142,78 @@ public sealed interface Request
140142
};
141143

142144
/**
143-
* Deserializes a JSON string into a JSONRPCMessage object.
145+
* Deserializes a JSON string into a JSONRPCMessage object. Handles both single and
146+
* batch JSON-RPC messages.
144147
* @param objectMapper The ObjectMapper instance to use for deserialization
145148
* @param jsonText The JSON string to deserialize
146-
* @return A JSONRPCMessage instance using either the {@link JSONRPCRequest},
147-
* {@link JSONRPCNotification}, or {@link JSONRPCResponse} classes.
149+
* @return A JSONRPCMessage instance, either a {@link JSONRPCRequest},
150+
* {@link JSONRPCNotification}, {@link JSONRPCResponse}, or
151+
* {@link JSONRPCBatchRequest}, or {@link JSONRPCBatchResponse} based on the JSON
152+
* structure.
148153
* @throws IOException If there's an error during deserialization
149154
* @throws IllegalArgumentException If the JSON structure doesn't match any known
150155
* message type
151156
*/
152157
public static JSONRPCMessage deserializeJsonRpcMessage(ObjectMapper objectMapper, String jsonText)
153158
throws IOException {
154-
155159
logger.debug("Received JSON message: {}", jsonText);
156160

157-
var map = objectMapper.readValue(jsonText, MAP_TYPE_REF);
161+
JsonNode rootNode = objectMapper.readTree(jsonText);
162+
163+
// Check if it's a batch request/response
164+
if (rootNode.isArray()) {
165+
// Batch processing
166+
List<JSONRPCMessage> messages = new ArrayList<>();
167+
for (JsonNode node : rootNode) {
168+
Map<String, Object> map = objectMapper.convertValue(node, MAP_TYPE_REF);
169+
messages.add(convertToJsonRpcMessage(map, objectMapper));
170+
}
158171

159-
// Determine message type based on specific JSON structure
172+
// If it's a batch response, return JSONRPCBatchResponse
173+
if (messages.get(0) instanceof JSONRPCResponse) {
174+
return new JSONRPCBatchResponse(messages);
175+
}
176+
// If it's a batch request, return JSONRPCBatchRequest
177+
else {
178+
return new JSONRPCBatchRequest(messages);
179+
}
180+
}
181+
182+
// Single message processing
183+
Map<String, Object> map = objectMapper.readValue(jsonText, MAP_TYPE_REF);
184+
return convertToJsonRpcMessage(map, objectMapper);
185+
}
186+
187+
/**
188+
* Converts a map into a specific JSON-RPC message type. Based on the map's structure,
189+
* this method determines whether the message is a {@link JSONRPCRequest},
190+
* {@link JSONRPCNotification}, or {@link JSONRPCResponse}.
191+
* @param map The map representing the JSON structure
192+
* @param objectMapper The ObjectMapper instance to use for deserialization
193+
* @return The corresponding JSONRPCMessage instance (could be {@link JSONRPCRequest},
194+
* {@link JSONRPCNotification}, or {@link JSONRPCResponse})
195+
* @throws IllegalArgumentException If the map structure doesn't match any known
196+
* message type
197+
*/
198+
private static JSONRPCMessage convertToJsonRpcMessage(Map<String, Object> map, ObjectMapper objectMapper) {
160199
if (map.containsKey("method") && map.containsKey("id")) {
161200
return objectMapper.convertValue(map, JSONRPCRequest.class);
162201
}
163-
else if (map.containsKey("method") && !map.containsKey("id")) {
202+
else if (map.containsKey("method")) {
164203
return objectMapper.convertValue(map, JSONRPCNotification.class);
165204
}
166205
else if (map.containsKey("result") || map.containsKey("error")) {
167206
return objectMapper.convertValue(map, JSONRPCResponse.class);
168207
}
169208

170-
throw new IllegalArgumentException("Cannot deserialize JSONRPCMessage: " + jsonText);
209+
throw new IllegalArgumentException("Unknown JSON-RPC message type: " + map);
171210
}
172211

173212
// ---------------------------
174213
// JSON-RPC Message Types
175214
// ---------------------------
176-
public sealed interface JSONRPCMessage permits JSONRPCRequest, JSONRPCNotification, JSONRPCResponse {
177-
178-
String jsonrpc();
215+
public sealed interface JSONRPCMessage
216+
permits JSONRPCRequest, JSONRPCBatchRequest, JSONRPCNotification, JSONRPCResponse, JSONRPCBatchResponse {
179217

180218
}
181219

@@ -188,6 +226,26 @@ public record JSONRPCRequest( // @formatter:off
188226
@JsonProperty("params") Object params) implements JSONRPCMessage {
189227
} // @formatter:on
190228

229+
public record JSONRPCBatchRequest(List<JSONRPCMessage> messages) implements JSONRPCMessage {
230+
public JSONRPCBatchRequest {
231+
boolean valid = messages.stream()
232+
.allMatch(message -> message instanceof JSONRPCRequest || message instanceof JSONRPCNotification);
233+
if (!valid) {
234+
throw new IllegalArgumentException(
235+
"Only JSONRPCRequest or JSONRPCNotification are allowed in batch request.");
236+
}
237+
}
238+
}
239+
240+
public record JSONRPCBatchResponse(List<JSONRPCMessage> responses) implements JSONRPCMessage {
241+
public JSONRPCBatchResponse {
242+
boolean valid = responses.stream().allMatch(response -> response instanceof JSONRPCResponse);
243+
if (!valid) {
244+
throw new IllegalArgumentException("Only JSONRPCResponse are allowed in batch response.");
245+
}
246+
}
247+
}
248+
191249
@JsonInclude(JsonInclude.Include.NON_ABSENT)
192250
@JsonIgnoreProperties(ignoreUnknown = true)
193251
public record JSONRPCNotification( // @formatter:off

mcp/src/main/java/io/modelcontextprotocol/spec/McpServerSession.java

+23
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import io.modelcontextprotocol.server.McpAsyncServerExchange;
1212
import org.slf4j.Logger;
1313
import org.slf4j.LoggerFactory;
14+
import reactor.core.publisher.Flux;
1415
import reactor.core.publisher.Mono;
1516
import reactor.core.publisher.MonoSink;
1617
import reactor.core.publisher.Sinks;
@@ -167,6 +168,13 @@ public Mono<Void> handle(McpSchema.JSONRPCMessage message) {
167168
}
168169
return Mono.empty();
169170
}
171+
else if (message instanceof McpSchema.JSONRPCBatchResponse batchResponse) {
172+
logger.debug("Received Batch Response: {}", batchResponse);
173+
return Flux.fromIterable(batchResponse.responses())
174+
.filter(jsonrpcMessage -> jsonrpcMessage instanceof McpSchema.JSONRPCResponse)
175+
.flatMap(this::handle)
176+
.then();
177+
}
170178
else if (message instanceof McpSchema.JSONRPCRequest request) {
171179
logger.debug("Received request: {}", request);
172180
return handleIncomingRequest(request).onErrorResume(error -> {
@@ -177,6 +185,21 @@ else if (message instanceof McpSchema.JSONRPCRequest request) {
177185
return this.transport.sendMessage(errorResponse).then(Mono.empty());
178186
}).flatMap(this.transport::sendMessage);
179187
}
188+
else if (message instanceof McpSchema.JSONRPCBatchRequest batchRequest) {
189+
logger.debug("Received Batch Request: {}", batchRequest);
190+
return Flux.fromIterable(batchRequest.messages()).flatMap(jsonrpcMessage -> {
191+
if (jsonrpcMessage instanceof McpSchema.JSONRPCRequest request) {
192+
return this.handle(request);
193+
}
194+
else if (jsonrpcMessage instanceof McpSchema.JSONRPCNotification notification) {
195+
return this.handle(notification);
196+
}
197+
else {
198+
logger.warn("Unsupported message in batch request: {}", jsonrpcMessage);
199+
return Mono.empty();
200+
}
201+
}).then();
202+
}
180203
else if (message instanceof McpSchema.JSONRPCNotification notification) {
181204
// TODO handle errors for communication to without initialization
182205
// happening first

mcp/src/test/java/io/modelcontextprotocol/MockMcpClientTransport.java

+4
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,10 @@ public McpSchema.JSONRPCMessage getLastSentMessage() {
6363
return !sent.isEmpty() ? sent.get(sent.size() - 1) : null;
6464
}
6565

66+
public McpSchema.JSONRPCBatchResponse getSentMessagesAsBatchResponse() {
67+
return new McpSchema.JSONRPCBatchResponse(sent);
68+
}
69+
6670
private volatile boolean connected = false;
6771

6872
@Override

mcp/src/test/java/io/modelcontextprotocol/spec/McpClientSessionTests.java

+38
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,10 @@
55
package io.modelcontextprotocol.spec;
66

77
import java.time.Duration;
8+
import java.util.List;
89
import java.util.Map;
10+
import java.util.function.Function;
11+
import java.util.stream.Collectors;
912

1013
import com.fasterxml.jackson.core.type.TypeReference;
1114
import io.modelcontextprotocol.MockMcpClientTransport;
@@ -26,6 +29,7 @@
2629
* request-response correlation, and notification processing.
2730
*
2831
* @author Christian Tzolov
32+
* @author Jihoon Kim
2933
*/
3034
class McpClientSessionTests {
3135

@@ -155,6 +159,40 @@ void testRequestHandling() {
155159
assertThat(response.error()).isNull();
156160
}
157161

162+
@Test
163+
void testBatchRequestHandling() {
164+
String echoMessage1 = "Hello MCP 1!";
165+
String echoMessage2 = "Hello MCP 2!";
166+
167+
// Request handler: echoes the input
168+
Map<String, McpClientSession.RequestHandler<?>> requestHandlers = Map.of(ECHO_METHOD,
169+
params -> Mono.just(params));
170+
transport = new MockMcpClientTransport();
171+
session = new McpClientSession(TIMEOUT, transport, requestHandlers, Map.of());
172+
173+
// Simulate incoming batch request
174+
McpSchema.JSONRPCRequest request1 = new McpSchema.JSONRPCRequest(McpSchema.JSONRPC_VERSION, ECHO_METHOD,
175+
"batch-id-1", echoMessage1);
176+
McpSchema.JSONRPCRequest request2 = new McpSchema.JSONRPCRequest(McpSchema.JSONRPC_VERSION, ECHO_METHOD,
177+
"batch-id-2", echoMessage2);
178+
McpSchema.JSONRPCBatchRequest batchRequest = new McpSchema.JSONRPCBatchRequest(List.of(request1, request2));
179+
transport.simulateIncomingMessage(batchRequest);
180+
181+
// Wait for async processing
182+
McpSchema.JSONRPCBatchResponse batchResponse = transport.getSentMessagesAsBatchResponse();
183+
List<McpSchema.JSONRPCMessage> responses = batchResponse.responses();
184+
185+
assertThat(responses).hasSize(2);
186+
assertThat(responses).allMatch(resp -> resp instanceof McpSchema.JSONRPCResponse);
187+
188+
Map<Object, McpSchema.JSONRPCResponse> responseMap = responses.stream()
189+
.map(resp -> (McpSchema.JSONRPCResponse) resp)
190+
.collect(Collectors.toMap(McpSchema.JSONRPCResponse::id, Function.identity()));
191+
192+
assertThat(responseMap.get("batch-id-1").result()).isEqualTo(echoMessage1);
193+
assertThat(responseMap.get("batch-id-2").result()).isEqualTo(echoMessage2);
194+
}
195+
158196
@Test
159197
void testNotificationHandling() {
160198
Sinks.One<Object> receivedParams = Sinks.one();

0 commit comments

Comments
 (0)