claude-code-sdk-python/tests/test_subprocess_buffering.py
Dickson Tsai 22fa9f473e
Implement control protocol support for Python SDK (#139)
## Summary

This PR implements control protocol support in the Python SDK, aligning
it with the TypeScript implementation pattern. The refactor introduces a
Query + Transport separation to enable bidirectional communication
between the SDK and CLI.

## Motivation

The previous Python SDK implementation used a high-level abstraction in
the Transport ABC (`send_request`/`receive_messages`) that couldn't
handle bidirectional communication. This prevented support for:
- Control messages from CLI to SDK that need responses
- Hooks implementation  
- Dynamic permission mode changes
- SDK MCP servers

## Changes

### Core Architecture Refactor

1. **New Query Class** (`src/claude_code_sdk/_internal/query.py`)
   - Manages control protocol on top of Transport
   - Handles control request/response routing
   - Manages initialization handshake with timeout
   - Supports hook callbacks and tool permission callbacks
   - Implements message streaming

2. **Refactored Transport ABC** (`src/claude_code_sdk/_internal/transport/__init__.py`)
   - Changed from high-level (`send_request`/`receive_messages`) to low-level (`write`/`read_messages`) interface
   - Now handles raw I/O instead of protocol logic
   - Aligns with TypeScript ProcessTransport pattern

3. **Updated SubprocessCLITransport** (`src/claude_code_sdk/_internal/transport/subprocess_cli.py`)
   - Simplified to focus on raw message streaming
   - Removed protocol logic (moved to Query)
   - Improved cleanup and error handling

4. **Enhanced ClaudeSDKClient** (`src/claude_code_sdk/client.py`)
   - Now uses Query for control protocol
   - Supports initialization messages
   - Better error handling for control protocol failures
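
The layering described above can be illustrated with a minimal sketch. Only the `write`/`read_messages` method names come from this PR; `InMemoryTransport`, the `Query.receive_messages` method, and the `control_request`/`control_response`/`request_id` message fields are assumptions made for illustration and may not match the real wire format.

```python
import asyncio
import json
from abc import ABC, abstractmethod
from collections.abc import AsyncIterator
from typing import Any


class Transport(ABC):
    """Low-level transport: raw writes in, parsed messages out."""

    @abstractmethod
    async def write(self, data: str) -> None: ...

    @abstractmethod
    def read_messages(self) -> AsyncIterator[dict[str, Any]]: ...


class InMemoryTransport(Transport):
    """Hypothetical stand-in for the subprocess transport."""

    def __init__(self, incoming: list[dict[str, Any]]) -> None:
        self.incoming = incoming
        self.written: list[str] = []

    async def write(self, data: str) -> None:
        self.written.append(data)

    async def read_messages(self) -> AsyncIterator[dict[str, Any]]:
        for message in self.incoming:
            yield message


class Query:
    """Protocol layer: answers control requests, passes other messages on."""

    def __init__(self, transport: Transport) -> None:
        self.transport = transport

    async def receive_messages(self) -> AsyncIterator[dict[str, Any]]:
        async for message in self.transport.read_messages():
            if message.get("type") == "control_request":
                # Assumed response shape; the real protocol may differ.
                response = {
                    "type": "control_response",
                    "request_id": message.get("request_id"),
                }
                await self.transport.write(json.dumps(response) + "\n")
            else:
                yield message


async def demo() -> tuple[list[dict[str, Any]], list[str]]:
    transport = InMemoryTransport(
        [
            {"type": "control_request", "request_id": "req_1"},
            {"type": "assistant", "text": "hello"},
        ]
    )
    query = Query(transport)
    received = [message async for message in query.receive_messages()]
    return received, transport.written
```

Running `asyncio.run(demo())` yields only the assistant message to the caller, while the control request is answered through `write`. Because the transport only moves data, the same protocol logic could sit on top of any transport that implements the two methods.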

### Control Protocol Features

- **Initialization handshake**: SDK sends initialize request, CLI
responds with supported commands
- **Control message types**: 
  - `initialize`: Establish bidirectional connection
  - `interrupt`: Cancel ongoing operations  
  - `set_permission_mode`: Change permission mode dynamically
- **Timeout handling**: 60-second timeout for initialization to handle
CLI versions without control support
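
A minimal sketch of the timeout handling, assuming that a CLI without control support simply never answers the initialize request. The function name `initialize_with_fallback`, the `send_initialize` callable, and the `None` fallback are hypothetical; only the 60-second value comes from this PR. The sketch uses `asyncio` from the standard library to stay self-contained.

```python
import asyncio
from typing import Any, Awaitable, Callable, Optional

INITIALIZE_TIMEOUT_SECONDS = 60.0  # value taken from the PR description


async def initialize_with_fallback(
    send_initialize: Callable[[], Awaitable[dict[str, Any]]],
    timeout: float = INITIALIZE_TIMEOUT_SECONDS,
) -> Optional[dict[str, Any]]:
    """Return the CLI's initialize response, or None if it never answers."""
    try:
        return await asyncio.wait_for(send_initialize(), timeout=timeout)
    except asyncio.TimeoutError:
        # Assumed fallback: treat silence as "no control protocol support".
        return None


async def responsive_cli() -> dict[str, Any]:
    """Hypothetical CLI that answers immediately."""
    return {"commands": ["interrupt", "set_permission_mode"]}


async def silent_cli() -> dict[str, Any]:
    """Hypothetical CLI that takes far longer than the timeout allows."""
    await asyncio.sleep(10)
    return {}
```

Calling `asyncio.run(initialize_with_fallback(silent_cli, timeout=0.01))` returns `None` instead of hanging, which lets the caller continue without control features.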

### Examples

Updated `examples/streaming_mode.py` to demonstrate control protocol
initialization and error handling.

## Testing

- Tested with the current CLI, which does not yet support the control protocol; the SDK falls back gracefully
- Verified backward compatibility with existing `query()` function
- Tested initialization timeout handling
- Verified proper cleanup on errors

## Design Alignment

This implementation closely follows the TypeScript reference:
- `src/core/Query.ts` → `src/claude_code_sdk/_internal/query.py`
- `src/transport/ProcessTransport.ts` →
`src/claude_code_sdk/_internal/transport/subprocess_cli.py`
- `src/entrypoints/sdk.ts` → `src/claude_code_sdk/client.py`

## Next Steps

Once the CLI implements the control protocol handler, this will enable:
- Hooks support
- Dynamic permission mode changes
- SDK MCP servers
- Improved error recovery

🤖 Generated with [Claude Code](https://claude.ai/code)

---------

Co-authored-by: Ashwin Bhat <ashwin@anthropic.com>
Co-authored-by: Claude <noreply@anthropic.com>
Co-authored-by: Kashyap Murali <kashyap@anthropic.com>
2025-09-01 23:04:22 -07:00


"""Tests for subprocess transport buffering edge cases."""

import json
from collections.abc import AsyncIterator
from typing import Any
from unittest.mock import AsyncMock, MagicMock

import anyio
import pytest

from claude_code_sdk._errors import CLIJSONDecodeError
from claude_code_sdk._internal.transport.subprocess_cli import (
    _MAX_BUFFER_SIZE,
    SubprocessCLITransport,
)
from claude_code_sdk.types import ClaudeCodeOptions


class MockTextReceiveStream:
    """Mock TextReceiveStream for testing."""

    def __init__(self, lines: list[str]) -> None:
        self.lines = lines
        self.index = 0

    def __aiter__(self) -> AsyncIterator[str]:
        return self

    async def __anext__(self) -> str:
        if self.index >= len(self.lines):
            raise StopAsyncIteration
        line = self.lines[self.index]
        self.index += 1
        return line


class TestSubprocessBuffering:
    """Test subprocess transport handling of buffered output."""

    def test_multiple_json_objects_on_single_line(self) -> None:
        """Test parsing when multiple JSON objects are concatenated on a single line.

        In some environments, stdout buffering can cause multiple distinct JSON
        objects to be delivered as a single line with embedded newlines.
        """

        async def _test() -> None:
            json_obj1 = {"type": "message", "id": "msg1", "content": "First message"}
            json_obj2 = {"type": "result", "id": "res1", "status": "completed"}

            buffered_line = json.dumps(json_obj1) + "\n" + json.dumps(json_obj2)

            transport = SubprocessCLITransport(
                prompt="test", options=ClaudeCodeOptions(), cli_path="/usr/bin/claude"
            )

            mock_process = MagicMock()
            mock_process.returncode = None
            mock_process.wait = AsyncMock(return_value=None)
            transport._process = mock_process

            transport._stdout_stream = MockTextReceiveStream([buffered_line])  # type: ignore[assignment]
            transport._stderr_stream = MockTextReceiveStream([])  # type: ignore[assignment]

            messages: list[Any] = []
            async for msg in transport.read_messages():
                messages.append(msg)

            assert len(messages) == 2
            assert messages[0]["type"] == "message"
            assert messages[0]["id"] == "msg1"
            assert messages[0]["content"] == "First message"
            assert messages[1]["type"] == "result"
            assert messages[1]["id"] == "res1"
            assert messages[1]["status"] == "completed"

        anyio.run(_test)

    def test_json_with_embedded_newlines(self) -> None:
        """Test parsing JSON objects that contain newline characters in string values."""

        async def _test() -> None:
            json_obj1 = {"type": "message", "content": "Line 1\nLine 2\nLine 3"}
            json_obj2 = {"type": "result", "data": "Some\nMultiline\nContent"}

            buffered_line = json.dumps(json_obj1) + "\n" + json.dumps(json_obj2)

            transport = SubprocessCLITransport(
                prompt="test", options=ClaudeCodeOptions(), cli_path="/usr/bin/claude"
            )

            mock_process = MagicMock()
            mock_process.returncode = None
            mock_process.wait = AsyncMock(return_value=None)
            transport._process = mock_process
            transport._stdout_stream = MockTextReceiveStream([buffered_line])
            transport._stderr_stream = MockTextReceiveStream([])

            messages: list[Any] = []
            async for msg in transport.read_messages():
                messages.append(msg)

            assert len(messages) == 2
            assert messages[0]["content"] == "Line 1\nLine 2\nLine 3"
            assert messages[1]["data"] == "Some\nMultiline\nContent"

        anyio.run(_test)

    def test_multiple_newlines_between_objects(self) -> None:
        """Test parsing with multiple newlines between JSON objects."""

        async def _test() -> None:
            json_obj1 = {"type": "message", "id": "msg1"}
            json_obj2 = {"type": "result", "id": "res1"}

            buffered_line = json.dumps(json_obj1) + "\n\n\n" + json.dumps(json_obj2)

            transport = SubprocessCLITransport(
                prompt="test", options=ClaudeCodeOptions(), cli_path="/usr/bin/claude"
            )

            mock_process = MagicMock()
            mock_process.returncode = None
            mock_process.wait = AsyncMock(return_value=None)
            transport._process = mock_process
            transport._stdout_stream = MockTextReceiveStream([buffered_line])
            transport._stderr_stream = MockTextReceiveStream([])

            messages: list[Any] = []
            async for msg in transport.read_messages():
                messages.append(msg)

            assert len(messages) == 2
            assert messages[0]["id"] == "msg1"
            assert messages[1]["id"] == "res1"

        anyio.run(_test)

    def test_split_json_across_multiple_reads(self) -> None:
        """Test parsing when a single JSON object is split across multiple stream reads."""

        async def _test() -> None:
            json_obj = {
                "type": "assistant",
                "message": {
                    "content": [
                        {"type": "text", "text": "x" * 1000},
                        {
                            "type": "tool_use",
                            "id": "tool_123",
                            "name": "Read",
                            "input": {"file_path": "/test.txt"},
                        },
                    ]
                },
            }

            complete_json = json.dumps(json_obj)

            # Split the serialized object into three fragments.
            part1 = complete_json[:100]
            part2 = complete_json[100:250]
            part3 = complete_json[250:]

            transport = SubprocessCLITransport(
                prompt="test", options=ClaudeCodeOptions(), cli_path="/usr/bin/claude"
            )

            mock_process = MagicMock()
            mock_process.returncode = None
            mock_process.wait = AsyncMock(return_value=None)
            transport._process = mock_process
            transport._stdout_stream = MockTextReceiveStream([part1, part2, part3])
            transport._stderr_stream = MockTextReceiveStream([])

            messages: list[Any] = []
            async for msg in transport.read_messages():
                messages.append(msg)

            assert len(messages) == 1
            assert messages[0]["type"] == "assistant"
            assert len(messages[0]["message"]["content"]) == 2

        anyio.run(_test)

    def test_large_minified_json(self) -> None:
        """Test parsing a large minified JSON (simulating the reported issue)."""

        async def _test() -> None:
            large_data = {"data": [{"id": i, "value": "x" * 100} for i in range(1000)]}
            json_obj = {
                "type": "user",
                "message": {
                    "role": "user",
                    "content": [
                        {
                            "tool_use_id": "toolu_016fed1NhiaMLqnEvrj5NUaj",
                            "type": "tool_result",
                            "content": json.dumps(large_data),
                        }
                    ],
                },
            }

            complete_json = json.dumps(json_obj)

            # Deliver the payload in 64 KiB chunks.
            chunk_size = 64 * 1024
            chunks = [
                complete_json[i : i + chunk_size]
                for i in range(0, len(complete_json), chunk_size)
            ]

            transport = SubprocessCLITransport(
                prompt="test", options=ClaudeCodeOptions(), cli_path="/usr/bin/claude"
            )

            mock_process = MagicMock()
            mock_process.returncode = None
            mock_process.wait = AsyncMock(return_value=None)
            transport._process = mock_process
            transport._stdout_stream = MockTextReceiveStream(chunks)
            transport._stderr_stream = MockTextReceiveStream([])

            messages: list[Any] = []
            async for msg in transport.read_messages():
                messages.append(msg)

            assert len(messages) == 1
            assert messages[0]["type"] == "user"
            assert (
                messages[0]["message"]["content"][0]["tool_use_id"]
                == "toolu_016fed1NhiaMLqnEvrj5NUaj"
            )

        anyio.run(_test)

    def test_buffer_size_exceeded(self) -> None:
        """Test that exceeding buffer size raises an appropriate error."""

        async def _test() -> None:
            # An unterminated JSON string that can never parse and outgrows the buffer.
            huge_incomplete = '{"data": "' + "x" * (_MAX_BUFFER_SIZE + 1000)

            transport = SubprocessCLITransport(
                prompt="test", options=ClaudeCodeOptions(), cli_path="/usr/bin/claude"
            )

            mock_process = MagicMock()
            mock_process.returncode = None
            mock_process.wait = AsyncMock(return_value=None)
            transport._process = mock_process
            transport._stdout_stream = MockTextReceiveStream([huge_incomplete])
            transport._stderr_stream = MockTextReceiveStream([])

            with pytest.raises(Exception) as exc_info:
                messages: list[Any] = []
                async for msg in transport.read_messages():
                    messages.append(msg)

            assert isinstance(exc_info.value, CLIJSONDecodeError)
            assert "exceeded maximum buffer size" in str(exc_info.value)

        anyio.run(_test)

    def test_mixed_complete_and_split_json(self) -> None:
        """Test handling a mix of complete and split JSON messages."""

        async def _test() -> None:
            msg1 = json.dumps({"type": "system", "subtype": "start"})

            large_msg = {
                "type": "assistant",
                "message": {"content": [{"type": "text", "text": "y" * 5000}]},
            }
            large_json = json.dumps(large_msg)

            msg3 = json.dumps({"type": "system", "subtype": "end"})

            # One complete message, one message split in three, one trailing message.
            lines = [
                msg1 + "\n",
                large_json[:1000],
                large_json[1000:3000],
                large_json[3000:] + "\n" + msg3,
            ]

            transport = SubprocessCLITransport(
                prompt="test", options=ClaudeCodeOptions(), cli_path="/usr/bin/claude"
            )

            mock_process = MagicMock()
            mock_process.returncode = None
            mock_process.wait = AsyncMock(return_value=None)
            transport._process = mock_process
            transport._stdout_stream = MockTextReceiveStream(lines)
            transport._stderr_stream = MockTextReceiveStream([])

            messages: list[Any] = []
            async for msg in transport.read_messages():
                messages.append(msg)

            assert len(messages) == 3
            assert messages[0]["type"] == "system"
            assert messages[0]["subtype"] == "start"
            assert messages[1]["type"] == "assistant"
            assert len(messages[1]["message"]["content"][0]["text"]) == 5000
            assert messages[2]["type"] == "system"
            assert messages[2]["subtype"] == "end"

        anyio.run(_test)
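
The behavior these tests exercise can be approximated by a small accumulate-and-retry parser. This is a sketch under stated assumptions, not the code in `subprocess_cli.py`: the names `parse_chunks`, `MAX_BUFFER_SIZE`, and `BufferOverflowError` are hypothetical (the tests above expect `CLIJSONDecodeError`), and it assumes every top-level value is a JSON object serialized without raw newlines.

```python
import json
from collections.abc import Iterable, Iterator
from typing import Any

MAX_BUFFER_SIZE = 1024 * 1024  # hypothetical limit for this sketch


class BufferOverflowError(ValueError):
    """Hypothetical error type used only in this sketch."""


def parse_chunks(
    chunks: Iterable[str], max_buffer_size: int = MAX_BUFFER_SIZE
) -> Iterator[Any]:
    """Yield JSON values from chunks that may be merged or split arbitrarily."""
    buffer = ""
    for chunk in chunks:
        # A chunk may hold several newline-separated objects, or a fragment.
        for piece in chunk.split("\n"):
            if not piece:
                continue  # blank lines between objects carry no data
            buffer += piece
            if len(buffer) > max_buffer_size:
                raise BufferOverflowError(
                    "JSON message exceeded maximum buffer size"
                )
            try:
                message = json.loads(buffer)
            except json.JSONDecodeError:
                continue  # incomplete so far; wait for more input
            buffer = ""
            yield message
```

For example, `list(parse_chunks(['{"a": ', '"hello ', 'world"}']))` reassembles the three fragments into a single object. Fragments are concatenated without stripping whitespace, so a split that lands inside a string value does not lose characters.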