mirror of
https://github.com/anthropics/claude-code-sdk-python.git
synced 2025-12-23 09:19:52 +00:00
Some checks are pending
Lint / lint (push) Waiting to run
Test / test (macos-latest, 3.10) (push) Waiting to run
Test / test (macos-latest, 3.11) (push) Waiting to run
Test / test (macos-latest, 3.12) (push) Waiting to run
Test / test (macos-latest, 3.13) (push) Waiting to run
Test / test (ubuntu-latest, 3.10) (push) Waiting to run
Test / test (ubuntu-latest, 3.11) (push) Waiting to run
Test / test (ubuntu-latest, 3.12) (push) Waiting to run
Test / test (ubuntu-latest, 3.13) (push) Waiting to run
Test / test (windows-latest, 3.10) (push) Waiting to run
Test / test (windows-latest, 3.11) (push) Waiting to run
Test / test (windows-latest, 3.12) (push) Waiting to run
Test / test (windows-latest, 3.13) (push) Waiting to run
Test / test-e2e (macos-latest, 3.10) (push) Blocked by required conditions
Test / test-e2e (macos-latest, 3.11) (push) Blocked by required conditions
Test / test-e2e (macos-latest, 3.12) (push) Blocked by required conditions
Test / test-e2e (macos-latest, 3.13) (push) Blocked by required conditions
Test / test-e2e (ubuntu-latest, 3.10) (push) Blocked by required conditions
Test / test-e2e (ubuntu-latest, 3.11) (push) Blocked by required conditions
Test / test-e2e (ubuntu-latest, 3.12) (push) Blocked by required conditions
Test / test-e2e (ubuntu-latest, 3.13) (push) Blocked by required conditions
Test / test-e2e (windows-latest, 3.10) (push) Blocked by required conditions
Test / test-e2e (windows-latest, 3.11) (push) Blocked by required conditions
Test / test-e2e (windows-latest, 3.12) (push) Blocked by required conditions
Test / test-e2e (windows-latest, 3.13) (push) Blocked by required conditions
Test / test-examples (3.13) (push) Blocked by required conditions
377 lines
16 KiB
Python
377 lines
16 KiB
Python
"""Claude SDK Client for interacting with Claude Code."""
|
|
|
|
import json
import os
from collections.abc import AsyncIterable, AsyncIterator
from dataclasses import replace
from typing import Any

from . import Transport
from ._errors import CLIConnectionError
from .types import ClaudeAgentOptions, HookEvent, HookMatcher, Message, ResultMessage


