Skip to content

Commit 02c141b

Browse files
authored
Feature: Refactor MCP Server Architecture to Streamable or Stateless Single Service (#3096)
1 parent f1f96b1 commit 02c141b

File tree

18 files changed

+214
-230
lines changed

18 files changed

+214
-230
lines changed

core/src/main/java/arthas.properties

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,3 +25,4 @@ arthas.localConnectionNonAuth=true
2525

2626
# MCP (Model Context Protocol) configuration
2727
arthas.mcpEndpoint=/mcp
28+
arthas.mcpProtocol=STATELESS

core/src/main/java/com/taobao/arthas/core/config/Configure.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,11 @@ public class Configure {
7979
*/
8080
private String mcpEndpoint;
8181

82+
/**
83+
* MCP Server Protocol: STREAMABLE or STATELESS
84+
*/
85+
private String mcpProtocol;
86+
8287
public String getIp() {
8388
return ip;
8489
}
@@ -223,6 +228,14 @@ public void setMcpEndpoint(String mcpEndpoint) {
223228
this.mcpEndpoint = mcpEndpoint;
224229
}
225230

231+
public String getMcpProtocol() {
232+
return mcpProtocol;
233+
}
234+
235+
public void setMcpProtocol(String mcpProtocol) {
236+
this.mcpProtocol = mcpProtocol;
237+
}
238+
226239
/**
227240
* 序列化成字符串
228241
*

core/src/main/java/com/taobao/arthas/core/server/ArthasBootstrap.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -470,14 +470,15 @@ private void bind(Configure configure) throws Throwable {
470470

471471
// Mcp Server
472472
String mcpEndpoint = configure.getMcpEndpoint();
473+
String mcpProtocol = configure.getMcpProtocol();
473474
if (mcpEndpoint != null && !mcpEndpoint.trim().isEmpty()) {
474-
logger().info("try to start mcp server, endpoint: {}.", mcpEndpoint);
475+
logger().info("try to start mcp server, endpoint: {}, protocol: {}.", mcpEndpoint, mcpProtocol);
475476
CommandExecutor commandExecutor = new CommandExecutorImpl(sessionManager);
476-
ArthasMcpBootstrap arthasMcpBootstrap = new ArthasMcpBootstrap(commandExecutor, mcpEndpoint);
477+
ArthasMcpBootstrap arthasMcpBootstrap = new ArthasMcpBootstrap(commandExecutor, mcpEndpoint, mcpProtocol);
477478
this.mcpRequestHandler = arthasMcpBootstrap.start().getMcpRequestHandler();
478479
}
479-
logger().info("as-server listening on network={};telnet={};http={};timeout={};mcp={};", configure.getIp(),
480-
configure.getTelnetPort(), configure.getHttpPort(), options.getConnectionTimeout(), configure.getMcpEndpoint());
480+
logger().info("as-server listening on network={};telnet={};http={};timeout={};mcp={};mcpProtocol={};", configure.getIp(),
481+
configure.getTelnetPort(), configure.getHttpPort(), options.getConnectionTimeout(), configure.getMcpEndpoint(), configure.getMcpProtocol());
481482

482483
// 异步回报启动次数
483484
if (configure.getStatUrl() != null) {

labs/arthas-mcp-server/src/main/java/com/taobao/arthas/mcp/server/ArthasMcpBootstrap.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,13 @@ public class ArthasMcpBootstrap {
1414
private ArthasMcpServer mcpServer;
1515
private final CommandExecutor commandExecutor;
1616
private final String mcpEndpoint;
17+
private final String protocol;
1718
private static ArthasMcpBootstrap instance;
1819

19-
public ArthasMcpBootstrap(CommandExecutor commandExecutor, String mcpEndpoint) {
20+
public ArthasMcpBootstrap(CommandExecutor commandExecutor, String mcpEndpoint, String protocol) {
2021
this.commandExecutor = commandExecutor;
2122
this.mcpEndpoint = mcpEndpoint;
23+
this.protocol = protocol;
2224
instance = this;
2325
}
2426

@@ -37,7 +39,7 @@ public ArthasMcpServer start() {
3739
commandExecutor.getClass().getSimpleName());
3840

3941
// Create and start MCP server with CommandExecutor and custom endpoint
40-
mcpServer = new ArthasMcpServer(mcpEndpoint, commandExecutor);
42+
mcpServer = new ArthasMcpServer(mcpEndpoint, commandExecutor, protocol);
4143
logger.debug("MCP server instance created successfully");
4244

4345
mcpServer.start();

labs/arthas-mcp-server/src/main/java/com/taobao/arthas/mcp/server/ArthasMcpServer.java

Lines changed: 53 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import com.fasterxml.jackson.databind.ObjectMapper;
44
import com.taobao.arthas.mcp.server.protocol.config.McpServerProperties;
5+
import com.taobao.arthas.mcp.server.protocol.config.McpServerProperties.ServerProtocol;
56
import com.taobao.arthas.mcp.server.protocol.server.McpNettyServer;
67
import com.taobao.arthas.mcp.server.protocol.server.McpServer;
78
import com.taobao.arthas.mcp.server.protocol.server.McpStatelessNettyServer;
@@ -35,6 +36,7 @@ public class ArthasMcpServer {
3536
private McpStatelessNettyServer statelessServer;
3637

3738
private final String mcpEndpoint;
39+
private final ServerProtocol protocol;
3840

3941
private final CommandExecutor commandExecutor;
4042

@@ -46,9 +48,19 @@ public class ArthasMcpServer {
4648

4749
public static final String DEFAULT_MCP_ENDPOINT = "/mcp";
4850

49-
public ArthasMcpServer(String mcpEndpoint, CommandExecutor commandExecutor) {
51+
public ArthasMcpServer(String mcpEndpoint, CommandExecutor commandExecutor, String protocol) {
5052
this.mcpEndpoint = mcpEndpoint != null ? mcpEndpoint : DEFAULT_MCP_ENDPOINT;
5153
this.commandExecutor = commandExecutor;
54+
55+
ServerProtocol resolvedProtocol = ServerProtocol.STATELESS;
56+
if (protocol != null && !protocol.trim().isEmpty()) {
57+
try {
58+
resolvedProtocol = ServerProtocol.valueOf(protocol.toUpperCase());
59+
} catch (IllegalArgumentException e) {
60+
logger.warn("Invalid MCP protocol: {}. Using default: STATELESS", protocol);
61+
}
62+
}
63+
this.protocol = resolvedProtocol;
5264
}
5365

5466
public McpHttpRequestHandler getMcpRequestHandler() {
@@ -68,6 +80,7 @@ public void start() {
6880
.resourceChangeNotification(true)
6981
.promptChangeNotification(true)
7082
.objectMapper(JsonParser.getObjectMapper())
83+
.protocol(this.protocol)
7184
.build();
7285

7386
ToolCallbackProvider toolCallbackProvider = new DefaultToolCallbackProvider();
@@ -76,50 +89,51 @@ public void start() {
7689
.filter(Objects::nonNull)
7790
.collect(Collectors.toList());
7891

79-
// Create transport for both streamable and stateless servers
80-
McpStreamableServerTransportProvider transportProvider = createStreamableHttpTransportProvider(properties);
81-
streamableHandler = transportProvider.getMcpRequestHandler();
82-
83-
NettyStatelessServerTransport statelessTransport = createStatelessHttpTransport(properties);
84-
statelessHandler = statelessTransport.getMcpRequestHandler();
85-
8692
unifiedMcpHandler = McpHttpRequestHandler.builder()
8793
.mcpEndpoint(properties.getMcpEndpoint())
8894
.objectMapper(properties.getObjectMapper())
89-
.tools(Arrays.asList(callbacks))
95+
.protocol(properties.getProtocol())
9096
.build();
91-
unifiedMcpHandler.setStreamableHandler(streamableHandler);
92-
unifiedMcpHandler.setStatelessHandler(statelessHandler);
93-
94-
// Set up unified MCP handler for both streamable and stateless servers
95-
McpServer.StreamableServerNettySpecification streamableServerNettySpecification = McpServer.netty(transportProvider)
96-
.serverInfo(new Implementation(properties.getName(), properties.getVersion()))
97-
.capabilities(buildServerCapabilities(properties))
98-
.instructions(properties.getInstructions())
99-
.requestTimeout(properties.getRequestTimeout())
100-
.commandExecutor(commandExecutor)
101-
.objectMapper(properties.getObjectMapper() != null ? properties.getObjectMapper() : JsonParser.getObjectMapper());
102-
103-
// Set up unified MCP handler for both streamable and stateless servers
104-
McpServer.StatelessServerNettySpecification statelessServerNettySpecification = McpServer.netty(statelessTransport)
105-
.serverInfo(new Implementation(properties.getName(), properties.getVersion()))
106-
.capabilities(buildServerCapabilities(properties))
107-
.instructions(properties.getInstructions())
108-
.requestTimeout(properties.getRequestTimeout())
109-
.commandExecutor(commandExecutor)
110-
.objectMapper(properties.getObjectMapper() != null ? properties.getObjectMapper() : JsonParser.getObjectMapper());
111-
112-
streamableServerNettySpecification.tools(
113-
McpToolUtils.toStreamableToolSpecifications(providerToolCallbacks));
114-
statelessServerNettySpecification.tools(
115-
McpToolUtils.toStatelessToolSpecifications(providerToolCallbacks));
116-
117-
streamableServer = streamableServerNettySpecification.build();
118-
statelessServer = statelessServerNettySpecification.build();
119-
97+
98+
if (properties.getProtocol() == ServerProtocol.STREAMABLE) {
99+
McpStreamableServerTransportProvider transportProvider = createStreamableHttpTransportProvider(properties);
100+
streamableHandler = transportProvider.getMcpRequestHandler();
101+
unifiedMcpHandler.setStreamableHandler(streamableHandler);
102+
103+
McpServer.StreamableServerNettySpecification streamableServerNettySpecification = McpServer.netty(transportProvider)
104+
.serverInfo(new Implementation(properties.getName(), properties.getVersion()))
105+
.capabilities(buildServerCapabilities(properties))
106+
.instructions(properties.getInstructions())
107+
.requestTimeout(properties.getRequestTimeout())
108+
.commandExecutor(commandExecutor)
109+
.objectMapper(properties.getObjectMapper() != null ? properties.getObjectMapper() : JsonParser.getObjectMapper());
110+
111+
streamableServerNettySpecification.tools(
112+
McpToolUtils.toStreamableToolSpecifications(providerToolCallbacks));
113+
114+
streamableServer = streamableServerNettySpecification.build();
115+
} else {
116+
NettyStatelessServerTransport statelessTransport = createStatelessHttpTransport(properties);
117+
statelessHandler = statelessTransport.getMcpRequestHandler();
118+
unifiedMcpHandler.setStatelessHandler(statelessHandler);
119+
120+
McpServer.StatelessServerNettySpecification statelessServerNettySpecification = McpServer.netty(statelessTransport)
121+
.serverInfo(new Implementation(properties.getName(), properties.getVersion()))
122+
.capabilities(buildServerCapabilities(properties))
123+
.instructions(properties.getInstructions())
124+
.requestTimeout(properties.getRequestTimeout())
125+
.commandExecutor(commandExecutor)
126+
.objectMapper(properties.getObjectMapper() != null ? properties.getObjectMapper() : JsonParser.getObjectMapper());
127+
128+
statelessServerNettySpecification.tools(
129+
McpToolUtils.toStatelessToolSpecifications(providerToolCallbacks));
130+
131+
statelessServer = statelessServerNettySpecification.build();
132+
}
133+
120134
logger.info("Arthas MCP server started successfully");
121135
logger.info("- MCP Endpoint: {}", properties.getMcpEndpoint());
122-
logger.info("- Transport modes: Streamable + Stateless");
136+
logger.info("- Transport mode: {}", properties.getProtocol());
123137
logger.info("- Available tools: {}", providerToolCallbacks.size());
124138
logger.info("- Server ready to accept connections");
125139
} catch (Exception e) {

labs/arthas-mcp-server/src/main/java/com/taobao/arthas/mcp/server/protocol/config/McpServerProperties.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@ public class McpServerProperties {
3939

4040
private final ObjectMapper objectMapper;
4141

42+
private final ServerProtocol protocol;
43+
4244
/**
4345
* (Optional) response MIME type per tool name.
4446
*/
@@ -59,6 +61,7 @@ private McpServerProperties(Builder builder) {
5961
this.requestTimeout = builder.requestTimeout;
6062
this.initializationTimeout = builder.initializationTimeout;
6163
this.objectMapper = builder.objectMapper;
64+
this.protocol = builder.protocol;
6265
}
6366

6467
/**
@@ -68,6 +71,10 @@ public static Builder builder() {
6871
return new Builder();
6972
}
7073

74+
public enum ServerProtocol {
75+
STREAMABLE, STATELESS
76+
}
77+
7178
/**
7279
* Get server name
7380
* @return Server name
@@ -156,6 +163,10 @@ public ObjectMapper getObjectMapper() {
156163
return objectMapper;
157164
}
158165

166+
public ServerProtocol getProtocol() {
167+
return protocol;
168+
}
169+
159170
public Map<String, String> getToolResponseMimeType() {
160171
return toolResponseMimeType;
161172
}
@@ -182,6 +193,7 @@ public static class Builder {
182193
private Duration requestTimeout = Duration.ofSeconds(10);
183194
private Duration initializationTimeout = Duration.ofSeconds(30);
184195
private ObjectMapper objectMapper;
196+
private ServerProtocol protocol = ServerProtocol.STATELESS;
185197

186198
public Builder() {
187199
// Private constructor to prevent direct instantiation
@@ -242,6 +254,11 @@ public Builder objectMapper(ObjectMapper objectMapper) {
242254
return this;
243255
}
244256

257+
public Builder protocol(ServerProtocol protocol) {
258+
this.protocol = protocol;
259+
return this;
260+
}
261+
245262
/**
246263
* Build McpServerProperties instance
247264
*/

labs/arthas-mcp-server/src/main/java/com/taobao/arthas/mcp/server/protocol/server/DefaultMcpStatelessServerHandler.java

Lines changed: 26 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,12 @@
88
import com.taobao.arthas.mcp.server.protocol.spec.McpError;
99
import com.taobao.arthas.mcp.server.protocol.spec.McpSchema;
1010
import com.taobao.arthas.mcp.server.session.ArthasCommandContext;
11+
import com.taobao.arthas.mcp.server.session.ArthasCommandSessionManager;
1112
import org.slf4j.Logger;
1213
import org.slf4j.LoggerFactory;
1314

1415
import java.util.Map;
16+
import java.util.UUID;
1517
import java.util.concurrent.CompletableFuture;
1618
import java.util.concurrent.CompletionException;
1719

@@ -25,19 +27,28 @@ class DefaultMcpStatelessServerHandler implements McpStatelessServerHandler {
2527

2628
private final CommandExecutor commandExecutor;
2729

30+
private final ArthasCommandSessionManager commandSessionManager;
31+
2832
public DefaultMcpStatelessServerHandler(Map<String, McpStatelessRequestHandler<?>> requestHandlers,
2933
Map<String, McpStatelessNotificationHandler> notificationHandlers,
3034
CommandExecutor commandExecutor) {
3135
this.requestHandlers = requestHandlers;
3236
this.notificationHandlers = notificationHandlers;
3337
this.commandExecutor = commandExecutor;
38+
this.commandSessionManager = new ArthasCommandSessionManager(commandExecutor);
3439
}
3540

3641
@Override
3742
public CompletableFuture<McpSchema.JSONRPCResponse> handleRequest(McpTransportContext ctx, McpSchema.JSONRPCRequest req) {
38-
ArthasCommandContext commandContext = createCommandContext();
43+
// Create a temporary session for this request
44+
String tempSessionId = UUID.randomUUID().toString();
45+
ArthasCommandSessionManager.CommandSessionBinding binding = commandSessionManager.createCommandSession(tempSessionId);
46+
ArthasCommandContext commandContext = new ArthasCommandContext(commandExecutor, binding);
47+
3948
McpStatelessRequestHandler<?> handler = requestHandlers.get(req.getMethod());
4049
if (handler == null) {
50+
// Clean up session if handler not found
51+
closeSession(binding);
4152
CompletableFuture<McpSchema.JSONRPCResponse> f = new CompletableFuture<>();
4253
f.completeExceptionally(new McpError("Missing handler for request type: " + req.getMethod()));
4354
return f;
@@ -47,6 +58,9 @@ public CompletableFuture<McpSchema.JSONRPCResponse> handleRequest(McpTransportCo
4758
CompletableFuture<Object> result = (CompletableFuture<Object>) handler
4859
.handle(ctx, commandContext, req.getParams());
4960
return result.handle((r, ex) -> {
61+
// Clean up session after execution
62+
closeSession(binding);
63+
5064
if (ex != null) {
5165
Throwable cause = ex instanceof CompletionException ? ex.getCause() : ex;
5266
return new McpSchema.JSONRPCResponse(McpSchema.JSONRPC_VERSION, req.getId(), null,
@@ -55,12 +69,23 @@ public CompletableFuture<McpSchema.JSONRPCResponse> handleRequest(McpTransportCo
5569
return new McpSchema.JSONRPCResponse(McpSchema.JSONRPC_VERSION, req.getId(), r, null);
5670
});
5771
} catch (Throwable t) {
72+
// Clean up session on error
73+
closeSession(binding);
74+
5875
CompletableFuture<McpSchema.JSONRPCResponse> f = new CompletableFuture<>();
5976
f.completeExceptionally(t);
6077
return f;
6178
}
6279
}
6380

81+
private void closeSession(ArthasCommandSessionManager.CommandSessionBinding binding) {
82+
try {
83+
commandExecutor.closeSession(binding.getArthasSessionId());
84+
} catch (Exception e) {
85+
logger.warn("Failed to close temporary session: {}", binding.getArthasSessionId(), e);
86+
}
87+
}
88+
6489
@Override
6590
public CompletableFuture<Void> handleNotification(McpTransportContext ctx,
6691
McpSchema.JSONRPCNotification note) {
@@ -78,8 +103,4 @@ public CompletableFuture<Void> handleNotification(McpTransportContext ctx,
78103
}
79104
}
80105

81-
private ArthasCommandContext createCommandContext() {
82-
return new ArthasCommandContext(commandExecutor);
83-
}
84-
85106
}

0 commit comments

Comments
 (0)