diff --git a/examples/streaming_mode_example.py b/examples/streaming_mode_example.py new file mode 100644 index 0000000..ed782a9 --- /dev/null +++ b/examples/streaming_mode_example.py @@ -0,0 +1,192 @@ +#!/usr/bin/env python3 +"""Example demonstrating streaming mode with bidirectional communication.""" + +import asyncio +from collections.abc import AsyncIterator + +from claude_code_sdk import ClaudeCodeOptions, ClaudeSDKClient, query + + +async def create_message_stream() -> AsyncIterator[dict]: + """Create an async stream of user messages.""" + # Example messages to send + messages = [ + { + "type": "user", + "message": { + "role": "user", + "content": "Hello! Please tell me a bit about Python async programming.", + }, + "parent_tool_use_id": None, + "session_id": "example-session-1", + }, + # Add a delay to simulate interactive conversation + None, # We'll use this as a signal to delay + { + "type": "user", + "message": { + "role": "user", + "content": "Can you give me a simple code example?", + }, + "parent_tool_use_id": None, + "session_id": "example-session-1", + }, + ] + + for msg in messages: + if msg is None: + await asyncio.sleep(2) # Simulate user thinking time + continue + yield msg + + +async def example_string_mode(): + """Example using traditional string mode (backward compatible).""" + print("=== String Mode Example ===") + + # Option 1: Using query function + async for message in query( + prompt="What is 2+2? Please give a brief answer.", options=ClaudeCodeOptions() + ): + print(f"Received: {type(message).__name__}") + if hasattr(message, "content"): + print(f" Content: {message.content}") + + print("Completed\n") + + +async def example_streaming_mode(): + """Example using new streaming mode with async iterable.""" + print("=== Streaming Mode Example ===") + + options = ClaudeCodeOptions() + + # Create message stream + message_stream = create_message_stream() + + # Use query with async iterable + message_count = 0 + async for message in query(prompt=message_stream, options=options): + message_count += 1 + msg_type = type(message).__name__ + + print(f"\nMessage #{message_count} ({msg_type}):") + + if hasattr(message, "content"): + content = message.content + if isinstance(content, list): + for block in content: + if hasattr(block, "text"): + print(f" {block.text}") + else: + print(f" {content}") + elif hasattr(message, "subtype"): + print(f" Subtype: {message.subtype}") + + print("\nCompleted") + + +async def example_with_context_manager(): + """Example using context manager for cleaner code.""" + print("=== Context Manager Example ===") + + # Simple one-shot query with automatic cleanup + async with ClaudeSDKClient() as client: + await client.send_message("What is the meaning of life?") + async for message in client.receive_messages(): + if hasattr(message, "content"): + print(f"Response: {message.content}") + + print("\nCompleted with automatic cleanup\n") + + +async def example_with_interrupt(): + """Example demonstrating interrupt functionality.""" + print("=== Streaming Mode with Interrupt Example ===") + + options = ClaudeCodeOptions() + client = ClaudeSDKClient(options=options) + + async def interruptible_stream(): + """Stream that we'll interrupt.""" + yield { + "type": "user", + "message": { + "role": "user", + "content": "Count to 1000 slowly, saying each number.", + }, + "parent_tool_use_id": None, + "session_id": "interrupt-example", + } + # Keep the stream open by waiting indefinitely + # This prevents stdin from being closed + await asyncio.Event().wait() + + try: + await client.connect(interruptible_stream()) + print("Connected - will interrupt after 3 seconds") + + # Create tasks for receiving and interrupting + async def receive_and_interrupt(): + # Start a background task to continuously receive messages + async def receive_messages(): + async for message in client.receive_messages(): + msg_type = type(message).__name__ + print(f"Received: {msg_type}") + + if hasattr(message, "content") and isinstance( + message.content, list + ): + for block in message.content: + if hasattr(block, "text"): + print(f" {block.text[:50]}...") # First 50 chars + + # Start receiving in background + receive_task = asyncio.create_task(receive_messages()) + + # Wait 3 seconds then interrupt + await asyncio.sleep(3) + print("\nSending interrupt signal...") + + try: + await client.interrupt() + print("Interrupt sent successfully") + except Exception as e: + print(f"Interrupt error: {e}") + + # Give some time to see any final messages + await asyncio.sleep(2) + + # Cancel the receive task + receive_task.cancel() + try: + await receive_task + except asyncio.CancelledError: + pass + + await receive_and_interrupt() + + except Exception as e: + print(f"Error: {e}") + finally: + await client.disconnect() + print("\nDisconnected") + + +async def main(): + """Run all examples.""" + # Run string mode example + await example_string_mode() + + # Run streaming mode example + await example_streaming_mode() + + # Run context manager example + await example_with_context_manager() + + # Run interrupt example + await example_with_interrupt() + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/src/claude_code_sdk/__init__.py b/src/claude_code_sdk/__init__.py index b8a1152..dc84df1 100644 --- a/src/claude_code_sdk/__init__.py +++ b/src/claude_code_sdk/__init__.py @@ -1,7 +1,5 @@ """Claude SDK for Python.""" -import os -from collections.abc import AsyncIterator from ._errors import ( ClaudeSDKError, @@ -10,7 +8,8 @@ from ._errors import ( CLINotFoundError, ProcessError, ) -from ._internal.client import InternalClient +from .client import ClaudeSDKClient +from .query import query from .types import ( AssistantMessage, ClaudeCodeOptions, @@ -29,8 +28,9 @@ from .types import ( __version__ = "0.0.14" __all__ = [ - # Main function + # Main exports "query", + "ClaudeSDKClient", # Types "PermissionMode", "McpServerConfig", @@ -51,52 +51,3 @@ __all__ = [ "ProcessError", "CLIJSONDecodeError", ] - - -async def query( - *, prompt: str, options: ClaudeCodeOptions | None = None -) -> AsyncIterator[Message]: - """ - Query Claude Code. - - Python SDK for interacting with Claude Code. - - Args: - prompt: The prompt to send to Claude - options: Optional configuration (defaults to ClaudeCodeOptions() if None). - Set options.permission_mode to control tool execution: - - 'default': CLI prompts for dangerous tools - - 'acceptEdits': Auto-accept file edits - - 'bypassPermissions': Allow all tools (use with caution) - Set options.cwd for working directory. - - Yields: - Messages from the conversation - - - Example: - ```python - # Simple usage - async for message in query(prompt="Hello"): - print(message) - - # With options - async for message in query( - prompt="Hello", - options=ClaudeCodeOptions( - system_prompt="You are helpful", - cwd="/home/user" - ) - ): - print(message) - ``` - """ - if options is None: - options = ClaudeCodeOptions() - - os.environ["CLAUDE_CODE_ENTRYPOINT"] = "sdk-py" - - client = InternalClient() - - async for message in client.process_query(prompt=prompt, options=options): - yield message diff --git a/src/claude_code_sdk/_internal/client.py b/src/claude_code_sdk/_internal/client.py index ef1070d..fb4eeb8 100644 --- a/src/claude_code_sdk/_internal/client.py +++ b/src/claude_code_sdk/_internal/client.py @@ -1,20 +1,10 @@ """Internal client implementation.""" -from collections.abc import AsyncIterator +from collections.abc import AsyncIterable, AsyncIterator from typing import Any -from ..types import ( - AssistantMessage, - ClaudeCodeOptions, - ContentBlock, - Message, - ResultMessage, - SystemMessage, - TextBlock, - ToolResultBlock, - ToolUseBlock, - UserMessage, -) +from ..types import ClaudeCodeOptions, Message +from .message_parser import parse_message from .transport.subprocess_cli import SubprocessCLITransport @@ -25,7 +15,7 @@ class InternalClient: """Initialize the internal client.""" async def process_query( - self, prompt: str, options: ClaudeCodeOptions + self, prompt: str | AsyncIterable[dict[str, Any]], options: ClaudeCodeOptions ) -> AsyncIterator[Message]: """Process a query through transport.""" @@ -35,63 +25,9 @@ class InternalClient: await transport.connect() async for data in transport.receive_messages(): - message = self._parse_message(data) + message = parse_message(data) if message: yield message finally: await transport.disconnect() - - def _parse_message(self, data: dict[str, Any]) -> Message | None: - """Parse message from CLI output, trusting the structure.""" - - match data["type"]: - case "user": - return UserMessage(content=data["message"]["content"]) - - case "assistant": - content_blocks: list[ContentBlock] = [] - for block in data["message"]["content"]: - match block["type"]: - case "text": - content_blocks.append(TextBlock(text=block["text"])) - case "tool_use": - content_blocks.append( - ToolUseBlock( - id=block["id"], - name=block["name"], - input=block["input"], - ) - ) - case "tool_result": - content_blocks.append( - ToolResultBlock( - tool_use_id=block["tool_use_id"], - content=block.get("content"), - is_error=block.get("is_error"), - ) - ) - - return AssistantMessage(content=content_blocks) - - case "system": - return SystemMessage( - subtype=data["subtype"], - data=data, - ) - - case "result": - return ResultMessage( - subtype=data["subtype"], - duration_ms=data["duration_ms"], - duration_api_ms=data["duration_api_ms"], - is_error=data["is_error"], - num_turns=data["num_turns"], - session_id=data["session_id"], - total_cost_usd=data.get("total_cost_usd"), - usage=data.get("usage"), - result=data.get("result"), - ) - - case _: - return None diff --git a/src/claude_code_sdk/_internal/message_parser.py b/src/claude_code_sdk/_internal/message_parser.py new file mode 100644 index 0000000..a2b88d2 --- /dev/null +++ b/src/claude_code_sdk/_internal/message_parser.py @@ -0,0 +1,77 @@ +"""Message parser for Claude Code SDK responses.""" + +from typing import Any + +from ..types import ( + AssistantMessage, + ContentBlock, + Message, + ResultMessage, + SystemMessage, + TextBlock, + ToolResultBlock, + ToolUseBlock, + UserMessage, +) + + +def parse_message(data: dict[str, Any]) -> Message | None: + """ + Parse message from CLI output into typed Message objects. + + Args: + data: Raw message dictionary from CLI output + + Returns: + Parsed Message object or None if type is unrecognized + """ + match data["type"]: + case "user": + return UserMessage(content=data["message"]["content"]) + + case "assistant": + content_blocks: list[ContentBlock] = [] + for block in data["message"]["content"]: + match block["type"]: + case "text": + content_blocks.append(TextBlock(text=block["text"])) + case "tool_use": + content_blocks.append( + ToolUseBlock( + id=block["id"], + name=block["name"], + input=block["input"], + ) + ) + case "tool_result": + content_blocks.append( + ToolResultBlock( + tool_use_id=block["tool_use_id"], + content=block.get("content"), + is_error=block.get("is_error"), + ) + ) + + return AssistantMessage(content=content_blocks) + + case "system": + return SystemMessage( + subtype=data["subtype"], + data=data, + ) + + case "result": + return ResultMessage( + subtype=data["subtype"], + duration_ms=data["duration_ms"], + duration_api_ms=data["duration_api_ms"], + is_error=data["is_error"], + num_turns=data["num_turns"], + session_id=data["session_id"], + total_cost_usd=data.get("total_cost_usd"), + usage=data.get("usage"), + result=data.get("result"), + ) + + case _: + return None diff --git a/src/claude_code_sdk/_internal/transport/subprocess_cli.py b/src/claude_code_sdk/_internal/transport/subprocess_cli.py index b17f85e..22f3000 100644 --- a/src/claude_code_sdk/_internal/transport/subprocess_cli.py +++ b/src/claude_code_sdk/_internal/transport/subprocess_cli.py @@ -4,10 +4,10 @@ import json import logging import os import shutil -from collections.abc import AsyncIterator, AsyncIterable +from collections.abc import AsyncIterable, AsyncIterator from pathlib import Path from subprocess import PIPE -from typing import Any, Union +from typing import Any import anyio from anyio.abc import Process @@ -28,7 +28,7 @@ class SubprocessCLITransport(Transport): def __init__( self, - prompt: Union[str, AsyncIterable[dict[str, Any]]], + prompt: str | AsyncIterable[dict[str, Any]], options: ClaudeCodeOptions, cli_path: str | Path | None = None, ): @@ -126,8 +126,8 @@ class SubprocessCLITransport(Transport): cmd.extend(["--input-format", "stream-json"]) else: # String mode: use --print with the prompt - cmd.extend(["--print", self._prompt]) - + cmd.extend(["--print", str(self._prompt)]) + return cmd async def connect(self) -> None: @@ -151,7 +151,7 @@ class SubprocessCLITransport(Transport): self._stdout_stream = TextReceiveStream(self._process.stdout) if self._process.stderr: self._stderr_stream = TextReceiveStream(self._process.stderr) - + # Handle stdin based on mode if self._is_streaming: # Streaming mode: keep stdin open and start streaming task @@ -197,21 +197,39 @@ class SubprocessCLITransport(Transport): self._stdin_stream = None async def send_request(self, messages: list[Any], options: dict[str, Any]) -> None: - """Not used for CLI transport - args passed via command line.""" + """Send additional messages in streaming mode.""" + if not self._is_streaming: + raise CLIConnectionError("send_request only works in streaming mode") + + if not self._stdin_stream: + raise CLIConnectionError("stdin not available - stream may have ended") + + # Send each message as a user message + for message in messages: + # Ensure message has required structure + if not isinstance(message, dict): + message = { + "type": "user", + "message": {"role": "user", "content": str(message)}, + "parent_tool_use_id": None, + "session_id": options.get("session_id", "default") + } + + await self._stdin_stream.send(json.dumps(message) + "\n") async def _stream_to_stdin(self) -> None: """Stream messages to stdin for streaming mode.""" if not self._stdin_stream or not isinstance(self._prompt, AsyncIterable): return - + try: async for message in self._prompt: if not self._stdin_stream: break await self._stdin_stream.send(json.dumps(message) + "\n") - - # Signal EOF but keep the stream open for control messages - # This matches the TypeScript implementation which calls stdin.end() + + # Don't close stdin - keep it open for send_request + # Users can explicitly call disconnect() when done except Exception as e: logger.debug(f"Error streaming to stdin: {e}") if self._stdin_stream: @@ -258,7 +276,7 @@ class SubprocessCLITransport(Transport): try: data = json.loads(json_buffer) json_buffer = "" - + # Handle control responses separately if data.get("type") == "control_response": request_id = data.get("response", {}).get("request_id") @@ -266,7 +284,7 @@ class SubprocessCLITransport(Transport): # Store the response for the pending request self._pending_control_responses[request_id] = data.get("response", {}) continue - + try: yield data except GeneratorExit: @@ -334,47 +352,47 @@ class SubprocessCLITransport(Transport): def is_connected(self) -> bool: """Check if subprocess is running.""" return self._process is not None and self._process.returncode is None - + async def interrupt(self) -> None: """Send interrupt control request (only works in streaming mode).""" if not self._is_streaming: raise CLIConnectionError("Interrupt requires streaming mode (AsyncIterable prompt)") - + if not self._stdin_stream: raise CLIConnectionError("Not connected or stdin not available") - + await self._send_control_request({"subtype": "interrupt"}) - + async def _send_control_request(self, request: dict[str, Any]) -> dict[str, Any]: """Send a control request and wait for response.""" if not self._stdin_stream: raise CLIConnectionError("Stdin not available") - + # Generate unique request ID self._request_counter += 1 request_id = f"req_{self._request_counter}_{os.urandom(4).hex()}" - + # Build control request control_request = { "type": "control_request", "request_id": request_id, "request": request } - + # Send request await self._stdin_stream.send(json.dumps(control_request) + "\n") - + # Wait for response with timeout try: with anyio.fail_after(30.0): # 30 second timeout while request_id not in self._pending_control_responses: await anyio.sleep(0.1) - + response = self._pending_control_responses.pop(request_id) - + if response.get("subtype") == "error": raise CLIConnectionError(f"Control request failed: {response.get('error')}") - + return response except TimeoutError: raise CLIConnectionError("Control request timed out") from None diff --git a/src/claude_code_sdk/client.py b/src/claude_code_sdk/client.py new file mode 100644 index 0000000..a75eece --- /dev/null +++ b/src/claude_code_sdk/client.py @@ -0,0 +1,208 @@ +"""Claude SDK Client for interacting with Claude Code.""" + +import os +from collections.abc import AsyncIterable, AsyncIterator + +from ._errors import CLIConnectionError +from .types import ClaudeCodeOptions, Message, ResultMessage + + +class ClaudeSDKClient: + """ + Client for bidirectional, interactive conversations with Claude Code. + + This client provides full control over the conversation flow with support + for streaming, interrupts, and dynamic message sending. For simple one-shot + queries, consider using the query() function instead. + + Key features: + - **Bidirectional**: Send and receive messages at any time + - **Stateful**: Maintains conversation context across messages + - **Interactive**: Send follow-ups based on responses + - **Control flow**: Support for interrupts and session management + + When to use ClaudeSDKClient: + - Building chat interfaces or conversational UIs + - Interactive debugging or exploration sessions + - Multi-turn conversations with context + - When you need to react to Claude's responses + - Real-time applications with user input + - When you need interrupt capabilities + + When to use query() instead: + - Simple one-off questions + - Batch processing of prompts + - Fire-and-forget automation scripts + - When all inputs are known upfront + - Stateless operations + + Example - Interactive conversation: + ```python + # Automatically connects with empty stream for interactive use + async with ClaudeSDKClient() as client: + # Send initial message + await client.send_message("Let's solve a math problem step by step") + + # Receive and process response + async for message in client.receive_messages(): + if "ready" in str(message.content).lower(): + break + + # Send follow-up based on response + await client.send_message("What's 15% of 80?") + + # Continue conversation... + # Automatically disconnects + ``` + + Example - With interrupt: + ```python + async with ClaudeSDKClient() as client: + # Start a long task + await client.send_message("Count to 1000") + + # Interrupt after 2 seconds + await asyncio.sleep(2) + await client.interrupt() + + # Send new instruction + await client.send_message("Never mind, what's 2+2?") + ``` + + Example - Manual connection: + ```python + client = ClaudeSDKClient() + + # Connect with initial message stream + async def message_stream(): + yield {"type": "user", "message": {"role": "user", "content": "Hello"}} + + await client.connect(message_stream()) + + # Send additional messages dynamically + await client.send_message("What's the weather?") + + async for message in client.receive_messages(): + print(message) + + await client.disconnect() + ``` + """ + + def __init__(self, options: ClaudeCodeOptions | None = None): + """Initialize Claude SDK client.""" + if options is None: + options = ClaudeCodeOptions() + self.options = options + self._transport = None + os.environ["CLAUDE_CODE_ENTRYPOINT"] = "sdk-py-client" + + async def connect(self, prompt: str | AsyncIterable[dict] | None = None) -> None: + """Connect to Claude with a prompt or message stream.""" + from ._internal.transport.subprocess_cli import SubprocessCLITransport + + # Auto-connect with empty async iterable if no prompt is provided + async def _empty_stream(): + # Never yields, but indicates that this function is an iterator and + # keeps the connection open. + if False: + yield + + self._transport = SubprocessCLITransport( + prompt=_empty_stream() if prompt is None else prompt, + options=self.options, + ) + await self._transport.connect() + + async def receive_messages(self) -> AsyncIterator[Message]: + """Receive all messages from Claude.""" + if not self._transport: + raise CLIConnectionError("Not connected. Call connect() first.") + + from ._internal.message_parser import parse_message + + async for data in self._transport.receive_messages(): + message = parse_message(data) + if message: + yield message + + async def send_message(self, content: str, session_id: str = "default") -> None: + """Send a new message in streaming mode.""" + if not self._transport: + raise CLIConnectionError("Not connected. Call connect() first.") + + message = { + "type": "user", + "message": {"role": "user", "content": content}, + "parent_tool_use_id": None, + "session_id": session_id, + } + + await self._transport.send_request([message], {"session_id": session_id}) + + async def interrupt(self) -> None: + """Send interrupt signal (only works with streaming mode).""" + if not self._transport: + raise CLIConnectionError("Not connected. Call connect() first.") + await self._transport.interrupt() + + async def receive_response(self) -> tuple[list[Message], ResultMessage | None]: + """ + Receive a complete response from Claude, collecting all messages until ResultMessage. + + Compared to receive_messages(), this is a convenience method that + handles the common pattern of receiving messages until Claude completes + its response. It collects all messages and returns them along with the + final ResultMessage. + + Returns: + tuple: A tuple of (messages, result) where: + - messages: List of all messages received (UserMessage, AssistantMessage, SystemMessage) + - result: The final ResultMessage if received, None if stream ended without result + + Example: + ```python + async with ClaudeSDKClient() as client: + # First turn + await client.send_message("What's the capital of France?") + messages, result = await client.receive_response() + + # Extract assistant's response + for msg in messages: + if isinstance(msg, AssistantMessage): + for block in msg.content: + if isinstance(block, TextBlock): + print(f"Claude: {block.text}") + + # Second turn + await client.send_message("What's the population?") + messages, result = await client.receive_response() + # ... process response + ``` + """ + from .types import ResultMessage + + messages = [] + async for message in self.receive_messages(): + messages.append(message) + if isinstance(message, ResultMessage): + return messages, message + + # Stream ended without ResultMessage + return messages, None + + async def disconnect(self) -> None: + """Disconnect from Claude.""" + if self._transport: + await self._transport.disconnect() + self._transport = None + + async def __aenter__(self): + """Enter async context - automatically connects with empty stream for interactive use.""" + await self.connect() + return self + + async def __aexit__(self, exc_type, exc_val, exc_tb): + """Exit async context - always disconnects.""" + await self.disconnect() + return False diff --git a/src/claude_code_sdk/query.py b/src/claude_code_sdk/query.py new file mode 100644 index 0000000..4bd7e96 --- /dev/null +++ b/src/claude_code_sdk/query.py @@ -0,0 +1,99 @@ +"""Query function for one-shot interactions with Claude Code.""" + +import os +from collections.abc import AsyncIterable, AsyncIterator + +from ._internal.client import InternalClient +from .types import ClaudeCodeOptions, Message + + +async def query( + *, prompt: str | AsyncIterable[dict], options: ClaudeCodeOptions | None = None +) -> AsyncIterator[Message]: + """ + Query Claude Code for one-shot or unidirectional streaming interactions. + + This function is ideal for simple, stateless queries where you don't need + bidirectional communication or conversation management. For interactive, + stateful conversations, use ClaudeSDKClient instead. + + Key differences from ClaudeSDKClient: + - **Unidirectional**: Send all messages upfront, receive all responses + - **Stateless**: Each query is independent, no conversation state + - **Simple**: Fire-and-forget style, no connection management + - **No interrupts**: Cannot interrupt or send follow-up messages + + When to use query(): + - Simple one-off questions ("What is 2+2?") + - Batch processing of independent prompts + - Code generation or analysis tasks + - Automated scripts and CI/CD pipelines + - When you know all inputs upfront + + When to use ClaudeSDKClient: + - Interactive conversations with follow-ups + - Chat applications or REPL-like interfaces + - When you need to send messages based on responses + - When you need interrupt capabilities + - Long-running sessions with state + + Args: + prompt: The prompt to send to Claude. Can be a string for single-shot queries + or an AsyncIterable[dict] for streaming mode with continuous interaction. + In streaming mode, each dict should have the structure: + { + "type": "user", + "message": {"role": "user", "content": "..."}, + "parent_tool_use_id": None, + "session_id": "..." + } + options: Optional configuration (defaults to ClaudeCodeOptions() if None). + Set options.permission_mode to control tool execution: + - 'default': CLI prompts for dangerous tools + - 'acceptEdits': Auto-accept file edits + - 'bypassPermissions': Allow all tools (use with caution) + Set options.cwd for working directory. + + Yields: + Messages from the conversation + + Example - Simple query: + ```python + # One-off question + async for message in query(prompt="What is the capital of France?"): + print(message) + ``` + + Example - With options: + ```python + # Code generation with specific settings + async for message in query( + prompt="Create a Python web server", + options=ClaudeCodeOptions( + system_prompt="You are an expert Python developer", + cwd="/home/user/project" + ) + ): + print(message) + ``` + + Example - Streaming mode (still unidirectional): + ```python + async def prompts(): + yield {"type": "user", "message": {"role": "user", "content": "Hello"}} + yield {"type": "user", "message": {"role": "user", "content": "How are you?"}} + + # All prompts are sent, then all responses received + async for message in query(prompt=prompts()): + print(message) + ``` + """ + if options is None: + options = ClaudeCodeOptions() + + os.environ["CLAUDE_CODE_ENTRYPOINT"] = "sdk-py" + + client = InternalClient() + + async for message in client.process_query(prompt=prompt, options=options): + yield message