class ClaudeSDKClient:
    """
    Client for bidirectional, interactive conversations with Claude Code.

    This client provides full control over the conversation flow with support
    for streaming, interrupts, and dynamic message sending. For simple one-shot
    queries, consider using the query() function instead.

    Key features:
    - **Bidirectional**: Send and receive messages at any time
    - **Stateful**: Maintains conversation context across messages
    - **Interactive**: Send follow-ups based on responses
    - **Control flow**: Support for interrupts and session management

    When to use ClaudeSDKClient:
    - Building chat interfaces or conversational UIs
    - Interactive debugging or exploration sessions
    - Multi-turn conversations with context
    - When you need to react to Claude's responses
    - Real-time applications with user input
    - When you need interrupt capabilities

    When to use query() instead:
    - Simple one-off questions
    - Batch processing of prompts
    - Fire-and-forget automation scripts
    - When all inputs are known upfront
    - Stateless operations

    See examples/streaming_mode.py for full examples of ClaudeSDKClient in
    different scenarios.

    Caveat: As of v0.0.20, you cannot use a ClaudeSDKClient instance across
    different async runtime contexts (e.g., different trio nurseries or asyncio
    task groups). The client internally maintains a persistent anyio task group
    for reading messages that remains active from connect() until disconnect().
    This means you must complete all operations with the client within the same
    async context where it was connected. Ideally, this limitation should not
    exist.
    """

    def __init__(
        self,
        options: ClaudeAgentOptions | None = None,
        transport: Transport | None = None,
    ):
        """Initialize Claude SDK client.

        Args:
            options: Client configuration. A default ``ClaudeAgentOptions()``
                is created when omitted.
            transport: Optional custom transport. When provided, connect()
                uses it instead of creating a subprocess CLI transport.
        """
        if options is None:
            options = ClaudeAgentOptions()
        self.options = options
        self._custom_transport = transport
        # Both stay None until connect() succeeds in creating them.
        self._transport: Transport | None = None
        self._query: Any | None = None
        # NOTE: process-wide side effect; the variable is set for the whole
        # process environment, not only for this client instance.
        os.environ["CLAUDE_CODE_ENTRYPOINT"] = "sdk-py-client"

    def _convert_hooks_to_internal_format(
        self, hooks: dict[HookEvent, list[HookMatcher]]
    ) -> dict[str, list[dict[str, Any]]]:
        """Convert HookMatcher format to internal Query format.

        Args:
            hooks: Mapping of hook event to the matchers registered for it.

        Returns:
            A mapping with the same keys whose values are lists of dicts with
            ``"matcher"`` and ``"hooks"`` entries, plus ``"timeout"`` when the
            matcher defines a non-None timeout. Missing attributes fall back
            to ``None`` (matcher) and ``[]`` (hooks).
        """
        internal_hooks: dict[str, list[dict[str, Any]]] = {}
        for event, matchers in hooks.items():
            converted: list[dict[str, Any]] = []
            for matcher in matchers:
                # Convert HookMatcher to internal dict format
                internal_matcher: dict[str, Any] = {
                    "matcher": getattr(matcher, "matcher", None),
                    "hooks": getattr(matcher, "hooks", []),
                }
                timeout = getattr(matcher, "timeout", None)
                if timeout is not None:
                    internal_matcher["timeout"] = timeout
                converted.append(internal_matcher)
            internal_hooks[event] = converted
        return internal_hooks

    async def connect(
        self, prompt: str | AsyncIterable[dict[str, Any]] | None = None
    ) -> None:
        """Connect to Claude with a prompt or message stream.

        Args:
            prompt: Initial prompt. ``None`` connects with an empty stream for
                interactive use; an async iterable is streamed to the query
                after initialization.

        Raises:
            ValueError: If ``options.can_use_tool`` is set and ``prompt`` is a
                string, or if ``can_use_tool`` is combined with
                ``permission_prompt_tool_name``.
        """
        # Imported lazily inside the method, as in the original code.
        from ._internal.query import Query
        from ._internal.transport.subprocess_cli import SubprocessCLITransport

        # Auto-connect with empty async iterable if no prompt is provided
        async def _empty_stream() -> AsyncIterator[dict[str, Any]]:
            # Never yields, but indicates that this function is an iterator and
            # keeps the connection open.
            # This yield is never reached but makes this an async generator
            return
            yield {}  # type: ignore[unreachable]

        actual_prompt = _empty_stream() if prompt is None else prompt

        # Validate and configure permission settings (matching TypeScript SDK logic)
        if self.options.can_use_tool:
            # canUseTool callback requires streaming mode (AsyncIterable prompt)
            if isinstance(prompt, str):
                raise ValueError(
                    "can_use_tool callback requires streaming mode. "
                    "Please provide prompt as an AsyncIterable instead of a string."
                )

            # canUseTool and permission_prompt_tool_name are mutually exclusive
            if self.options.permission_prompt_tool_name:
                raise ValueError(
                    "can_use_tool callback cannot be used with permission_prompt_tool_name. "
                    "Please use one or the other."
                )

            # Automatically set permission_prompt_tool_name to "stdio" for control protocol
            options = replace(self.options, permission_prompt_tool_name="stdio")
        else:
            options = self.options

        # Use provided custom transport or create subprocess transport
        if self._custom_transport:
            self._transport = self._custom_transport
        else:
            self._transport = SubprocessCLITransport(
                prompt=actual_prompt,
                options=options,
            )
        await self._transport.connect()

        # Extract SDK MCP servers from options
        sdk_mcp_servers = {}
        if self.options.mcp_servers and isinstance(self.options.mcp_servers, dict):
            for name, config in self.options.mcp_servers.items():
                if isinstance(config, dict) and config.get("type") == "sdk":
                    sdk_mcp_servers[name] = config["instance"]  # type: ignore[typeddict-item]

        # Calculate initialize timeout from CLAUDE_CODE_STREAM_CLOSE_TIMEOUT env var if set
        # CLAUDE_CODE_STREAM_CLOSE_TIMEOUT is in milliseconds, convert to seconds
        initialize_timeout_ms = int(
            os.environ.get("CLAUDE_CODE_STREAM_CLOSE_TIMEOUT", "60000")
        )
        # The timeout never drops below 60 seconds, whatever the env var says.
        initialize_timeout = max(initialize_timeout_ms / 1000.0, 60.0)

        # Create Query to handle control protocol
        self._query = Query(
            transport=self._transport,
            is_streaming_mode=True,  # ClaudeSDKClient always uses streaming mode
            can_use_tool=self.options.can_use_tool,
            hooks=self._convert_hooks_to_internal_format(self.options.hooks)
            if self.options.hooks
            else None,
            sdk_mcp_servers=sdk_mcp_servers,
            initialize_timeout=initialize_timeout,
        )

        # Start reading messages and initialize
        await self._query.start()
        await self._query.initialize()

        # If we have an initial prompt stream, start streaming it
        if prompt is not None and isinstance(prompt, AsyncIterable) and self._query._tg:
            self._query._tg.start_soon(self._query.stream_input, prompt)

    async def receive_messages(self) -> AsyncIterator[Message]:
        """Receive all messages from Claude.

        Yields:
            Each raw message from the query, passed through ``parse_message``.

        Raises:
            CLIConnectionError: If the client is not connected. Because this is
                an async generator, the error surfaces on first iteration.
        """
        if not self._query:
            raise CLIConnectionError("Not connected. Call connect() first.")

        from ._internal.message_parser import parse_message

        async for data in self._query.receive_messages():
            yield parse_message(data)

    async def query(
        self, prompt: str | AsyncIterable[dict[str, Any]], session_id: str = "default"
    ) -> None:
        """
        Send a new request in streaming mode.

        Args:
            prompt: Either a string message or an async iterable of message dictionaries
            session_id: Session identifier for the conversation

        Raises:
            CLIConnectionError: If the client is not connected.

        Message dictionaries supplied by the caller are never modified: when a
        message lacks ``session_id``, a shallow copy carrying it is sent.
        """
        if not self._query or not self._transport:
            raise CLIConnectionError("Not connected. Call connect() first.")

        # Handle string prompts
        if isinstance(prompt, str):
            message = {
                "type": "user",
                "message": {"role": "user", "content": prompt},
                "parent_tool_use_id": None,
                "session_id": session_id,
            }
            await self._transport.write(json.dumps(message) + "\n")
        else:
            # Handle AsyncIterable prompts - stream them
            async for msg in prompt:
                # Ensure session_id is set on each message. Tag a copy rather
                # than the caller's dict so their objects are left untouched.
                if "session_id" in msg:
                    outgoing = msg
                else:
                    outgoing = {**msg, "session_id": session_id}
                await self._transport.write(json.dumps(outgoing) + "\n")

    async def interrupt(self) -> None:
        """Send interrupt signal (only works with streaming mode).

        Raises:
            CLIConnectionError: If the client is not connected.
        """
        if not self._query:
            raise CLIConnectionError("Not connected. Call connect() first.")
        await self._query.interrupt()

    async def set_permission_mode(self, mode: str) -> None:
        """Change permission mode during conversation (only works with streaming mode).

        Args:
            mode: The permission mode to set. Valid options:
                - 'default': CLI prompts for dangerous tools
                - 'acceptEdits': Auto-accept file edits
                - 'bypassPermissions': Allow all tools (use with caution)

        Raises:
            CLIConnectionError: If the client is not connected.

        Example:
            ```python
            async with ClaudeSDKClient() as client:
                # Start with default permissions
                await client.query("Help me analyze this codebase")

                # Review mode done, switch to auto-accept edits
                await client.set_permission_mode('acceptEdits')
                await client.query("Now implement the fix we discussed")
            ```
        """
        if not self._query:
            raise CLIConnectionError("Not connected. Call connect() first.")
        await self._query.set_permission_mode(mode)

    async def set_model(self, model: str | None = None) -> None:
        """Change the AI model during conversation (only works with streaming mode).

        Args:
            model: The model to use, or None to use default. Examples:
                - 'claude-sonnet-4-5'
                - 'claude-opus-4-1-20250805'
                - 'claude-opus-4-20250514'

        Raises:
            CLIConnectionError: If the client is not connected.

        Example:
            ```python
            async with ClaudeSDKClient() as client:
                # Start with default model
                await client.query("Help me understand this problem")

                # Switch to a different model for implementation
                await client.set_model('claude-sonnet-4-5')
                await client.query("Now implement the solution")
            ```
        """
        if not self._query:
            raise CLIConnectionError("Not connected. Call connect() first.")
        await self._query.set_model(model)

    async def rewind_files(self, user_message_id: str) -> None:
        """Rewind tracked files to their state at a specific user message.

        Requires:
            - `enable_file_checkpointing=True` to track file changes
            - `extra_args={"replay-user-messages": None}` to receive UserMessage
              objects with `uuid` in the response stream

        Args:
            user_message_id: UUID of the user message to rewind to. This should be
                the `uuid` field from a `UserMessage` received during the conversation.

        Raises:
            CLIConnectionError: If the client is not connected.

        Example:
            ```python
            options = ClaudeAgentOptions(
                enable_file_checkpointing=True,
                extra_args={"replay-user-messages": None},
            )
            async with ClaudeSDKClient(options) as client:
                await client.query("Make some changes to my files")
                async for msg in client.receive_response():
                    if isinstance(msg, UserMessage) and msg.uuid:
                        checkpoint_id = msg.uuid  # Save this for later

                # Later, rewind to that point
                await client.rewind_files(checkpoint_id)
            ```
        """
        if not self._query:
            raise CLIConnectionError("Not connected. Call connect() first.")
        await self._query.rewind_files(user_message_id)

    async def get_server_info(self) -> dict[str, Any] | None:
        """Get server initialization info including available commands and output styles.

        Returns initialization information from the Claude Code server including:
        - Available commands (slash commands, system commands, etc.)
        - Current and available output styles
        - Server capabilities

        Returns:
            Dictionary with server info, or None if not in streaming mode

        Raises:
            CLIConnectionError: If the client is not connected.

        Example:
            ```python
            async with ClaudeSDKClient() as client:
                info = await client.get_server_info()
                if info:
                    print(f"Commands available: {len(info.get('commands', []))}")
                    print(f"Output style: {info.get('output_style', 'default')}")
            ```
        """
        if not self._query:
            raise CLIConnectionError("Not connected. Call connect() first.")
        # Return the initialization result that was already obtained during connect
        return getattr(self._query, "_initialization_result", None)

    async def receive_response(self) -> AsyncIterator[Message]:
        """
        Receive messages from Claude until and including a ResultMessage.

        This async iterator yields all messages in sequence and automatically terminates
        after yielding a ResultMessage (which indicates the response is complete).
        It's a convenience method over receive_messages() for single-response workflows.

        **Stopping Behavior:**
        - Yields each message as it's received
        - Terminates immediately after yielding a ResultMessage
        - The ResultMessage IS included in the yielded messages
        - If no ResultMessage is received, the iterator continues indefinitely

        Yields:
            Message: Each message received (UserMessage, AssistantMessage, SystemMessage, ResultMessage)

        Example:
            ```python
            async with ClaudeSDKClient() as client:
                await client.query("What's the capital of France?")

                async for msg in client.receive_response():
                    if isinstance(msg, AssistantMessage):
                        for block in msg.content:
                            if isinstance(block, TextBlock):
                                print(f"Claude: {block.text}")
                    elif isinstance(msg, ResultMessage):
                        print(f"Cost: ${msg.total_cost_usd:.4f}")
                        # Iterator will terminate after this message
            ```

        Note:
            To collect all messages: `messages = [msg async for msg in client.receive_response()]`
            The final message in the list will always be a ResultMessage.
        """
        async for message in self.receive_messages():
            yield message
            if isinstance(message, ResultMessage):
                return

    async def disconnect(self) -> None:
        """Disconnect from Claude.

        Closes the active query, if any, and clears the stored query and
        transport references. The references are cleared even when closing
        raises, so later calls report "not connected" rather than reusing a
        query whose close failed; the error from close() still propagates.
        """
        try:
            if self._query:
                await self._query.close()
        finally:
            self._query = None
            self._transport = None

    async def __aenter__(self) -> "ClaudeSDKClient":
        """Enter async context - automatically connects with empty stream for interactive use."""
        await self.connect()
        return self

    async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> bool:
        """Exit async context - always disconnects."""
        await self.disconnect()
        # Returning False means exceptions raised inside the block propagate.
        return False