Implement proper client and bidi streaming

This commit is contained in:
Dickson Tsai 2025-07-19 10:43:23 -07:00
parent f4cff21590
commit 6dd12b0df8
No known key found for this signature in database
7 changed files with 627 additions and 146 deletions

View file

@ -0,0 +1,192 @@
#!/usr/bin/env python3
"""Example demonstrating streaming mode with bidirectional communication."""
import asyncio
from collections.abc import AsyncIterator
from claude_code_sdk import ClaudeCodeOptions, ClaudeSDKClient, query
async def create_message_stream() -> AsyncIterator[dict]:
    """Yield two scripted user turns for the streaming examples.

    Both messages share the session id ``"example-session-1"``. A two-second
    pause is inserted before the second message to imitate a user who is
    thinking between turns.

    Yields:
        User-message dicts with the ``type`` / ``message`` /
        ``parent_tool_use_id`` / ``session_id`` keys used by streaming mode.
    """
    session = "example-session-1"
    prompts = (
        "Hello! Please tell me a bit about Python async programming.",
        "Can you give me a simple code example?",
    )
    for position, text in enumerate(prompts):
        if position:
            # Simulate user thinking time between consecutive turns.
            await asyncio.sleep(2)
        yield {
            "type": "user",
            "message": {
                "role": "user",
                "content": text,
            },
            "parent_tool_use_id": None,
            "session_id": session,
        }
async def example_string_mode():
    """Run a single string-prompt query and print every message it produces.

    This is the backward-compatible calling style: ``query()`` receives a
    plain string prompt together with default ``ClaudeCodeOptions``.
    """
    print("=== String Mode Example ===")

    # Option 1: Using query function
    replies = query(
        prompt="What is 2+2? Please give a brief answer.",
        options=ClaudeCodeOptions(),
    )
    async for reply in replies:
        print(f"Received: {type(reply).__name__}")
        # Not every message type carries a ``content`` attribute.
        if hasattr(reply, "content"):
            print(f" Content: {reply.content}")

    print("Completed\n")
async def example_streaming_mode():
    """Feed ``query()`` an async stream of user messages and print the replies.

    Each received message is numbered and labelled with its class name. For
    messages that have ``content``, list content is printed block by block
    (only blocks exposing ``text``); any other content is printed as is.
    Messages without ``content`` but with a ``subtype`` print that subtype.
    """
    print("=== Streaming Mode Example ===")

    options = ClaudeCodeOptions()
    # The prompt is an async iterable instead of a string.
    message_stream = create_message_stream()

    received = 0
    async for message in query(prompt=message_stream, options=options):
        received += 1
        print(f"\nMessage #{received} ({type(message).__name__}):")

        if not hasattr(message, "content"):
            if hasattr(message, "subtype"):
                print(f" Subtype: {message.subtype}")
            continue

        body = message.content
        if not isinstance(body, list):
            print(f" {body}")
            continue

        for part in body:
            if hasattr(part, "text"):
                print(f" {part.text}")

    print("\nCompleted")
async def example_with_context_manager():
    """Ask one question through ``ClaudeSDKClient`` used as an async context manager.

    The client connects on entry and disconnects on exit. The reply is read
    with ``receive_response()``, which returns as soon as the result message
    for this turn arrives. Iterating ``receive_messages()`` to exhaustion
    instead is expected to block here: the interactive session keeps stdin
    open for follow-up messages, so the message stream does not end on its
    own after the answer.
    """
    print("=== Context Manager Example ===")

    # Simple one-shot query with automatic cleanup
    async with ClaudeSDKClient() as client:
        await client.send_message("What is the meaning of life?")

        # Stop at the end of this turn rather than waiting for the stream to close.
        messages, _result = await client.receive_response()
        for message in messages:
            if hasattr(message, "content"):
                print(f"Response: {message.content}")

    print("\nCompleted with automatic cleanup\n")
