Bubble up initialization message

This commit is contained in:
Dickson Tsai 2025-08-29 19:31:47 -07:00
parent fb334b6aef
commit d40261c8d5
No known key found for this signature in database
3 changed files with 107 additions and 0 deletions

View file

@ -340,6 +340,85 @@ async def example_bash_command():
print("\n")
async def example_control_protocol():
"""Demonstrate server info and interrupt capabilities."""
print("=== Control Protocol Example ===")
print("Shows server info retrieval and interrupt capability\n")
async with ClaudeSDKClient() as client:
# 1. Get server initialization info
print("1. Getting server info...")
server_info = await client.get_server_info()
if server_info:
print("✓ Server info retrieved successfully!")
print(f" - Available commands: {len(server_info.get('commands', []))}")
print(f" - Output style: {server_info.get('output_style', 'unknown')}")
# Show available output styles if present
styles = server_info.get('available_output_styles', [])
if styles:
print(f" - Available output styles: {', '.join(styles)}")
# Show a few example commands
commands = server_info.get('commands', [])[:5]
if commands:
print(" - Example commands:")
for cmd in commands:
if isinstance(cmd, dict):
print(f"{cmd.get('name', 'unknown')}")
else:
print("✗ No server info available (may not be in streaming mode)")
print("\n2. Testing interrupt capability...")
# Start a long-running task
print("User: Count from 1 to 20 slowly")
await client.query("Count from 1 to 20 slowly, pausing between each number")
# Start consuming messages in background to enable interrupt
messages = []
async def consume():
async for msg in client.receive_response():
messages.append(msg)
if isinstance(msg, AssistantMessage):
for block in msg.content:
if isinstance(block, TextBlock):
# Print first 50 chars to show progress
print(f"Claude: {block.text[:50]}...")
break
if isinstance(msg, ResultMessage):
break
consume_task = asyncio.create_task(consume())
# Wait a moment then interrupt
await asyncio.sleep(2)
print("\n[Sending interrupt after 2 seconds...]")
try:
await client.interrupt()
print("✓ Interrupt sent successfully")
except Exception as e:
print(f"✗ Interrupt failed: {e}")
# Wait for task to complete
with contextlib.suppress(asyncio.CancelledError):
await consume_task
# Send new query after interrupt
print("\nUser: Just say 'Hello!'")
await client.query("Just say 'Hello!'")
async for msg in client.receive_response():
if isinstance(msg, AssistantMessage):
for block in msg.content:
if isinstance(block, TextBlock):
print(f"Claude: {block.text}")
print("\n")
async def example_error_handling():
"""Demonstrate proper error handling."""
print("=== Error Handling Example ===")
@ -397,6 +476,7 @@ async def main():
"with_options": example_with_options,
"async_iterable_prompt": example_async_iterable_prompt,
"bash_command": example_bash_command,
"control_protocol": example_control_protocol,
"error_handling": example_error_handling,
}

View file

@ -54,6 +54,7 @@ class Query:
self._read_task: asyncio.Task[None] | None = None
self._initialized = False
self._closed = False
self._initialization_result: dict[str, Any] | None = None
async def initialize(self) -> dict[str, Any] | None:
"""Initialize control protocol if in streaming mode.
@ -90,6 +91,7 @@ class Query:
response = await self._send_control_request(request)
self._initialized = True
self._initialization_result = response # Store for later access
return response
async def start(self) -> None:

View file

@ -187,6 +187,31 @@ class ClaudeSDKClient:
if not self._query:
raise CLIConnectionError("Not connected. Call connect() first.")
await self._query.interrupt()
async def get_server_info(self) -> dict[str, Any] | None:
"""Get server initialization info including available commands and output styles.
Returns initialization information from the Claude Code server including:
- Available commands (slash commands, system commands, etc.)
- Current and available output styles
- Server capabilities
Returns:
Dictionary with server info, or None if not in streaming mode
Example:
```python
async with ClaudeSDKClient() as client:
info = await client.get_server_info()
if info:
print(f"Commands available: {len(info.get('commands', []))}")
print(f"Output style: {info.get('output_style', 'default')}")
```
"""
if not self._query:
raise CLIConnectionError("Not connected. Call connect() first.")
# Return the initialization result that was already obtained during connect
return getattr(self._query, '_initialization_result', None)
async def receive_response(self) -> AsyncIterator[Message]:
"""