feat: Add in-process SDK MCP server support (#142)

## Summary

Adds in-process SDK MCP server support to the Python SDK, building on
the control protocol from #139.

**Note: Targets `dickson/control` branch (PR #139), not `main`.**

## Key Changes

- Added `@tool` decorator and `create_sdk_mcp_server()` API for defining
in-process MCP servers
- SDK MCP servers run directly in the Python process (no subprocess
overhead)
- Moved SDK MCP handling from Transport to Query class for proper
architectural layering
- Added `McpSdkServerConfig` type and integrated with control protocol

## Example

```python
from claude_code_sdk import tool, create_sdk_mcp_server

@tool("greet", "Greet a user", {"name": str})
async def greet_user(args):
    return {"content": [{"type": "text", "text": f"Hello, {args['name']}!"}]}

server = create_sdk_mcp_server(name="my-tools", tools=[greet_user])

options = ClaudeCodeOptions(mcp_servers={"tools": server})
```

## Testing

- Added integration tests in `test_sdk_mcp_integration.py`
- Added example calculator server in `examples/mcp_calculator.py`

---------

Co-authored-by: Dickson Tsai <dickson@anthropic.com>
Co-authored-by: Ashwin Bhat <ashwin@anthropic.com>
Co-authored-by: Claude <noreply@anthropic.com>
This commit is contained in:
kashyap murali 2025-09-03 08:29:32 -07:00 committed by GitHub
parent 22fa9f473e
commit 9ef57859af
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
11 changed files with 879 additions and 18 deletions

View file

@ -1,5 +1,9 @@
"""Claude SDK for Python."""
from collections.abc import Awaitable, Callable
from dataclasses import dataclass
from typing import Any, Generic, TypeVar
from ._errors import (
ClaudeSDKError,
CLIConnectionError,
@ -14,6 +18,7 @@ from .types import (
AssistantMessage,
ClaudeCodeOptions,
ContentBlock,
McpSdkServerConfig,
McpServerConfig,
Message,
PermissionMode,
@ -26,6 +31,238 @@ from .types import (
UserMessage,
)
# MCP Server Support
T = TypeVar("T")
@dataclass
class SdkMcpTool(Generic[T]):
"""Definition for an SDK MCP tool."""
name: str
description: str
input_schema: type[T] | dict[str, Any]
handler: Callable[[T], Awaitable[dict[str, Any]]]
def tool(
name: str, description: str, input_schema: type | dict[str, Any]
) -> Callable[[Callable[[Any], Awaitable[dict[str, Any]]]], SdkMcpTool]:
"""Decorator for defining MCP tools with type safety.
Creates a tool that can be used with SDK MCP servers. The tool runs
in-process within your Python application, providing better performance
than external MCP servers.
Args:
name: Unique identifier for the tool. This is what Claude will use
to reference the tool in function calls.
description: Human-readable description of what the tool does.
This helps Claude understand when to use the tool.
input_schema: Schema defining the tool's input parameters.
Can be either:
- A dictionary mapping parameter names to types (e.g., {"text": str})
- A TypedDict class for more complex schemas
- A JSON Schema dictionary for full validation
Returns:
A decorator function that wraps the tool implementation and returns
an SdkMcpTool instance ready for use with create_sdk_mcp_server().
Example:
Basic tool with simple schema:
>>> @tool("greet", "Greet a user", {"name": str})
... async def greet(args):
... return {"content": [{"type": "text", "text": f"Hello, {args['name']}!"}]}
Tool with multiple parameters:
>>> @tool("add", "Add two numbers", {"a": float, "b": float})
... async def add_numbers(args):
... result = args["a"] + args["b"]
... return {"content": [{"type": "text", "text": f"Result: {result}"}]}
Tool with error handling:
>>> @tool("divide", "Divide two numbers", {"a": float, "b": float})
... async def divide(args):
... if args["b"] == 0:
... return {"content": [{"type": "text", "text": "Error: Division by zero"}], "is_error": True}
... return {"content": [{"type": "text", "text": f"Result: {args['a'] / args['b']}"}]}
Notes:
- The tool function must be async (defined with async def)
- The function receives a single dict argument with the input parameters
- The function should return a dict with a "content" key containing the response
- Errors can be indicated by including "is_error": True in the response
"""
def decorator(handler: Callable[[Any], Awaitable[dict[str, Any]]]) -> SdkMcpTool:
return SdkMcpTool(
name=name,
description=description,
input_schema=input_schema,
handler=handler,
)
return decorator
def create_sdk_mcp_server(
name: str, version: str = "1.0.0", tools: list[SdkMcpTool] | None = None
) -> McpSdkServerConfig:
"""Create an in-process MCP server that runs within your Python application.
Unlike external MCP servers that run as separate processes, SDK MCP servers
run directly in your application's process. This provides:
- Better performance (no IPC overhead)
- Simpler deployment (single process)
- Easier debugging (same process)
- Direct access to your application's state
Args:
name: Unique identifier for the server. This name is used to reference
the server in the mcp_servers configuration.
version: Server version string. Defaults to "1.0.0". This is for
informational purposes and doesn't affect functionality.
tools: List of SdkMcpTool instances created with the @tool decorator.
These are the functions that Claude can call through this server.
If None or empty, the server will have no tools (rarely useful).
Returns:
McpSdkServerConfig: A configuration object that can be passed to
ClaudeCodeOptions.mcp_servers. This config contains the server
instance and metadata needed for the SDK to route tool calls.
Example:
Simple calculator server:
>>> @tool("add", "Add numbers", {"a": float, "b": float})
... async def add(args):
... return {"content": [{"type": "text", "text": f"Sum: {args['a'] + args['b']}"}]}
>>>
>>> @tool("multiply", "Multiply numbers", {"a": float, "b": float})
... async def multiply(args):
... return {"content": [{"type": "text", "text": f"Product: {args['a'] * args['b']}"}]}
>>>
>>> calculator = create_sdk_mcp_server(
... name="calculator",
... version="2.0.0",
... tools=[add, multiply]
... )
>>>
>>> # Use with Claude
>>> options = ClaudeCodeOptions(
... mcp_servers={"calc": calculator},
... allowed_tools=["add", "multiply"]
... )
Server with application state access:
>>> class DataStore:
... def __init__(self):
... self.items = []
...
>>> store = DataStore()
>>>
>>> @tool("add_item", "Add item to store", {"item": str})
... async def add_item(args):
... store.items.append(args["item"])
... return {"content": [{"type": "text", "text": f"Added: {args['item']}"}]}
>>>
>>> server = create_sdk_mcp_server("store", tools=[add_item])
Notes:
- The server runs in the same process as your Python application
- Tools have direct access to your application's variables and state
- No subprocess or IPC overhead for tool calls
- Server lifecycle is managed automatically by the SDK
See Also:
- tool(): Decorator for creating tool functions
- ClaudeCodeOptions: Configuration for using servers with query()
"""
from mcp.server import Server
from mcp.types import TextContent, Tool
# Create MCP server instance
server = Server(name, version=version)
# Register tools if provided
if tools:
# Store tools for access in handlers
tool_map = {tool_def.name: tool_def for tool_def in tools}
# Register list_tools handler to expose available tools
@server.list_tools()
async def list_tools() -> list[Tool]:
"""Return the list of available tools."""
tool_list = []
for tool_def in tools:
# Convert input_schema to JSON Schema format
if isinstance(tool_def.input_schema, dict):
# Check if it's already a JSON schema
if (
"type" in tool_def.input_schema
and "properties" in tool_def.input_schema
):
schema = tool_def.input_schema
else:
# Simple dict mapping names to types - convert to JSON schema
properties = {}
for param_name, param_type in tool_def.input_schema.items():
if param_type is str:
properties[param_name] = {"type": "string"}
elif param_type is int:
properties[param_name] = {"type": "integer"}
elif param_type is float:
properties[param_name] = {"type": "number"}
elif param_type is bool:
properties[param_name] = {"type": "boolean"}
else:
properties[param_name] = {"type": "string"} # Default
schema = {
"type": "object",
"properties": properties,
"required": list(properties.keys()),
}
else:
# For TypedDict or other types, create basic schema
schema = {"type": "object", "properties": {}}
tool_list.append(
Tool(
name=tool_def.name,
description=tool_def.description,
inputSchema=schema,
)
)
return tool_list
# Register call_tool handler to execute tools
@server.call_tool()
async def call_tool(name: str, arguments: dict) -> Any:
"""Execute a tool by name with given arguments."""
if name not in tool_map:
raise ValueError(f"Tool '{name}' not found")
tool_def = tool_map[name]
# Call the tool's handler with arguments
result = await tool_def.handler(arguments)
# Convert result to MCP format
# The decorator expects us to return the content, not a CallToolResult
# It will wrap our return value in CallToolResult
content = []
if "content" in result:
for item in result["content"]:
if item.get("type") == "text":
content.append(TextContent(type="text", text=item["text"]))
# Return just the content list - the decorator wraps it
return content
# Return SDK server configuration
return McpSdkServerConfig(type="sdk", name=name, instance=server)
__version__ = "0.0.20"
__all__ = [
@ -37,6 +274,7 @@ __all__ = [
# Types
"PermissionMode",
"McpServerConfig",
"McpSdkServerConfig",
"UserMessage",
"AssistantMessage",
"SystemMessage",
@ -48,6 +286,10 @@ __all__ = [
"ToolUseBlock",
"ToolResultBlock",
"ContentBlock",
# MCP Server Support
"create_sdk_mcp_server",
"tool",
"SdkMcpTool",
# Errors
"ClaudeSDKError",
"CLIConnectionError",

View file

@ -36,6 +36,13 @@ class InternalClient:
# Connect transport
await chosen_transport.connect()
# Extract SDK MCP servers from options
sdk_mcp_servers = {}
if options.mcp_servers and isinstance(options.mcp_servers, dict):
for name, config in options.mcp_servers.items():
if isinstance(config, dict) and config.get("type") == "sdk":
sdk_mcp_servers[name] = config["instance"]
# Create Query to handle control protocol
is_streaming = not isinstance(prompt, str)
query = Query(
@ -43,6 +50,7 @@ class InternalClient:
is_streaming_mode=is_streaming,
can_use_tool=None, # TODO: Add support for can_use_tool callback
hooks=None, # TODO: Add support for hooks
sdk_mcp_servers=sdk_mcp_servers,
)
try:

View file

@ -5,9 +5,14 @@ import logging
import os
from collections.abc import AsyncIterable, AsyncIterator, Awaitable, Callable
from contextlib import suppress
from typing import Any
from typing import TYPE_CHECKING, Any
import anyio
from mcp.types import (
CallToolRequest,
CallToolRequestParams,
ListToolsRequest,
)
from ..types import (
SDKControlPermissionRequest,
@ -17,6 +22,9 @@ from ..types import (
)
from .transport import Transport
if TYPE_CHECKING:
from mcp.server import Server as McpServer
logger = logging.getLogger(__name__)
@ -40,6 +48,7 @@ class Query:
]
| None = None,
hooks: dict[str, list[dict[str, Any]]] | None = None,
sdk_mcp_servers: dict[str, "McpServer"] | None = None,
):
"""Initialize Query with transport and callbacks.
@ -48,11 +57,13 @@ class Query:
is_streaming_mode: Whether using streaming (bidirectional) mode
can_use_tool: Optional callback for tool permission requests
hooks: Optional hook configurations
sdk_mcp_servers: Optional SDK MCP server instances
"""
self.transport = transport
self.is_streaming_mode = is_streaming_mode
self.can_use_tool = can_use_tool
self.hooks = hooks or {}
self.sdk_mcp_servers = sdk_mcp_servers or {}
# Control protocol state
self.pending_control_responses: dict[str, anyio.Event] = {}
@ -207,6 +218,16 @@ class Query:
{"signal": None}, # TODO: Add abort signal support
)
elif subtype == "mcp_request":
# Handle SDK MCP request
server_name = request_data.get("server_name")
mcp_message = request_data.get("message")
if not server_name or not mcp_message:
raise Exception("Missing server_name or message for MCP request")
response_data = await self._handle_sdk_mcp_request(server_name, mcp_message)
else:
raise Exception(f"Unsupported control request subtype: {subtype}")
@ -273,6 +294,106 @@ class Query:
self.pending_control_results.pop(request_id, None)
raise Exception(f"Control request timeout: {request.get('subtype')}") from e
async def _handle_sdk_mcp_request(self, server_name: str, message: dict) -> dict:
"""Handle an MCP request for an SDK server.
This acts as a bridge between JSONRPC messages from the CLI
and the in-process MCP server. Ideally the MCP SDK would provide
a method to handle raw JSONRPC, but for now we route manually.
Args:
server_name: Name of the SDK MCP server
message: The JSONRPC message
Returns:
The response message
"""
if server_name not in self.sdk_mcp_servers:
return {
"jsonrpc": "2.0",
"id": message.get("id"),
"error": {
"code": -32601,
"message": f"Server '{server_name}' not found",
},
}
server = self.sdk_mcp_servers[server_name]
method = message.get("method")
params = message.get("params", {})
try:
# TODO: Python MCP SDK lacks the Transport abstraction that TypeScript has.
# TypeScript: server.connect(transport) allows custom transports
# Python: server.run(read_stream, write_stream) requires actual streams
#
# This forces us to manually route methods. When Python MCP adds Transport
# support, we can refactor to match the TypeScript approach.
if method == "tools/list":
request = ListToolsRequest(method=method)
handler = server.request_handlers.get(ListToolsRequest)
if handler:
result = await handler(request)
# Convert MCP result to JSONRPC response
tools_data = [
{
"name": tool.name,
"description": tool.description,
"inputSchema": tool.inputSchema.model_dump() if tool.inputSchema else {}
}
for tool in result.root.tools
]
return {
"jsonrpc": "2.0",
"id": message.get("id"),
"result": {"tools": tools_data}
}
elif method == "tools/call":
request = CallToolRequest(
method=method,
params=CallToolRequestParams(
name=params.get("name"),
arguments=params.get("arguments", {})
)
)
handler = server.request_handlers.get(CallToolRequest)
if handler:
result = await handler(request)
# Convert MCP result to JSONRPC response
content = []
for item in result.root.content:
if hasattr(item, 'text'):
content.append({"type": "text", "text": item.text})
elif hasattr(item, 'data') and hasattr(item, 'mimeType'):
content.append({"type": "image", "data": item.data, "mimeType": item.mimeType})
response_data = {"content": content}
if hasattr(result.root, 'is_error') and result.root.is_error:
response_data["is_error"] = True
return {
"jsonrpc": "2.0",
"id": message.get("id"),
"result": response_data
}
# Add more methods here as MCP SDK adds them (resources, prompts, etc.)
# This is the limitation Ashwin pointed out - we have to manually update
return {
"jsonrpc": "2.0",
"id": message.get("id"),
"error": {"code": -32601, "message": f"Method '{method}' not found"},
}
except Exception as e:
return {
"jsonrpc": "2.0",
"id": message.get("id"),
"error": {"code": -32603, "message": str(e)},
}
async def interrupt(self) -> None:
"""Send interrupt control request."""
await self._send_control_request({"subtype": "interrupt"})

View file

@ -128,13 +128,21 @@ class SubprocessCLITransport(Transport):
if self._options.mcp_servers:
if isinstance(self._options.mcp_servers, dict):
# Dict format: serialize to JSON
cmd.extend(
[
"--mcp-config",
json.dumps({"mcpServers": self._options.mcp_servers}),
]
)
# Filter out SDK servers - they're handled in-process
external_servers = {
name: config
for name, config in self._options.mcp_servers.items()
if not (isinstance(config, dict) and config.get("type") == "sdk")
}
# Only pass external servers to CLI
if external_servers:
cmd.extend(
[
"--mcp-config",
json.dumps({"mcpServers": external_servers}),
]
)
else:
# String or Path format: pass directly as file path or JSON string
cmd.extend(["--mcp-config", str(self._options.mcp_servers)])

View file

@ -124,12 +124,20 @@ class ClaudeSDKClient:
)
await self._transport.connect()
# Extract SDK MCP servers from options
sdk_mcp_servers = {}
if self.options.mcp_servers and isinstance(self.options.mcp_servers, dict):
for name, config in self.options.mcp_servers.items():
if isinstance(config, dict) and config.get("type") == "sdk":
sdk_mcp_servers[name] = config["instance"]
# Create Query to handle control protocol
self._query = Query(
transport=self._transport,
is_streaming_mode=True, # ClaudeSDKClient always uses streaming mode
can_use_tool=None, # TODO: Add support for can_use_tool callback
hooks=None, # TODO: Add support for hooks
sdk_mcp_servers=sdk_mcp_servers,
)
# Start reading messages and initialize

View file

@ -2,10 +2,13 @@
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Literal, TypedDict
from typing import TYPE_CHECKING, Any, Literal, TypedDict
from typing_extensions import NotRequired # For Python < 3.11 compatibility
if TYPE_CHECKING:
from mcp.server import Server as McpServer
# Permission modes
PermissionMode = Literal["default", "acceptEdits", "plan", "bypassPermissions"]
@ -36,7 +39,17 @@ class McpHttpServerConfig(TypedDict):
headers: NotRequired[dict[str, str]]
McpServerConfig = McpStdioServerConfig | McpSSEServerConfig | McpHttpServerConfig
class McpSdkServerConfig(TypedDict):
"""SDK MCP server configuration."""
type: Literal["sdk"]
name: str
instance: "McpServer"
McpServerConfig = (
McpStdioServerConfig | McpSSEServerConfig | McpHttpServerConfig | McpSdkServerConfig
)
# Content block types