claude-code-sdk-python/tests/test_client.py
Dickson Tsai 22fa9f473e
Implement control protocol support for Python SDK (#139)
## Summary

This PR implements control protocol support in the Python SDK, aligning
it with the TypeScript implementation pattern. The refactor introduces a
Query + Transport separation to enable bidirectional communication
between the SDK and CLI.

## Motivation

The previous Python SDK implementation used a high-level abstraction in
the Transport ABC (`send_request`/`receive_messages`) that couldn't
handle bidirectional communication. This prevented support for:
- Control messages from CLI to SDK that need responses
- Hooks implementation  
- Dynamic permission mode changes
- SDK MCP servers

## Changes

### Core Architecture Refactor

1. **New Query Class** (`src/claude_code_sdk/_internal/query.py`)
   - Manages control protocol on top of Transport
   - Handles control request/response routing
   - Manages initialization handshake with timeout
   - Supports hook callbacks and tool permission callbacks
   - Implements message streaming

2. **Refactored Transport ABC**
   (`src/claude_code_sdk/_internal/transport/__init__.py`)
   - Changed from high-level (`send_request`/`receive_messages`) to
     low-level (`write`/`read_messages`) interface
   - Now handles raw I/O instead of protocol logic
   - Aligns with TypeScript ProcessTransport pattern

3. **Updated SubprocessCLITransport**
   (`src/claude_code_sdk/_internal/transport/subprocess_cli.py`)
   - Simplified to focus on raw message streaming
   - Removed protocol logic (moved to Query)
   - Improved cleanup and error handling

4. **Enhanced ClaudeSDKClient** (`src/claude_code_sdk/client.py`)
   - Now uses Query for control protocol
   - Supports initialization messages
   - Better error handling for control protocol failures
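
The Query + Transport split described above can be sketched roughly as follows. This is an illustrative sketch only, not the SDK's actual code: `SketchTransport`, `LoopbackTransport`, and `SketchQuery` are hypothetical names, and the `control_request`/`control_response` message shapes with a `request_id` field are assumptions made for the example. Only the `write`/`read_messages` method names come from the description above.

```python
import asyncio
import json
from abc import ABC, abstractmethod
from collections.abc import AsyncIterator
from typing import Any


class SketchTransport(ABC):
    """Low-level transport: raw writes in, a stream of parsed messages out."""

    @abstractmethod
    async def write(self, data: str) -> None: ...

    @abstractmethod
    def read_messages(self) -> AsyncIterator[dict[str, Any]]: ...


class LoopbackTransport(SketchTransport):
    """Hypothetical in-memory transport that answers every control request."""

    def __init__(self) -> None:
        self._inbox: asyncio.Queue = asyncio.Queue()

    async def write(self, data: str) -> None:
        msg = json.loads(data)
        if msg.get("type") == "control_request":
            await self._inbox.put(
                {"type": "control_response", "request_id": msg["request_id"], "ok": True}
            )
            # Follow up with one ordinary message, then signal end of stream.
            await self._inbox.put({"type": "assistant", "text": "hello"})
            await self._inbox.put(None)

    async def read_messages(self) -> AsyncIterator[dict[str, Any]]:
        while (msg := await self._inbox.get()) is not None:
            yield msg


class SketchQuery:
    """Hypothetical protocol layer: routes control responses, streams the rest."""

    def __init__(self, transport: SketchTransport) -> None:
        self._transport = transport
        self._pending: dict[str, asyncio.Future] = {}
        self._messages: asyncio.Queue = asyncio.Queue()
        self._counter = 0

    async def read_loop(self) -> None:
        async for msg in self._transport.read_messages():
            if msg.get("type") == "control_response":
                # Resolve the future of whoever sent the matching request.
                future = self._pending.pop(msg["request_id"], None)
                if future is not None and not future.done():
                    future.set_result(msg)
            else:
                await self._messages.put(msg)
        await self._messages.put(None)  # Sentinel: the stream has finished.

    async def send_control_request(self, subtype: str) -> dict[str, Any]:
        self._counter += 1
        request_id = f"req_{self._counter}"
        future = asyncio.get_running_loop().create_future()
        self._pending[request_id] = future
        payload = {
            "type": "control_request",
            "request_id": request_id,
            "request": {"subtype": subtype},
        }
        await self._transport.write(json.dumps(payload))
        return await future

    async def receive_messages(self) -> AsyncIterator[dict[str, Any]]:
        while (msg := await self._messages.get()) is not None:
            yield msg


async def demo() -> tuple[dict[str, Any], list[dict[str, Any]]]:
    query = SketchQuery(LoopbackTransport())
    reader = asyncio.create_task(query.read_loop())
    response = await query.send_control_request("initialize")
    messages = [msg async for msg in query.receive_messages()]
    await reader
    return response, messages
```

Running `asyncio.run(demo())` returns the control response separately from the ordinary message stream, which is the point of the split: the transport only moves raw messages, while the query layer decides which ones are protocol traffic.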

### Control Protocol Features

- **Initialization handshake**: the SDK sends an `initialize` request
  and the CLI responds with its supported commands
- **Control message types**:
  - `initialize`: Establish bidirectional connection
  - `interrupt`: Cancel ongoing operations
  - `set_permission_mode`: Change permission mode dynamically
- **Timeout handling**: 60-second timeout for initialization to handle
  CLI versions without control support
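
A minimal sketch of the timeout handling, assuming the handshake is an awaitable that either resolves with the CLI's response or never completes. The function name `initialize_or_fallback`, the two fake CLIs, the response shape, and the choice to return `None` on timeout are all hypothetical and chosen for illustration; only the 60-second figure comes from the description above.

```python
import asyncio
from collections.abc import Awaitable, Callable
from typing import Any, Optional

INITIALIZE_TIMEOUT_SECONDS = 60.0  # Figure taken from the description above.


async def initialize_or_fallback(
    send_initialize: Callable[[], Awaitable[dict[str, Any]]],
    timeout: float = INITIALIZE_TIMEOUT_SECONDS,
) -> Optional[dict[str, Any]]:
    """Return the initialize response, or None if the CLI never answers."""
    try:
        return await asyncio.wait_for(send_initialize(), timeout=timeout)
    except asyncio.TimeoutError:
        # Assumption: a silent CLI is treated as lacking control support.
        return None


async def responsive_cli() -> dict[str, Any]:
    """Hypothetical CLI that answers at once with its supported commands."""
    return {"subtype": "success", "commands": ["interrupt", "set_permission_mode"]}


async def silent_cli() -> dict[str, Any]:
    """Hypothetical older CLI that never responds to initialize."""
    await asyncio.sleep(3600)
    return {}


async def demo() -> tuple[Optional[dict[str, Any]], Optional[dict[str, Any]]]:
    ok = await initialize_or_fallback(responsive_cli)
    # A short timeout keeps the demonstration fast; the default is 60 seconds.
    fallback = await initialize_or_fallback(silent_cli, timeout=0.05)
    return ok, fallback
```

With this shape, callers can branch on `None` to continue without control features instead of failing outright.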

### Examples

Updated `examples/streaming_mode.py` to demonstrate control protocol
initialization and error handling.

## Testing

- Tested with the current CLI (no control protocol support yet); the
  SDK falls back gracefully
- Verified backward compatibility with existing `query()` function
- Tested initialization timeout handling
- Verified proper cleanup on errors

## Design Alignment

This implementation closely follows the TypeScript reference:
- `src/core/Query.ts` → `src/claude_code_sdk/_internal/query.py`
- `src/transport/ProcessTransport.ts` →
  `src/claude_code_sdk/_internal/transport/subprocess_cli.py`
- `src/entrypoints/sdk.ts` → `src/claude_code_sdk/client.py`

## Next Steps

Once the CLI implements the control protocol handler, this will enable:
- Hooks support
- Dynamic permission mode changes
- SDK MCP servers
- Improved error recovery

🤖 Generated with [Claude Code](https://claude.ai/code)

---------

Co-authored-by: Ashwin Bhat <ashwin@anthropic.com>
Co-authored-by: Claude <noreply@anthropic.com>
Co-authored-by: Kashyap Murali <kashyap@anthropic.com>
2025-09-01 23:04:22 -07:00

123 lines
4.4 KiB
Python

"""Tests for Claude SDK client functionality."""

from unittest.mock import AsyncMock, Mock, patch

import anyio

from claude_code_sdk import AssistantMessage, ClaudeCodeOptions, query
from claude_code_sdk.types import TextBlock


class TestQueryFunction:
    """Test the main query function."""

    def test_query_single_prompt(self):
        """Test query with a single prompt."""

        async def _test():
            with patch(
                "claude_code_sdk._internal.client.InternalClient.process_query"
            ) as mock_process:
                # Mock the async generator
                async def mock_generator():
                    yield AssistantMessage(
                        content=[TextBlock(text="4")], model="claude-opus-4-1-20250805"
                    )

                mock_process.return_value = mock_generator()

                messages = []
                async for msg in query(prompt="What is 2+2?"):
                    messages.append(msg)

                assert len(messages) == 1
                assert isinstance(messages[0], AssistantMessage)
                assert messages[0].content[0].text == "4"

        anyio.run(_test)

    def test_query_with_options(self):
        """Test query with various options."""

        async def _test():
            with patch(
                "claude_code_sdk._internal.client.InternalClient.process_query"
            ) as mock_process:

                async def mock_generator():
                    yield AssistantMessage(
                        content=[TextBlock(text="Hello!")],
                        model="claude-opus-4-1-20250805",
                    )

                mock_process.return_value = mock_generator()

                options = ClaudeCodeOptions(
                    allowed_tools=["Read", "Write"],
                    system_prompt="You are helpful",
                    permission_mode="acceptEdits",
                    max_turns=5,
                )

                messages = []
                async for msg in query(prompt="Hi", options=options):
                    messages.append(msg)

                # Verify process_query was called with correct prompt and options
                mock_process.assert_called_once()
                call_args = mock_process.call_args
                assert call_args[1]["prompt"] == "Hi"
                assert call_args[1]["options"] == options

        anyio.run(_test)

    def test_query_with_cwd(self):
        """Test query with custom working directory."""

        async def _test():
            with patch(
                "claude_code_sdk._internal.client.SubprocessCLITransport"
            ) as mock_transport_class:
                mock_transport = AsyncMock()
                mock_transport_class.return_value = mock_transport

                # Mock the message stream
                async def mock_receive():
                    yield {
                        "type": "assistant",
                        "message": {
                            "role": "assistant",
                            "content": [{"type": "text", "text": "Done"}],
                            "model": "claude-opus-4-1-20250805",
                        },
                    }
                    yield {
                        "type": "result",
                        "subtype": "success",
                        "duration_ms": 1000,
                        "duration_api_ms": 800,
                        "is_error": False,
                        "num_turns": 1,
                        "session_id": "test-session",
                        "total_cost_usd": 0.001,
                    }

                mock_transport.read_messages = mock_receive
                mock_transport.connect = AsyncMock()
                mock_transport.close = AsyncMock()
                mock_transport.end_input = AsyncMock()
                mock_transport.write = AsyncMock()
                mock_transport.is_ready = Mock(return_value=True)

                options = ClaudeCodeOptions(cwd="/custom/path")
                messages = []
                async for msg in query(prompt="test", options=options):
                    messages.append(msg)

                # Verify transport was created with correct parameters
                mock_transport_class.assert_called_once()
                call_kwargs = mock_transport_class.call_args.kwargs
                assert call_kwargs["prompt"] == "test"
                assert call_kwargs["options"].cwd == "/custom/path"

        anyio.run(_test)