Improve examples

This commit is contained in:
Dickson Tsai 2025-07-19 19:57:17 -07:00
parent 3b56577b2f
commit b57e05afa5
No known key found for this signature in database
4 changed files with 236 additions and 90 deletions

226
examples/streaming_mode.py Normal file → Executable file
View file

@ -4,10 +4,20 @@ Comprehensive examples of using ClaudeSDKClient for streaming mode.
This file demonstrates various patterns for building applications with
the ClaudeSDKClient streaming interface.
The queries are intentionally simplistic. In reality, a query can be a more
complex task that Claude SDK uses its agentic capabilities and tools (e.g. run
bash commands, edit files, search the web, fetch web content) to accomplish.
Usage:
./examples/streaming_mode.py - List the examples
./examples/streaming_mode.py all - Run all examples
./examples/streaming_mode.py basic_streaming - Run a specific example
"""
import asyncio
import contextlib
import sys
from claude_code_sdk import (
AssistantMessage,
@ -15,28 +25,48 @@ from claude_code_sdk import (
ClaudeSDKClient,
CLIConnectionError,
ResultMessage,
SystemMessage,
TextBlock,
UserMessage,
)
def display_message(msg):
"""Standardized message display function.
- UserMessage: "User: <content>"
- AssistantMessage: "Claude: <content>"
- SystemMessage: ignored
- ResultMessage: "Result ended" + cost if available
"""
if isinstance(msg, UserMessage):
for block in msg.content:
if isinstance(block, TextBlock):
print(f"User: {block.text}")
elif isinstance(msg, AssistantMessage):
for block in msg.content:
if isinstance(block, TextBlock):
print(f"Claude: {block.text}")
elif isinstance(msg, SystemMessage):
# Ignore system messages
pass
elif isinstance(msg, ResultMessage):
print("Result ended")
async def example_basic_streaming():
"""Basic streaming with context manager."""
print("=== Basic Streaming Example ===")
async with ClaudeSDKClient() as client:
# Send a message
await client.send_message("What is 2+2?")
print("User: What is 2+2?")
await client.query("What is 2+2?")
# Receive complete response using the helper method
async for msg in client.receive_response():
if isinstance(msg, AssistantMessage):
for block in msg.content:
if isinstance(block, TextBlock):
print(f"Claude: {block.text}")
elif isinstance(msg, ResultMessage) and msg.total_cost_usd:
print(f"Cost: ${msg.total_cost_usd:.4f}")
display_message(msg)
print("Session ended\n")
print("\n")
async def example_multi_turn_conversation():
@ -46,26 +76,20 @@ async def example_multi_turn_conversation():
async with ClaudeSDKClient() as client:
# First turn
print("User: What's the capital of France?")
await client.send_message("What's the capital of France?")
await client.query("What's the capital of France?")
# Extract and print response
async for msg in client.receive_response():
content_blocks = getattr(msg, 'content', [])
for block in content_blocks:
if isinstance(block, TextBlock):
print(f"{block.text}")
display_message(msg)
# Second turn - follow-up
print("\nUser: What's the population of that city?")
await client.send_message("What's the population of that city?")
await client.query("What's the population of that city?")
async for msg in client.receive_response():
content_blocks = getattr(msg, 'content', [])
for block in content_blocks:
if isinstance(block, TextBlock):
print(f"{block.text}")
display_message(msg)
print("\nConversation ended\n")
print("\n")
async def example_concurrent_responses():
@ -76,10 +100,7 @@ async def example_concurrent_responses():
# Background task to continuously receive messages
async def receive_messages():
async for message in client.receive_messages():
if isinstance(message, AssistantMessage):
for block in message.content:
if isinstance(block, TextBlock):
print(f"Claude: {block.text}")
display_message(message)
# Start receiving in background
receive_task = asyncio.create_task(receive_messages())
@ -93,7 +114,7 @@ async def example_concurrent_responses():
for question in questions:
print(f"\nUser: {question}")
await client.send_message(question)
await client.query(question)
await asyncio.sleep(3) # Wait between messages
# Give time for final responses
@ -104,7 +125,7 @@ async def example_concurrent_responses():
with contextlib.suppress(asyncio.CancelledError):
await receive_task
print("\nSession ended\n")
print("\n")
async def example_with_interrupt():
@ -115,7 +136,7 @@ async def example_with_interrupt():
async with ClaudeSDKClient() as client:
# Start a long-running task
print("\nUser: Count from 1 to 100 slowly")
await client.send_message(
await client.query(
"Count from 1 to 100 slowly, with a brief pause between each number"
)
@ -132,10 +153,10 @@ async def example_with_interrupt():
if isinstance(block, TextBlock):
# Print first few numbers
print(f"Claude: {block.text[:50]}...")
# Stop when we get a result after interrupt
if isinstance(message, ResultMessage) and interrupt_sent:
break
elif isinstance(message, ResultMessage):
display_message(message)
if interrupt_sent:
break
# Start consuming messages in the background
consume_task = asyncio.create_task(consume_messages())
@ -151,16 +172,13 @@ async def example_with_interrupt():
# Send new instruction after interrupt
print("\nUser: Never mind, just tell me a quick joke")
await client.send_message("Never mind, just tell me a quick joke")
await client.query("Never mind, just tell me a quick joke")
# Get the joke
async for msg in client.receive_response():
if isinstance(msg, AssistantMessage):
for block in msg.content:
if isinstance(block, TextBlock):
print(f"Claude: {block.text}")
display_message(msg)
print("\nSession ended\n")
print("\n")
async def example_manual_message_handling():
@ -168,7 +186,7 @@ async def example_manual_message_handling():
print("=== Manual Message Handling Example ===")
async with ClaudeSDKClient() as client:
await client.send_message(
await client.query(
"List 5 programming languages and their main use cases"
)
@ -180,6 +198,7 @@ async def example_manual_message_handling():
for block in message.content:
if isinstance(block, TextBlock):
text = block.text
print(f"Claude: {text}")
# Custom logic: extract language names
for lang in [
"Python",
@ -193,12 +212,12 @@ async def example_manual_message_handling():
if lang in text and lang not in languages_found:
languages_found.append(lang)
print(f"Found language: {lang}")
elif isinstance(message, ResultMessage):
print(f"\nTotal languages mentioned: {len(languages_found)}")
display_message(message)
print(f"Total languages mentioned: {len(languages_found)}")
break
print("\nSession ended\n")
print("\n")
async def example_with_options():
@ -213,23 +232,75 @@ async def example_with_options():
)
async with ClaudeSDKClient(options=options) as client:
await client.send_message(
print("User: Create a simple hello.txt file with a greeting message")
await client.query(
"Create a simple hello.txt file with a greeting message"
)
tool_uses = []
async for msg in client.receive_response():
if isinstance(msg, AssistantMessage):
display_message(msg)
for block in msg.content:
if isinstance(block, TextBlock):
print(f"Claude: {block.text}")
elif hasattr(block, "name"): # ToolUseBlock
if hasattr(block, "name") and not isinstance(
block, TextBlock
): # ToolUseBlock
tool_uses.append(getattr(block, "name", ""))
else:
display_message(msg)
if tool_uses:
print(f"\nTools used: {', '.join(tool_uses)}")
print(f"Tools used: {', '.join(tool_uses)}")
print("\nSession ended\n")
print("\n")
async def example_async_iterable_prompt():
"""Demonstrate send_message with async iterable."""
print("=== Async Iterable Prompt Example ===")
async def create_message_stream():
"""Generate a stream of messages."""
print("User: Hello! I have multiple questions.")
yield {
"type": "user",
"message": {"role": "user", "content": "Hello! I have multiple questions."},
"parent_tool_use_id": None,
"session_id": "qa-session",
}
print("User: First, what's the capital of Japan?")
yield {
"type": "user",
"message": {
"role": "user",
"content": "First, what's the capital of Japan?",
},
"parent_tool_use_id": None,
"session_id": "qa-session",
}
print("User: Second, what's 15% of 200?")
yield {
"type": "user",
"message": {"role": "user", "content": "Second, what's 15% of 200?"},
"parent_tool_use_id": None,
"session_id": "qa-session",
}
async with ClaudeSDKClient() as client:
# Send async iterable of messages
await client.query(create_message_stream())
# Receive the three responses
async for msg in client.receive_response():
display_message(msg)
async for msg in client.receive_response():
display_message(msg)
async for msg in client.receive_response():
display_message(msg)
print("\n")
async def example_error_handling():
@ -242,7 +313,8 @@ async def example_error_handling():
await client.connect()
# Send a message that will take time to process
await client.send_message("Run a bash sleep command for 60 seconds")
print("User: Run a bash sleep command for 60 seconds")
await client.query("Run a bash sleep command for 60 seconds")
# Try to receive response with a short timeout
try:
@ -255,11 +327,13 @@ async def example_error_handling():
if isinstance(block, TextBlock):
print(f"Claude: {block.text[:50]}...")
elif isinstance(msg, ResultMessage):
print("Received complete response")
display_message(msg)
break
except asyncio.TimeoutError:
print("\nResponse timeout after 10 seconds - demonstrating graceful handling")
print(
"\nResponse timeout after 10 seconds - demonstrating graceful handling"
)
print(f"Received {len(messages)} messages before timeout")
except CLIConnectionError as e:
@ -272,24 +346,48 @@ async def example_error_handling():
# Always disconnect
await client.disconnect()
print("\nSession ended\n")
print("\n")
async def main():
"""Run all examples."""
examples = [
example_basic_streaming,
example_multi_turn_conversation,
example_concurrent_responses,
example_with_interrupt,
example_manual_message_handling,
example_with_options,
example_error_handling,
]
"""Run all examples or a specific example based on command line argument."""
examples = {
"basic_streaming": example_basic_streaming,
"multi_turn_conversation": example_multi_turn_conversation,
"concurrent_responses": example_concurrent_responses,
"with_interrupt": example_with_interrupt,
"manual_message_handling": example_manual_message_handling,
"with_options": example_with_options,
"async_iterable_prompt": example_async_iterable_prompt,
"error_handling": example_error_handling,
}
for example in examples:
await example()
print("-" * 50 + "\n")
if len(sys.argv) < 2:
# List available examples
print("Usage: python streaming_mode.py <example_name>")
print("\nAvailable examples:")
print(" all - Run all examples")
for name in examples:
print(f" {name}")
sys.exit(0)
example_name = sys.argv[1]
if example_name == "all":
# Run all examples
for example in examples.values():
await example()
print("-" * 50 + "\n")
elif example_name in examples:
# Run specific example
await examples[example_name]()
else:
print(f"Error: Unknown example '{example_name}'")
print("\nAvailable examples:")
print(" all - Run all examples")
for name in examples:
print(f" {name}")
sys.exit(1)
if __name__ == "__main__":

View file

@ -4,6 +4,10 @@ IPython-friendly code snippets for ClaudeSDKClient streaming mode.
These examples are designed to be copy-pasted directly into IPython.
Each example is self-contained and can be run independently.
The queries are intentionally simplistic. In reality, a query can be a more
complex task that Claude SDK uses its agentic capabilities and tools (e.g. run
bash commands, edit files, search the web, fetch web content) to accomplish.
"""
# ============================================================================
@ -13,15 +17,14 @@ Each example is self-contained and can be run independently.
from claude_code_sdk import ClaudeSDKClient, AssistantMessage, TextBlock, ResultMessage
async with ClaudeSDKClient() as client:
await client.send_message("What is 2+2?")
print("User: What is 2+2?")
await client.query("What is 2+2?")
async for msg in client.receive_response():
if isinstance(msg, AssistantMessage):
for block in msg.content:
if isinstance(block, TextBlock):
print(f"Claude: {block.text}")
elif isinstance(msg, ResultMessage) and msg.total_cost_usd:
print(f"Cost: ${msg.total_cost_usd:.4f}")
# ============================================================================
@ -33,7 +36,8 @@ from claude_code_sdk import ClaudeSDKClient, AssistantMessage, TextBlock
async with ClaudeSDKClient() as client:
async def send_and_receive(prompt):
await client.send_message(prompt)
print(f"User: {prompt}")
await client.query(prompt)
async for msg in client.receive_response():
if isinstance(msg, AssistantMessage):
for block in msg.content:
@ -66,10 +70,12 @@ async def get_response():
# Use it multiple times
await client.send_message("What's 2+2?")
print("User: What's 2+2?")
await client.query("What's 2+2?")
await get_response()
await client.send_message("What's 10*10?")
print("User: What's 10*10?")
await client.query("What's 10*10?")
await get_response()
# Don't forget to disconnect when done
@ -89,7 +95,8 @@ async with ClaudeSDKClient() as client:
print("\n--- Sending initial message ---\n")
# Send a long-running task
await client.send_message("Count from 1 to 100 slowly using bash sleep")
print("User: Count from 1 to 100, run bash sleep for 1 second in between")
await client.query("Count from 1 to 100, run bash sleep for 1 second in between")
# Create a background task to consume messages
messages_received = []
@ -121,7 +128,7 @@ async with ClaudeSDKClient() as client:
# Send a new message after interrupt
print("\n--- After interrupt, sending new message ---\n")
await client.send_message("Just say 'Hello! I was interrupted.'")
await client.query("Just say 'Hello! I was interrupted.'")
async for msg in client.receive_response():
if isinstance(msg, AssistantMessage):
@ -138,7 +145,8 @@ from claude_code_sdk import ClaudeSDKClient, AssistantMessage, TextBlock
try:
async with ClaudeSDKClient() as client:
await client.send_message("Run a bash sleep command for 60 seconds")
print("User: Run a bash sleep command for 60 seconds")
await client.query("Run a bash sleep command for 60 seconds")
# Timeout after 20 seconds
messages = []
@ -156,6 +164,47 @@ except Exception as e:
print(f"Error: {e}")
# ============================================================================
# SENDING ASYNC ITERABLE OF MESSAGES
# ============================================================================
from claude_code_sdk import ClaudeSDKClient, AssistantMessage, TextBlock
async def message_generator():
"""Generate multiple messages as an async iterable."""
print("User: I have two math questions.")
yield {
"type": "user",
"message": {"role": "user", "content": "I have two math questions."},
"parent_tool_use_id": None,
"session_id": "math-session"
}
print("User: What is 25 * 4?")
yield {
"type": "user",
"message": {"role": "user", "content": "What is 25 * 4?"},
"parent_tool_use_id": None,
"session_id": "math-session"
}
print("User: What is 100 / 5?")
yield {
"type": "user",
"message": {"role": "user", "content": "What is 100 / 5?"},
"parent_tool_use_id": None,
"session_id": "math-session"
}
async with ClaudeSDKClient() as client:
# Send async iterable instead of string
await client.query(message_generator())
async for msg in client.receive_response():
if isinstance(msg, AssistantMessage):
for block in msg.content:
if isinstance(block, TextBlock):
print(f"Claude: {block.text}")
# ============================================================================
# COLLECTING ALL MESSAGES INTO A LIST
# ============================================================================
@ -163,7 +212,8 @@ except Exception as e:
from claude_code_sdk import ClaudeSDKClient, AssistantMessage, TextBlock, ResultMessage
async with ClaudeSDKClient() as client:
await client.send_message("What are the primary colors?")
print("User: What are the primary colors?")
await client.query("What are the primary colors?")
# Collect all messages into a list
messages = [msg async for msg in client.receive_response()]
@ -176,5 +226,3 @@ async with ClaudeSDKClient() as client:
print(f"Claude: {block.text}")
elif isinstance(msg, ResultMessage):
print(f"Total messages: {len(messages)}")
if msg.total_cost_usd:
print(f"Cost: ${msg.total_cost_usd:.4f}")

