mirror of
https://github.com/anthropics/claude-code-sdk-python.git
synced 2025-12-23 09:19:52 +00:00
Add DeprecationWarning to both ClaudeSDKClient and query() entry points to inform users that the package has been renamed to claude-agent-sdk and provide migration guidance. 🤖 Generated with [Claude Code](https://claude.ai/code) Co-authored-by: Claude <noreply@anthropic.com>
286 lines
12 KiB
Python
286 lines
12 KiB
Python
"""Claude SDK Client for interacting with Claude Code."""
|
|
|
|
import json
|
|
import os
|
|
import warnings
|
|
from collections.abc import AsyncIterable, AsyncIterator
|
|
from dataclasses import replace
|
|
from typing import Any
|
|
|
|
from ._errors import CLIConnectionError
|
|
from .types import ClaudeCodeOptions, HookEvent, HookMatcher, Message, ResultMessage
|
|
|
|
|
|
class ClaudeSDKClient:
|
|
"""
|
|
Client for bidirectional, interactive conversations with Claude Code.
|
|
|
|
This client provides full control over the conversation flow with support
|
|
for streaming, interrupts, and dynamic message sending. For simple one-shot
|
|
queries, consider using the query() function instead.
|
|
|
|
Key features:
|
|
- **Bidirectional**: Send and receive messages at any time
|
|
- **Stateful**: Maintains conversation context across messages
|
|
- **Interactive**: Send follow-ups based on responses
|
|
- **Control flow**: Support for interrupts and session management
|
|
|
|
When to use ClaudeSDKClient:
|
|
- Building chat interfaces or conversational UIs
|
|
- Interactive debugging or exploration sessions
|
|
- Multi-turn conversations with context
|
|
- When you need to react to Claude's responses
|
|
- Real-time applications with user input
|
|
- When you need interrupt capabilities
|
|
|
|
When to use query() instead:
|
|
- Simple one-off questions
|
|
- Batch processing of prompts
|
|
- Fire-and-forget automation scripts
|
|
- When all inputs are known upfront
|
|
- Stateless operations
|
|
|
|
See examples/streaming_mode.py for full examples of ClaudeSDKClient in
|
|
different scenarios.
|
|
|
|
Caveat: As of v0.0.20, you cannot use a ClaudeSDKClient instance across
|
|
different async runtime contexts (e.g., different trio nurseries or asyncio
|
|
task groups). The client internally maintains a persistent anyio task group
|
|
for reading messages that remains active from connect() until disconnect().
|
|
This means you must complete all operations with the client within the same
|
|
async context where it was connected. Ideally, this limitation should not
|
|
exist.
|
|
"""
|
|
|
|
def __init__(self, options: ClaudeCodeOptions | None = None):
|
|
"""Initialize Claude SDK client."""
|
|
warnings.warn(
|
|
"The Claude Code SDK is now the Claude Agent SDK! "
|
|
"Please install and use claude-agent-sdk instead. "
|
|
"See https://docs.claude.com/en/docs/claude-code/sdk/migration-guide for migration instructions.",
|
|
DeprecationWarning,
|
|
stacklevel=2,
|
|
)
|
|
|
|
if options is None:
|
|
options = ClaudeCodeOptions()
|
|
self.options = options
|
|
self._transport: Any | None = None
|
|
self._query: Any | None = None
|
|
os.environ["CLAUDE_CODE_ENTRYPOINT"] = "sdk-py-client"
|
|
|
|
def _convert_hooks_to_internal_format(
|
|
self, hooks: dict[HookEvent, list[HookMatcher]]
|
|
) -> dict[str, list[dict[str, Any]]]:
|
|
"""Convert HookMatcher format to internal Query format."""
|
|
internal_hooks: dict[str, list[dict[str, Any]]] = {}
|
|
for event, matchers in hooks.items():
|
|
internal_hooks[event] = []
|
|
for matcher in matchers:
|
|
# Convert HookMatcher to internal dict format
|
|
internal_matcher = {
|
|
"matcher": matcher.matcher if hasattr(matcher, "matcher") else None,
|
|
"hooks": matcher.hooks if hasattr(matcher, "hooks") else [],
|
|
}
|
|
internal_hooks[event].append(internal_matcher)
|
|
return internal_hooks
|
|
|
|
async def connect(
|
|
self, prompt: str | AsyncIterable[dict[str, Any]] | None = None
|
|
) -> None:
|
|
"""Connect to Claude with a prompt or message stream."""
|
|
|
|
from ._internal.query import Query
|
|
from ._internal.transport.subprocess_cli import SubprocessCLITransport
|
|
|
|
# Auto-connect with empty async iterable if no prompt is provided
|
|
async def _empty_stream() -> AsyncIterator[dict[str, Any]]:
|
|
# Never yields, but indicates that this function is an iterator and
|
|
# keeps the connection open.
|
|
# This yield is never reached but makes this an async generator
|
|
return
|
|
yield {} # type: ignore[unreachable]
|
|
|
|
actual_prompt = _empty_stream() if prompt is None else prompt
|
|
|
|
# Validate and configure permission settings (matching TypeScript SDK logic)
|
|
if self.options.can_use_tool:
|
|
# canUseTool callback requires streaming mode (AsyncIterable prompt)
|
|
if isinstance(prompt, str):
|
|
raise ValueError(
|
|
"can_use_tool callback requires streaming mode. "
|
|
"Please provide prompt as an AsyncIterable instead of a string."
|
|
)
|
|
|
|
# canUseTool and permission_prompt_tool_name are mutually exclusive
|
|
if self.options.permission_prompt_tool_name:
|
|
raise ValueError(
|
|
"can_use_tool callback cannot be used with permission_prompt_tool_name. "
|
|
"Please use one or the other."
|
|
)
|
|
|
|
# Automatically set permission_prompt_tool_name to "stdio" for control protocol
|
|
options = replace(self.options, permission_prompt_tool_name="stdio")
|
|
else:
|
|
options = self.options
|
|
|
|
self._transport = SubprocessCLITransport(
|
|
prompt=actual_prompt,
|
|
options=options,
|
|
)
|
|
await self._transport.connect()
|
|
|
|
# Extract SDK MCP servers from options
|
|
sdk_mcp_servers = {}
|
|
if self.options.mcp_servers and isinstance(self.options.mcp_servers, dict):
|
|
for name, config in self.options.mcp_servers.items():
|
|
if isinstance(config, dict) and config.get("type") == "sdk":
|
|
sdk_mcp_servers[name] = config["instance"] # type: ignore[typeddict-item]
|
|
|
|
# Create Query to handle control protocol
|
|
self._query = Query(
|
|
transport=self._transport,
|
|
is_streaming_mode=True, # ClaudeSDKClient always uses streaming mode
|
|
can_use_tool=self.options.can_use_tool,
|
|
hooks=self._convert_hooks_to_internal_format(self.options.hooks)
|
|
if self.options.hooks
|
|
else None,
|
|
sdk_mcp_servers=sdk_mcp_servers,
|
|
)
|
|
|
|
# Start reading messages and initialize
|
|
await self._query.start()
|
|
await self._query.initialize()
|
|
|
|
# If we have an initial prompt stream, start streaming it
|
|
if prompt is not None and isinstance(prompt, AsyncIterable) and self._query._tg:
|
|
self._query._tg.start_soon(self._query.stream_input, prompt)
|
|
|
|
async def receive_messages(self) -> AsyncIterator[Message]:
|
|
"""Receive all messages from Claude."""
|
|
if not self._query:
|
|
raise CLIConnectionError("Not connected. Call connect() first.")
|
|
|
|
from ._internal.message_parser import parse_message
|
|
|
|
async for data in self._query.receive_messages():
|
|
yield parse_message(data)
|
|
|
|
async def query(
|
|
self, prompt: str | AsyncIterable[dict[str, Any]], session_id: str = "default"
|
|
) -> None:
|
|
"""
|
|
Send a new request in streaming mode.
|
|
|
|
Args:
|
|
prompt: Either a string message or an async iterable of message dictionaries
|
|
session_id: Session identifier for the conversation
|
|
"""
|
|
if not self._query or not self._transport:
|
|
raise CLIConnectionError("Not connected. Call connect() first.")
|
|
|
|
# Handle string prompts
|
|
if isinstance(prompt, str):
|
|
message = {
|
|
"type": "user",
|
|
"message": {"role": "user", "content": prompt},
|
|
"parent_tool_use_id": None,
|
|
"session_id": session_id,
|
|
}
|
|
await self._transport.write(json.dumps(message) + "\n")
|
|
else:
|
|
# Handle AsyncIterable prompts - stream them
|
|
async for msg in prompt:
|
|
# Ensure session_id is set on each message
|
|
if "session_id" not in msg:
|
|
msg["session_id"] = session_id
|
|
await self._transport.write(json.dumps(msg) + "\n")
|
|
|
|
async def interrupt(self) -> None:
|
|
"""Send interrupt signal (only works with streaming mode)."""
|
|
if not self._query:
|
|
raise CLIConnectionError("Not connected. Call connect() first.")
|
|
await self._query.interrupt()
|
|
|
|
async def get_server_info(self) -> dict[str, Any] | None:
|
|
"""Get server initialization info including available commands and output styles.
|
|
|
|
Returns initialization information from the Claude Code server including:
|
|
- Available commands (slash commands, system commands, etc.)
|
|
- Current and available output styles
|
|
- Server capabilities
|
|
|
|
Returns:
|
|
Dictionary with server info, or None if not in streaming mode
|
|
|
|
Example:
|
|
```python
|
|
async with ClaudeSDKClient() as client:
|
|
info = await client.get_server_info()
|
|
if info:
|
|
print(f"Commands available: {len(info.get('commands', []))}")
|
|
print(f"Output style: {info.get('output_style', 'default')}")
|
|
```
|
|
"""
|
|
if not self._query:
|
|
raise CLIConnectionError("Not connected. Call connect() first.")
|
|
# Return the initialization result that was already obtained during connect
|
|
return getattr(self._query, "_initialization_result", None)
|
|
|
|
async def receive_response(self) -> AsyncIterator[Message]:
|
|
"""
|
|
Receive messages from Claude until and including a ResultMessage.
|
|
|
|
This async iterator yields all messages in sequence and automatically terminates
|
|
after yielding a ResultMessage (which indicates the response is complete).
|
|
It's a convenience method over receive_messages() for single-response workflows.
|
|
|
|
**Stopping Behavior:**
|
|
- Yields each message as it's received
|
|
- Terminates immediately after yielding a ResultMessage
|
|
- The ResultMessage IS included in the yielded messages
|
|
- If no ResultMessage is received, the iterator continues indefinitely
|
|
|
|
Yields:
|
|
Message: Each message received (UserMessage, AssistantMessage, SystemMessage, ResultMessage)
|
|
|
|
Example:
|
|
```python
|
|
async with ClaudeSDKClient() as client:
|
|
await client.query("What's the capital of France?")
|
|
|
|
async for msg in client.receive_response():
|
|
if isinstance(msg, AssistantMessage):
|
|
for block in msg.content:
|
|
if isinstance(block, TextBlock):
|
|
print(f"Claude: {block.text}")
|
|
elif isinstance(msg, ResultMessage):
|
|
print(f"Cost: ${msg.total_cost_usd:.4f}")
|
|
# Iterator will terminate after this message
|
|
```
|
|
|
|
Note:
|
|
To collect all messages: `messages = [msg async for msg in client.receive_response()]`
|
|
The final message in the list will always be a ResultMessage.
|
|
"""
|
|
async for message in self.receive_messages():
|
|
yield message
|
|
if isinstance(message, ResultMessage):
|
|
return
|
|
|
|
async def disconnect(self) -> None:
|
|
"""Disconnect from Claude."""
|
|
if self._query:
|
|
await self._query.close()
|
|
self._query = None
|
|
self._transport = None
|
|
|
|
async def __aenter__(self) -> "ClaudeSDKClient":
|
|
"""Enter async context - automatically connects with empty stream for interactive use."""
|
|
await self.connect()
|
|
return self
|
|
|
|
async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> bool:
|
|
"""Exit async context - always disconnects."""
|
|
await self.disconnect()
|
|
return False
|