async def example_with_interrupt():
    """Start a long-running request, interrupt it after three seconds, then disconnect.

    A background task prints incoming messages while the main flow sleeps,
    sends the interrupt, waits two more seconds for trailing output and then
    cancels the background task. The client is always disconnected at the end.
    """
    print("=== Streaming Mode with Interrupt Example ===")

    options = ClaudeCodeOptions()
    client = ClaudeSDKClient(options=options)

    async def interruptible_stream():
        """Send one prompt, then stay open forever."""
        yield {
            "type": "user",
            "message": {
                "role": "user",
                "content": "Count to 1000 slowly, saying each number.",
            },
            "parent_tool_use_id": None,
            "session_id": "interrupt-example",
        }
        # Never finishing keeps the stream (and therefore stdin) open.
        await asyncio.Event().wait()

    async def print_incoming():
        """Print the type and a short preview of every received message."""
        async for message in client.receive_messages():
            print(f"Received: {type(message).__name__}")
            if not (
                hasattr(message, "content") and isinstance(message.content, list)
            ):
                continue
            for block in message.content:
                if hasattr(block, "text"):
                    print(f" {block.text[:50]}...")  # First 50 chars

    try:
        await client.connect(interruptible_stream())
        print("Connected - will interrupt after 3 seconds")

        # Receive in the background so output keeps flowing while we wait.
        receiver = asyncio.create_task(print_incoming())

        await asyncio.sleep(3)
        print("\nSending interrupt signal...")
        try:
            await client.interrupt()
            print("Interrupt sent successfully")
        except Exception as e:
            print(f"Interrupt error: {e}")

        # Leave a little time for any final messages to show up.
        await asyncio.sleep(2)

        receiver.cancel()
        try:
            await receiver
        except asyncio.CancelledError:
            pass
    except Exception as e:
        print(f"Error: {e}")
    finally:
        await client.disconnect()
        print("\nDisconnected")
async def main():
    """Run all examples one after another, in a fixed order."""
    examples = (
        example_string_mode,  # plain string prompt
        example_streaming_mode,  # async-iterable prompt
        example_with_context_manager,  # client as context manager
        example_with_interrupt,  # interrupting a running request
    )
    for example in examples:
        await example()


if __name__ == "__main__":
    # Script entry point: drive the examples on a fresh event loop.
    asyncio.run(main())

View file

