claude-code-sdk-python/tests/test_integration.py
Dickson Tsai 22fa9f473e
Implement control protocol support for Python SDK (#139)
## Summary

This PR implements control protocol support in the Python SDK, aligning
it with the TypeScript implementation pattern. The refactor introduces a
Query + Transport separation to enable bidirectional communication
between the SDK and CLI.

## Motivation

The previous Python SDK implementation used a high-level abstraction in
the Transport ABC (`send_request`/`receive_messages`) that couldn't
handle bidirectional communication. This prevented support for:
- Control messages from CLI to SDK that need responses
- Hooks implementation  
- Dynamic permission mode changes
- SDK MCP servers

## Changes

### Core Architecture Refactor

1. **New Query Class** (`src/claude_code_sdk/_internal/query.py`)
   - Manages control protocol on top of Transport
   - Handles control request/response routing
   - Manages initialization handshake with timeout
   - Supports hook callbacks and tool permission callbacks
   - Implements message streaming

2. **Refactored Transport ABC**
   (`src/claude_code_sdk/_internal/transport/__init__.py`)
   - Changed from high-level (`send_request`/`receive_messages`) to
     low-level (`write`/`read_messages`) interface
   - Now handles raw I/O instead of protocol logic
   - Aligns with TypeScript ProcessTransport pattern

3. **Updated SubprocessCLITransport**
   (`src/claude_code_sdk/_internal/transport/subprocess_cli.py`)
   - Simplified to focus on raw message streaming
   - Removed protocol logic (moved to Query)
   - Improved cleanup and error handling

4. **Enhanced ClaudeSDKClient** (`src/claude_code_sdk/client.py`)
   - Now uses Query for control protocol
   - Supports initialization messages
   - Better error handling for control protocol failures
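
The layering described above can be sketched roughly as follows. This is a minimal illustration, not the SDK's actual code: `InMemoryTransport`, the `control_responses` dictionary, and the exact message shapes (`control_request`, `control_response`, `request_id`) are assumptions made for the example, and it uses `asyncio` only to stay self-contained.

```python
import asyncio
import json
from abc import ABC, abstractmethod
from collections.abc import AsyncIterator
from typing import Any


class Transport(ABC):
    """Raw I/O only: no knowledge of the control protocol."""

    @abstractmethod
    async def write(self, data: str) -> None: ...

    @abstractmethod
    def read_messages(self) -> AsyncIterator[dict[str, Any]]: ...


class InMemoryTransport(Transport):
    """Hypothetical test double that replays canned messages."""

    def __init__(self, incoming: list[dict[str, Any]]) -> None:
        self.incoming = incoming
        self.written: list[str] = []

    async def write(self, data: str) -> None:
        self.written.append(data)

    async def read_messages(self) -> AsyncIterator[dict[str, Any]]:
        for message in self.incoming:
            yield message


class Query:
    """Protocol layer: separates control traffic from ordinary messages."""

    def __init__(self, transport: Transport) -> None:
        self.transport = transport
        # Assumed bookkeeping: control responses keyed by request id.
        self.control_responses: dict[str, dict[str, Any]] = {}

    async def send_control_request(self, request_id: str, subtype: str) -> None:
        # Assumed wire format: one JSON object per line.
        payload = {
            "type": "control_request",
            "request_id": request_id,
            "request": {"subtype": subtype},
        }
        await self.transport.write(json.dumps(payload) + "\n")

    async def receive_messages(self) -> AsyncIterator[dict[str, Any]]:
        async for message in self.transport.read_messages():
            if message.get("type") == "control_response":
                # Route control responses away from the caller's stream.
                self.control_responses[message["request_id"]] = message
                continue
            yield message


async def demo() -> tuple[list[dict[str, Any]], dict[str, Any], list[str]]:
    transport = InMemoryTransport(
        [
            {"type": "control_response", "request_id": "req-1", "subtype": "success"},
            {"type": "assistant", "text": "hello"},
        ]
    )
    query = Query(transport)
    await query.send_control_request("req-1", "interrupt")
    messages = [m async for m in query.receive_messages()]
    return messages, query.control_responses, transport.written
```

Because the transport only moves raw messages, a test can swap in a double such as `InMemoryTransport` without touching any protocol logic.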

### Control Protocol Features

- **Initialization handshake**: SDK sends initialize request, CLI
responds with supported commands
- **Control message types**: 
  - `initialize`: Establish bidirectional connection
  - `interrupt`: Cancel ongoing operations  
  - `set_permission_mode`: Change permission mode dynamically
- **Timeout handling**: 60-second timeout for initialization to handle
CLI versions without control support
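
A rough sketch of the timeout-guarded handshake, under stated assumptions: the helper name `initialize`, the shape of the response dictionary, and the choice to return `None` when the CLI stays silent are all hypothetical. Only the 60-second figure comes from the description above. The sketch uses `asyncio.wait_for` to stay self-contained; it makes no claim about how the SDK implements the timeout.

```python
import asyncio
from collections.abc import Awaitable, Callable
from typing import Any, Optional

INITIALIZE_TIMEOUT_SECONDS = 60.0  # value stated in the PR description


async def initialize(
    wait_for_response: Callable[[], Awaitable[dict[str, Any]]],
    timeout: float = INITIALIZE_TIMEOUT_SECONDS,
) -> Optional[dict[str, Any]]:
    """Wait for the CLI's initialize response, or give up after `timeout`.

    Returning None is this sketch's (assumed) way of signalling that the
    CLI does not speak the control protocol.
    """
    try:
        return await asyncio.wait_for(wait_for_response(), timeout=timeout)
    except asyncio.TimeoutError:
        return None


async def responsive_cli() -> dict[str, Any]:
    # Hypothetical response listing the supported commands.
    return {"subtype": "success", "commands": ["interrupt", "set_permission_mode"]}


async def silent_cli() -> dict[str, Any]:
    # Simulates a CLI version that never answers the initialize request.
    await asyncio.sleep(3600)
    return {}
```

Calling `asyncio.run(initialize(silent_cli, timeout=0.05))` returns `None` after the short timeout, which a caller could treat as the signal to continue without control features.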

### Examples

Updated `examples/streaming_mode.py` to demonstrate control protocol
initialization and error handling.

## Testing

- Tested with the current CLI, which has no control protocol support yet;
  the SDK falls back gracefully
- Verified backward compatibility with existing `query()` function
- Tested initialization timeout handling
- Verified proper cleanup on errors

## Design Alignment

This implementation closely follows the TypeScript reference:
- `src/core/Query.ts` → `src/claude_code_sdk/_internal/query.py`
- `src/transport/ProcessTransport.ts` →
`src/claude_code_sdk/_internal/transport/subprocess_cli.py`
- `src/entrypoints/sdk.ts` → `src/claude_code_sdk/client.py`

## Next Steps

Once the CLI implements the control protocol handler, this will enable:
- Hooks support
- Dynamic permission mode changes
- SDK MCP servers
- Improved error recovery

🤖 Generated with [Claude Code](https://claude.ai/code)

---------

Co-authored-by: Ashwin Bhat <ashwin@anthropic.com>
Co-authored-by: Claude <noreply@anthropic.com>
Co-authored-by: Kashyap Murali <kashyap@anthropic.com>
2025-09-01 23:04:22 -07:00


"""Integration tests for Claude SDK.

These tests verify end-to-end functionality with mocked CLI responses.
"""

from unittest.mock import AsyncMock, Mock, patch

import anyio
import pytest

from claude_code_sdk import (
    AssistantMessage,
    ClaudeCodeOptions,
    CLINotFoundError,
    ResultMessage,
    query,
)
from claude_code_sdk.types import ToolUseBlock


class TestIntegration:
    """End-to-end integration tests."""

    def test_simple_query_response(self):
        """Test a simple query with text response."""

        async def _test():
            with patch(
                "claude_code_sdk._internal.client.SubprocessCLITransport"
            ) as mock_transport_class:
                mock_transport = AsyncMock()
                mock_transport_class.return_value = mock_transport

                # Mock the message stream
                async def mock_receive():
                    yield {
                        "type": "assistant",
                        "message": {
                            "role": "assistant",
                            "content": [{"type": "text", "text": "2 + 2 equals 4"}],
                            "model": "claude-opus-4-1-20250805",
                        },
                    }
                    yield {
                        "type": "result",
                        "subtype": "success",
                        "duration_ms": 1000,
                        "duration_api_ms": 800,
                        "is_error": False,
                        "num_turns": 1,
                        "session_id": "test-session",
                        "total_cost_usd": 0.001,
                    }

                mock_transport.read_messages = mock_receive
                mock_transport.connect = AsyncMock()
                mock_transport.close = AsyncMock()
                mock_transport.end_input = AsyncMock()
                mock_transport.write = AsyncMock()
                mock_transport.is_ready = Mock(return_value=True)

                # Run query
                messages = []
                async for msg in query(prompt="What is 2 + 2?"):
                    messages.append(msg)

                # Verify results
                assert len(messages) == 2

                # Check assistant message
                assert isinstance(messages[0], AssistantMessage)
                assert len(messages[0].content) == 1
                assert messages[0].content[0].text == "2 + 2 equals 4"

                # Check result message
                assert isinstance(messages[1], ResultMessage)
                assert messages[1].total_cost_usd == 0.001
                assert messages[1].session_id == "test-session"

        anyio.run(_test)

    def test_query_with_tool_use(self):
        """Test query that uses tools."""

        async def _test():
            with patch(
                "claude_code_sdk._internal.client.SubprocessCLITransport"
            ) as mock_transport_class:
                mock_transport = AsyncMock()
                mock_transport_class.return_value = mock_transport

                # Mock the message stream with tool use
                async def mock_receive():
                    yield {
                        "type": "assistant",
                        "message": {
                            "role": "assistant",
                            "content": [
                                {
                                    "type": "text",
                                    "text": "Let me read that file for you.",
                                },
                                {
                                    "type": "tool_use",
                                    "id": "tool-123",
                                    "name": "Read",
                                    "input": {"file_path": "/test.txt"},
                                },
                            ],
                            "model": "claude-opus-4-1-20250805",
                        },
                    }
                    yield {
                        "type": "result",
                        "subtype": "success",
                        "duration_ms": 1500,
                        "duration_api_ms": 1200,
                        "is_error": False,
                        "num_turns": 1,
                        "session_id": "test-session-2",
                        "total_cost_usd": 0.002,
                    }

                mock_transport.read_messages = mock_receive
                mock_transport.connect = AsyncMock()
                mock_transport.close = AsyncMock()
                mock_transport.end_input = AsyncMock()
                mock_transport.write = AsyncMock()
                mock_transport.is_ready = Mock(return_value=True)

                # Run query with tools enabled
                messages = []
                async for msg in query(
                    prompt="Read /test.txt",
                    options=ClaudeCodeOptions(allowed_tools=["Read"]),
                ):
                    messages.append(msg)

                # Verify results
                assert len(messages) == 2

                # Check assistant message with tool use
                assert isinstance(messages[0], AssistantMessage)
                assert len(messages[0].content) == 2
                assert messages[0].content[0].text == "Let me read that file for you."
                assert isinstance(messages[0].content[1], ToolUseBlock)
                assert messages[0].content[1].name == "Read"
                assert messages[0].content[1].input["file_path"] == "/test.txt"

        anyio.run(_test)

    def test_cli_not_found(self):
        """Test handling when CLI is not found."""

        async def _test():
            with (
                patch("shutil.which", return_value=None),
                patch("pathlib.Path.exists", return_value=False),
                pytest.raises(CLINotFoundError) as exc_info,
            ):
                async for _ in query(prompt="test"):
                    pass

            # Checked after the with block, once the exception has been captured
            assert "Claude Code requires Node.js" in str(exc_info.value)

        anyio.run(_test)

    def test_continuation_option(self):
        """Test query with continue_conversation option."""

        async def _test():
            with patch(
                "claude_code_sdk._internal.client.SubprocessCLITransport"
            ) as mock_transport_class:
                mock_transport = AsyncMock()
                mock_transport_class.return_value = mock_transport

                # Mock the message stream
                async def mock_receive():
                    yield {
                        "type": "assistant",
                        "message": {
                            "role": "assistant",
                            "content": [
                                {
                                    "type": "text",
                                    "text": "Continuing from previous conversation",
                                }
                            ],
                            "model": "claude-opus-4-1-20250805",
                        },
                    }

                mock_transport.read_messages = mock_receive
                mock_transport.connect = AsyncMock()
                mock_transport.close = AsyncMock()
                mock_transport.end_input = AsyncMock()
                mock_transport.write = AsyncMock()
                mock_transport.is_ready = Mock(return_value=True)

                # Run query with continuation
                messages = []
                async for msg in query(
                    prompt="Continue",
                    options=ClaudeCodeOptions(continue_conversation=True),
                ):
                    messages.append(msg)

                # Verify transport was created with continuation option
                mock_transport_class.assert_called_once()
                call_kwargs = mock_transport_class.call_args.kwargs
                assert call_kwargs["options"].continue_conversation is True

        anyio.run(_test)