View file

@ -42,7 +42,7 @@ class ClaudeSDKClient:
# Automatically connects with empty stream for interactive use
async with ClaudeSDKClient() as client:
# Send initial message
await client.send_message("Let's solve a math problem step by step")
await client.query("Let's solve a math problem step by step")
# Receive and process response
async for message in client.receive_messages():
@ -50,7 +50,7 @@ class ClaudeSDKClient:
break
# Send follow-up based on response
await client.send_message("What's 15% of 80?")
await client.query("What's 15% of 80?")
# Continue conversation...
# Automatically disconnects
@ -60,14 +60,14 @@ class ClaudeSDKClient:
```python
async with ClaudeSDKClient() as client:
# Start a long task
await client.send_message("Count to 1000")
await client.query("Count to 1000")
# Interrupt after 2 seconds
await asyncio.sleep(2)
await client.interrupt()
# Send new instruction
await client.send_message("Never mind, what's 2+2?")
await client.query("Never mind, what's 2+2?")
```
Example - Manual connection:
@ -81,7 +81,7 @@ class ClaudeSDKClient:
await client.connect(message_stream())
# Send additional messages dynamically
await client.send_message("What's the weather?")
await client.query("What's the weather?")
async for message in client.receive_messages():
print(message)
@ -128,9 +128,9 @@ class ClaudeSDKClient:
if message:
yield message
async def send_message(self, prompt: str | AsyncIterable[dict[str, Any]], session_id: str = "default") -> None:
async def query(self, prompt: str | AsyncIterable[dict[str, Any]], session_id: str = "default") -> None:
"""
Send a new message in streaming mode.
Send a new request in streaming mode.
Args:
prompt: Either a string message or an async iterable of message dictionaries
@ -186,7 +186,7 @@ class ClaudeSDKClient:
Example:
```python
async with ClaudeSDKClient() as client:
await client.send_message("What's the capital of France?")
await client.query("What's the capital of France?")
async for msg in client.receive_response():
if isinstance(msg, AssistantMessage):

View file

@ -116,8 +116,8 @@ class TestClaudeSDKClientStreaming:
anyio.run(_test)
def test_send_message(self):
"""Test sending a message."""
def test_query(self):
"""Test sending a query."""
async def _test():
with patch(
@ -127,7 +127,7 @@ class TestClaudeSDKClientStreaming:
mock_transport_class.return_value = mock_transport
async with ClaudeSDKClient() as client:
await client.send_message("Test message")
await client.query("Test message")
# Verify send_request was called with correct format
mock_transport.send_request.assert_called_once()
@ -151,7 +151,7 @@ class TestClaudeSDKClientStreaming:
mock_transport_class.return_value = mock_transport
async with ClaudeSDKClient() as client:
await client.send_message("Test", session_id="custom-session")
await client.query("Test", session_id="custom-session")
call_args = mock_transport.send_request.call_args
messages, options = call_args[0]
@ -166,7 +166,7 @@ class TestClaudeSDKClientStreaming:
async def _test():
client = ClaudeSDKClient()
with pytest.raises(CLIConnectionError, match="Not connected"):
await client.send_message("Test")
await client.query("Test")
anyio.run(_test)
@ -360,7 +360,7 @@ class TestClaudeSDKClientStreaming:
receive_task = asyncio.create_task(get_next_message())
# Send message while receiving
await client.send_message("Question 1")
await client.query("Question 1")
# Wait for first message
first_msg = await receive_task