@ -1,7 +1,5 @@
"""Claude SDK for Python."""
import os
from collections.abc import AsyncIterator
from ._errors import (
ClaudeSDKError,
@ -10,7 +8,8 @@ from ._errors import (
CLINotFoundError,
ProcessError,
)
from ._internal.client import InternalClient
from .client import ClaudeSDKClient
from .query import query
from .types import (
AssistantMessage,
ClaudeCodeOptions,
@ -29,8 +28,9 @@ from .types import (
__version__ = "0.0.14"
__all__ = [
# Main function
# Main exports
"query",
"ClaudeSDKClient",
# Types
"PermissionMode",
"McpServerConfig",
@ -51,52 +51,3 @@ __all__ = [
"ProcessError",
"CLIJSONDecodeError",
]
async def query(
    *, prompt: str, options: ClaudeCodeOptions | None = None
) -> AsyncIterator[Message]:
    """
    Send a prompt to Claude Code and stream back the resulting messages.

    Sets the ``CLAUDE_CODE_ENTRYPOINT`` environment variable to ``"sdk-py"``
    and delegates to ``InternalClient.process_query``.

    Args:
        prompt: The prompt to send to Claude.
        options: Optional configuration; a default ``ClaudeCodeOptions()`` is
            used when ``None``. ``options.permission_mode`` controls tool
            execution:
            - 'default': CLI prompts for dangerous tools
            - 'acceptEdits': Auto-accept file edits
            - 'bypassPermissions': Allow all tools (use with caution)
            ``options.cwd`` sets the working directory.

    Yields:
        Messages from the conversation.

    Example:
        ```python
        async for message in query(
            prompt="Hello",
            options=ClaudeCodeOptions(
                system_prompt="You are helpful",
                cwd="/home/user",
            ),
        ):
            print(message)
        ```
    """
    effective_options = ClaudeCodeOptions() if options is None else options

    os.environ["CLAUDE_CODE_ENTRYPOINT"] = "sdk-py"

    responses = InternalClient().process_query(
        prompt=prompt, options=effective_options
    )
    async for message in responses:
        yield message

View file

@ -1,20 +1,10 @@
"""Internal client implementation."""
from collections.abc import AsyncIterator
from collections.abc import AsyncIterable, AsyncIterator
from typing import Any
from ..types import (
AssistantMessage,
ClaudeCodeOptions,
ContentBlock,
Message,
ResultMessage,
SystemMessage,
TextBlock,
ToolResultBlock,
ToolUseBlock,
UserMessage,
)
from ..types import ClaudeCodeOptions, Message
from .message_parser import parse_message
from .transport.subprocess_cli import SubprocessCLITransport
@ -25,7 +15,7 @@ class InternalClient:
"""Initialize the internal client."""
async def process_query(
self, prompt: str, options: ClaudeCodeOptions
self, prompt: str | AsyncIterable[dict[str, Any]], options: ClaudeCodeOptions
) -> AsyncIterator[Message]:
"""Process a query through transport."""
@ -35,63 +25,9 @@ class InternalClient:
await transport.connect()
async for data in transport.receive_messages():
message = self._parse_message(data)
message = parse_message(data)
if message:
yield message
finally:
await transport.disconnect()
def _parse_message(self, data: dict[str, Any]) -> Message | None:
    """Turn one raw CLI output dict into a typed message.

    The structure of ``data`` is trusted: required keys are read directly, so
    a missing key raises ``KeyError``. Assistant content blocks of an unknown
    type are skipped.

    Args:
        data: Raw message dictionary from CLI output.

    Returns:
        The parsed message, or ``None`` when ``data["type"]`` is not one of
        ``user``, ``assistant``, ``system`` or ``result``.
    """
    kind = data["type"]

    if kind == "user":
        return UserMessage(content=data["message"]["content"])

    if kind == "assistant":
        parsed: list[ContentBlock] = []
        for raw in data["message"]["content"]:
            block_kind = raw["type"]
            if block_kind == "text":
                parsed.append(TextBlock(text=raw["text"]))
            elif block_kind == "tool_use":
                parsed.append(
                    ToolUseBlock(
                        id=raw["id"],
                        name=raw["name"],
                        input=raw["input"],
                    )
                )
            elif block_kind == "tool_result":
                parsed.append(
                    ToolResultBlock(
                        tool_use_id=raw["tool_use_id"],
                        content=raw.get("content"),
                        is_error=raw.get("is_error"),
                    )
                )
        return AssistantMessage(content=parsed)

    if kind == "system":
        # The whole payload is kept alongside the subtype.
        return SystemMessage(subtype=data["subtype"], data=data)

    if kind == "result":
        return ResultMessage(
            subtype=data["subtype"],
            duration_ms=data["duration_ms"],
            duration_api_ms=data["duration_api_ms"],
            is_error=data["is_error"],
            num_turns=data["num_turns"],
            session_id=data["session_id"],
            total_cost_usd=data.get("total_cost_usd"),
            usage=data.get("usage"),
            result=data.get("result"),
        )

    return None

View file

@ -0,0 +1,77 @@
"""Message parser for Claude Code SDK responses."""
from typing import Any
from ..types import (
AssistantMessage,
ContentBlock,
Message,
ResultMessage,
SystemMessage,
TextBlock,
ToolResultBlock,
ToolUseBlock,
UserMessage,
)
def parse_message(data: dict[str, Any]) -> Message | None:
    """
    Convert a raw CLI output dictionary into a typed Message object.

    Required keys are read directly, so a payload missing one of them raises
    ``KeyError``.

    Args:
        data: Raw message dictionary from CLI output

    Returns:
        Parsed Message object or None if type is unrecognized
    """
    kind = data["type"]

    if kind == "user":
        return UserMessage(content=data["message"]["content"])

    if kind == "assistant":
        # Blocks of an unknown type convert to None and are left out.
        blocks: list[ContentBlock] = [
            block
            for block in map(_to_content_block, data["message"]["content"])
            if block is not None
        ]
        return AssistantMessage(content=blocks)

    if kind == "system":
        return SystemMessage(
            subtype=data["subtype"],
            data=data,
        )

    if kind == "result":
        return ResultMessage(
            subtype=data["subtype"],
            duration_ms=data["duration_ms"],
            duration_api_ms=data["duration_api_ms"],
            is_error=data["is_error"],
            num_turns=data["num_turns"],
            session_id=data["session_id"],
            total_cost_usd=data.get("total_cost_usd"),
            usage=data.get("usage"),
            result=data.get("result"),
        )

    return None


def _to_content_block(raw: dict[str, Any]) -> ContentBlock | None:
    """Convert one raw assistant content block; return None for unknown types."""
    kind = raw["type"]
    if kind == "text":
        return TextBlock(text=raw["text"])
    if kind == "tool_use":
        return ToolUseBlock(
            id=raw["id"],
            name=raw["name"],
            input=raw["input"],
        )
    if kind == "tool_result":
        return ToolResultBlock(
            tool_use_id=raw["tool_use_id"],
            content=raw.get("content"),
            is_error=raw.get("is_error"),
        )
    return None

View file

@ -4,10 +4,10 @@ import json
import logging
import os
import shutil
from collections.abc import AsyncIterator, AsyncIterable
from collections.abc import AsyncIterable, AsyncIterator
from pathlib import Path
from subprocess import PIPE
from typing import Any, Union
from typing import Any
import anyio
from anyio.abc import Process
@ -28,7 +28,7 @@ class SubprocessCLITransport(Transport):
def __init__(
self,
prompt: Union[str, AsyncIterable[dict[str, Any]]],
prompt: str | AsyncIterable[dict[str, Any]],
options: ClaudeCodeOptions,
cli_path: str | Path | None = None,
):
@ -126,8 +126,8 @@ class SubprocessCLITransport(Transport):
cmd.extend(["--input-format", "stream-json"])
else:
# String mode: use --print with the prompt
cmd.extend(["--print", self._prompt])
cmd.extend(["--print", str(self._prompt)])
return cmd
async def connect(self) -> None:
@ -151,7 +151,7 @@ class SubprocessCLITransport(Transport):
self._stdout_stream = TextReceiveStream(self._process.stdout)
if self._process.stderr:
self._stderr_stream = TextReceiveStream(self._process.stderr)
# Handle stdin based on mode
if self._is_streaming:
# Streaming mode: keep stdin open and start streaming task
@ -197,21 +197,39 @@ class SubprocessCLITransport(Transport):
self._stdin_stream = None
async def send_request(self, messages: list[Any], options: dict[str, Any]) -> None:
    """Send additional messages to the CLI over stdin (streaming mode only).

    Each message is written as one JSON line. A message that is not already
    a dict is converted to its string form and wrapped in a user-message
    envelope.

    Args:
        messages: Messages to send; dicts are forwarded unchanged.
        options: Only ``session_id`` is read, and only when a non-dict
            message has to be wrapped; it defaults to ``"default"``.

    Raises:
        CLIConnectionError: If the transport is not in streaming mode, or
            stdin is no longer available.
    """
    if not self._is_streaming:
        raise CLIConnectionError("send_request only works in streaming mode")

    if not self._stdin_stream:
        raise CLIConnectionError("stdin not available - stream may have ended")

    # Send each message as a user message
    for message in messages:
        # Ensure message has required structure
        if not isinstance(message, dict):
            message = {
                "type": "user",
                "message": {"role": "user", "content": str(message)},
                "parent_tool_use_id": None,
                "session_id": options.get("session_id", "default"),
            }

        await self._stdin_stream.send(json.dumps(message) + "\n")
async def _stream_to_stdin(self) -> None:
"""Stream messages to stdin for streaming mode."""
if not self._stdin_stream or not isinstance(self._prompt, AsyncIterable):
return
try:
async for message in self._prompt:
if not self._stdin_stream:
break
await self._stdin_stream.send(json.dumps(message) + "\n")
# Signal EOF but keep the stream open for control messages
# This matches the TypeScript implementation which calls stdin.end()
# Don't close stdin - keep it open for send_request
# Users can explicitly call disconnect() when done
except Exception as e:
logger.debug(f"Error streaming to stdin: {e}")
if self._stdin_stream:
@ -258,7 +276,7 @@ class SubprocessCLITransport(Transport):
try:
data = json.loads(json_buffer)
json_buffer = ""
# Handle control responses separately
if data.get("type") == "control_response":
request_id = data.get("response", {}).get("request_id")
@ -266,7 +284,7 @@ class SubprocessCLITransport(Transport):
# Store the response for the pending request
self._pending_control_responses[request_id] = data.get("response", {})
continue
try:
yield data
except GeneratorExit:
@ -334,47 +352,47 @@ class SubprocessCLITransport(Transport):
def is_connected(self) -> bool:
    """Return True when a subprocess exists and has no return code yet."""
    process = self._process
    if process is None:
        return False
    # A return code of None means the process has not exited.
    return process.returncode is None
async def interrupt(self) -> None:
    """Send an ``interrupt`` control request to the CLI.

    Raises:
        CLIConnectionError: If the transport is not in streaming mode, or
            stdin is not available.
    """
    problem = None
    if not self._is_streaming:
        problem = "Interrupt requires streaming mode (AsyncIterable prompt)"
    elif not self._stdin_stream:
        problem = "Not connected or stdin not available"

    if problem is not None:
        raise CLIConnectionError(problem)

    await self._send_control_request({"subtype": "interrupt"})
async def _send_control_request(self, request: dict[str, Any]) -> dict[str, Any]:
    """Write a control request to stdin and wait for the matching response.

    The response is looked up in ``self._pending_control_responses`` under
    the generated request id, polling every 0.1 seconds for at most 30
    seconds.

    Args:
        request: Payload placed under the ``"request"`` key of the envelope.

    Returns:
        The response dict stored for this request id.

    Raises:
        CLIConnectionError: If stdin is unavailable, the wait times out, or
            the response has subtype ``"error"``.
    """
    if not self._stdin_stream:
        raise CLIConnectionError("Stdin not available")

    # Counter plus random suffix gives each request a unique id.
    self._request_counter += 1
    request_id = f"req_{self._request_counter}_{os.urandom(4).hex()}"

    envelope = {
        "type": "control_request",
        "request_id": request_id,
        "request": request,
    }
    await self._stdin_stream.send(json.dumps(envelope) + "\n")

    try:
        with anyio.fail_after(30.0):  # 30 second timeout
            while request_id not in self._pending_control_responses:
                await anyio.sleep(0.1)
    except TimeoutError:
        raise CLIConnectionError("Control request timed out") from None

    response = self._pending_control_responses.pop(request_id)
    if response.get("subtype") == "error":
        raise CLIConnectionError(f"Control request failed: {response.get('error')}")
    return response

View file

@ -0,0 +1,208 @@
"""Claude SDK Client for interacting with Claude Code."""
import os
from collections.abc import AsyncIterable, AsyncIterator
from ._errors import CLIConnectionError
from .types import ClaudeCodeOptions, Message, ResultMessage
class ClaudeSDKClient:
    """
    Client for bidirectional, interactive conversations with Claude Code.

    Unlike the one-shot ``query()`` function, a client keeps a connection
    open so that messages can be sent and received at any time, follow-ups
    can depend on earlier responses, and a running request can be
    interrupted.

    Use ClaudeSDKClient for:
    - Chat interfaces and other conversational UIs
    - Interactive debugging or exploration sessions
    - Multi-turn conversations that share context
    - Real-time applications that react to Claude's responses
    - Anything that needs interrupt support

    Use query() instead for:
    - Simple one-off questions
    - Batch processing of prompts
    - Fire-and-forget automation scripts
    - Stateless work where all inputs are known upfront

    Example - Interactive conversation:
        ```python
        # Entering the context connects with an empty stream
        async with ClaudeSDKClient() as client:
            await client.send_message("Let's solve a math problem step by step")
            messages, result = await client.receive_response()

            # Follow up based on the first response
            await client.send_message("What's 15% of 80?")
            messages, result = await client.receive_response()
        # Leaving the context disconnects
        ```

    Example - With interrupt:
        ```python
        async with ClaudeSDKClient() as client:
            await client.send_message("Count to 1000")

            await asyncio.sleep(2)
            await client.interrupt()

            await client.send_message("Never mind, what's 2+2?")
        ```

    Example - Manual connection:
        ```python
        client = ClaudeSDKClient()

        async def message_stream():
            yield {"type": "user", "message": {"role": "user", "content": "Hello"}}

        await client.connect(message_stream())
        await client.send_message("What's the weather?")

        async for message in client.receive_messages():
            print(message)

        await client.disconnect()
        ```
    """

    def __init__(self, options: ClaudeCodeOptions | None = None):
        """Store the options and mark the process as the ``sdk-py-client`` entrypoint.

        Args:
            options: Client configuration; a default ``ClaudeCodeOptions()``
                is used when ``None``.
        """
        self.options = ClaudeCodeOptions() if options is None else options
        self._transport = None
        os.environ["CLAUDE_CODE_ENTRYPOINT"] = "sdk-py-client"

    async def connect(self, prompt: str | AsyncIterable[dict] | None = None) -> None:
        """Create the subprocess transport and connect it.

        Args:
            prompt: A string prompt, an async iterable of message dicts, or
                ``None`` to connect with an empty message stream for
                interactive use.
        """
        from ._internal.transport.subprocess_cli import SubprocessCLITransport

        async def _no_messages():
            # Produces nothing; the unreachable ``yield`` only makes this an
            # async generator so it is accepted as a message stream.
            return
            yield

        if prompt is None:
            source = _no_messages()
        else:
            source = prompt

        self._transport = SubprocessCLITransport(prompt=source, options=self.options)
        await self._transport.connect()

    async def receive_messages(self) -> AsyncIterator[Message]:
        """Yield every message from the transport that parses to a typed message.

        Raises:
            CLIConnectionError: If called before ``connect()``.
        """
        if not self._transport:
            raise CLIConnectionError("Not connected. Call connect() first.")

        from ._internal.message_parser import parse_message

        async for raw in self._transport.receive_messages():
            parsed = parse_message(raw)
            # Payloads the parser does not recognise come back falsy and are skipped.
            if parsed:
                yield parsed

    async def send_message(self, content: str, session_id: str = "default") -> None:
        """Wrap ``content`` in a user-message envelope and send it via the transport.

        Args:
            content: Text of the user message.
            session_id: Session identifier placed in the envelope.

        Raises:
            CLIConnectionError: If called before ``connect()``.
        """
        if not self._transport:
            raise CLIConnectionError("Not connected. Call connect() first.")

        envelope = {
            "type": "user",
            "message": {"role": "user", "content": content},
            "parent_tool_use_id": None,
            "session_id": session_id,
        }
        await self._transport.send_request([envelope], {"session_id": session_id})

    async def interrupt(self) -> None:
        """Ask the transport to interrupt the current request.

        Raises:
            CLIConnectionError: If called before ``connect()``.
        """
        if not self._transport:
            raise CLIConnectionError("Not connected. Call connect() first.")
        await self._transport.interrupt()

    async def receive_response(self) -> tuple[list[Message], ResultMessage | None]:
        """
        Collect messages until a ResultMessage arrives, then return.

        Convenience wrapper around ``receive_messages()`` for reading one
        complete response.

        Returns:
            tuple: ``(messages, result)`` where:
                - messages: Every message received, including the final
                  ResultMessage when there is one
                - result: That ResultMessage, or None if the stream ended
                  without one

        Example:
            ```python
            async with ClaudeSDKClient() as client:
                await client.send_message("What's the capital of France?")
                messages, result = await client.receive_response()

                for msg in messages:
                    if isinstance(msg, AssistantMessage):
                        for block in msg.content:
                            if isinstance(block, TextBlock):
                                print(f"Claude: {block.text}")

                await client.send_message("What's the population?")
                messages, result = await client.receive_response()
            ```
        """
        collected: list[Message] = []
        async for message in self.receive_messages():
            collected.append(message)
            if isinstance(message, ResultMessage):
                return collected, message

        # Stream ended without ResultMessage
        return collected, None

    async def disconnect(self) -> None:
        """Disconnect the transport, if any, and forget it."""
        if self._transport:
            await self._transport.disconnect()
            self._transport = None

    async def __aenter__(self):
        """Connect with an empty stream and return the client itself."""
        await self.connect()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Disconnect on exit; returning False lets any exception propagate."""
        await self.disconnect()
        return False

View file

@ -0,0 +1,99 @@
"""Query function for one-shot interactions with Claude Code."""
import os
from collections.abc import AsyncIterable, AsyncIterator
from ._internal.client import InternalClient
from .types import ClaudeCodeOptions, Message
async def query(
    *, prompt: str | AsyncIterable[dict], options: ClaudeCodeOptions | None = None
) -> AsyncIterator[Message]:
    """
    Run a one-shot or unidirectional streaming query against Claude Code.

    Sets the ``CLAUDE_CODE_ENTRYPOINT`` environment variable to ``"sdk-py"``
    and delegates to ``InternalClient.process_query``. Intended for simple,
    stateless use; for interactive, stateful conversations use
    ClaudeSDKClient instead.

    How this differs from ClaudeSDKClient:
    - **Unidirectional**: Send all messages upfront, receive all responses
    - **Stateless**: Each query is independent, no conversation state
    - **Simple**: Fire-and-forget style, no connection management
    - **No interrupts**: Cannot interrupt or send follow-up messages

    Good fits for query():
    - Simple one-off questions ("What is 2+2?")
    - Batch processing of independent prompts
    - Code generation or analysis tasks
    - Automated scripts and CI/CD pipelines
    - Cases where all inputs are known upfront

    Good fits for ClaudeSDKClient:
    - Interactive conversations with follow-ups
    - Chat applications or REPL-like interfaces
    - Sending messages based on earlier responses
    - Interrupt capabilities
    - Long-running sessions with state

    Args:
        prompt: Either a string for a single-shot query, or an
            AsyncIterable[dict] for streaming mode. In streaming mode each
            dict should have the structure:
            {
                "type": "user",
                "message": {"role": "user", "content": "..."},
                "parent_tool_use_id": None,
                "session_id": "..."
            }
        options: Optional configuration (defaults to ClaudeCodeOptions() if None).
            Set options.permission_mode to control tool execution:
            - 'default': CLI prompts for dangerous tools
            - 'acceptEdits': Auto-accept file edits
            - 'bypassPermissions': Allow all tools (use with caution)
            Set options.cwd for working directory.

    Yields:
        Messages from the conversation

    Example - Simple query:
        ```python
        async for message in query(prompt="What is the capital of France?"):
            print(message)
        ```

    Example - With options:
        ```python
        async for message in query(
            prompt="Create a Python web server",
            options=ClaudeCodeOptions(
                system_prompt="You are an expert Python developer",
                cwd="/home/user/project"
            )
        ):
            print(message)
        ```

    Example - Streaming mode (still unidirectional):
        ```python
        async def prompts():
            yield {"type": "user", "message": {"role": "user", "content": "Hello"}}
            yield {"type": "user", "message": {"role": "user", "content": "How are you?"}}

        async for message in query(prompt=prompts()):
            print(message)
        ```
    """
    effective_options = ClaudeCodeOptions() if options is None else options

    os.environ["CLAUDE_CODE_ENTRYPOINT"] = "sdk-py"

    responses = InternalClient().process_query(
        prompt=prompt, options=effective_options
    )
    async for message in responses:
        yield message