claude-code-sdk-python/tests/test_subprocess_buffering.py
Chase Naples aebcf9d6a4
Some checks failed
Lint / lint (push) Has been cancelled
Test / test (macos-latest, 3.10) (push) Has been cancelled
Test / test (macos-latest, 3.11) (push) Has been cancelled
Test / test (macos-latest, 3.12) (push) Has been cancelled
Test / test (macos-latest, 3.13) (push) Has been cancelled
Test / test (ubuntu-latest, 3.10) (push) Has been cancelled
Test / test (ubuntu-latest, 3.11) (push) Has been cancelled
Test / test (ubuntu-latest, 3.12) (push) Has been cancelled
Test / test (ubuntu-latest, 3.13) (push) Has been cancelled
Test / test (windows-latest, 3.10) (push) Has been cancelled
Test / test (windows-latest, 3.11) (push) Has been cancelled
Test / test (windows-latest, 3.12) (push) Has been cancelled
Test / test (windows-latest, 3.13) (push) Has been cancelled
Test / test-examples (3.10) (push) Has been cancelled
Test / test-e2e (macos-latest, 3.10) (push) Has been cancelled
Test / test-e2e (macos-latest, 3.11) (push) Has been cancelled
Test / test-e2e (macos-latest, 3.12) (push) Has been cancelled
Test / test-e2e (macos-latest, 3.13) (push) Has been cancelled
Test / test-e2e (ubuntu-latest, 3.10) (push) Has been cancelled
Test / test-e2e (ubuntu-latest, 3.11) (push) Has been cancelled
Test / test-e2e (ubuntu-latest, 3.12) (push) Has been cancelled
Test / test-e2e (ubuntu-latest, 3.13) (push) Has been cancelled
Test / test-e2e (windows-latest, 3.10) (push) Has been cancelled
Test / test-e2e (windows-latest, 3.12) (push) Has been cancelled
Test / test-e2e (windows-latest, 3.13) (push) Has been cancelled
Test / test-examples (3.11) (push) Has been cancelled
Test / test-examples (3.12) (push) Has been cancelled
Test / test-examples (3.13) (push) Has been cancelled
Test / test-e2e (windows-latest, 3.11) (push) Has been cancelled
feat: add cli_path support to ClaudeAgentOptions (#235)
## Summary
Adds support for passing custom Claude Code CLI paths through
`ClaudeAgentOptions`, allowing organizations with non-standard
installation locations to specify the CLI path explicitly.

## Motivation
As noted in #214, organizations may install Claude Code CLI (or wrapped
versions) at custom locations and prefer to provide those paths instead
of relying on the SDK's default search logic. The transport layer
already supported `cli_path`, but it was never exposed through the
public API.

## Changes
1. **types.py**: Added `cli_path: str | Path | None = None` parameter to
`ClaudeAgentOptions` dataclass
2. **_internal/client.py**: Pass `cli_path` from
`configured_options.cli_path` to `SubprocessCLITransport`
3. **client.py**: Pass `cli_path` from `options.cli_path` to
`SubprocessCLITransport`

## Implementation Details
The `SubprocessCLITransport` constructor already accepted a `cli_path`
parameter (line 40 of subprocess_cli.py), but it was never passed from
the client layers. This PR completes the wiring by:
- Adding the option to the public `ClaudeAgentOptions` interface
- Extracting and passing it through both client implementations
(`InternalClient.process_query` and `ClaudeSDKClient.connect`)

## Usage Example
```python
from claude_agent_sdk import query, ClaudeAgentOptions

# Specify custom CLI path
options = ClaudeAgentOptions(
    cli_path="/custom/path/to/claude"
)

result = await query("Hello!", options=options)
```

## Testing
- No new tests added as this is a straightforward parameter pass-through
- Existing tests should continue to work (default behavior unchanged)
- CI will validate the changes don't break existing functionality

Fixes #214

---------

Co-authored-by: Ashwin Bhat <ashwin@anthropic.com>
2025-10-12 23:19:53 -07:00

329 lines
12 KiB
Python

"""Tests for subprocess transport buffering edge cases."""
import json
from collections.abc import AsyncIterator
from typing import Any
from unittest.mock import AsyncMock, MagicMock
import anyio
import pytest
from claude_agent_sdk._errors import CLIJSONDecodeError
from claude_agent_sdk._internal.transport.subprocess_cli import (
_DEFAULT_MAX_BUFFER_SIZE,
SubprocessCLITransport,
)
from claude_agent_sdk.types import ClaudeAgentOptions
DEFAULT_CLI_PATH = "/usr/bin/claude"
def make_options(**kwargs: object) -> ClaudeAgentOptions:
"""Construct ClaudeAgentOptions with a default CLI path for tests."""
cli_path = kwargs.pop("cli_path", DEFAULT_CLI_PATH)
return ClaudeAgentOptions(cli_path=cli_path, **kwargs)
class MockTextReceiveStream:
"""Mock TextReceiveStream for testing."""
def __init__(self, lines: list[str]) -> None:
self.lines = lines
self.index = 0
def __aiter__(self) -> AsyncIterator[str]:
return self
async def __anext__(self) -> str:
if self.index >= len(self.lines):
raise StopAsyncIteration
line = self.lines[self.index]
self.index += 1
return line
class TestSubprocessBuffering:
"""Test subprocess transport handling of buffered output."""
def test_multiple_json_objects_on_single_line(self) -> None:
"""Test parsing when multiple JSON objects are concatenated on a single line.
In some environments, stdout buffering can cause multiple distinct JSON
objects to be delivered as a single line with embedded newlines.
"""
async def _test() -> None:
json_obj1 = {"type": "message", "id": "msg1", "content": "First message"}
json_obj2 = {"type": "result", "id": "res1", "status": "completed"}
buffered_line = json.dumps(json_obj1) + "\n" + json.dumps(json_obj2)
transport = SubprocessCLITransport(prompt="test", options=make_options())
mock_process = MagicMock()
mock_process.returncode = None
mock_process.wait = AsyncMock(return_value=None)
transport._process = mock_process
transport._stdout_stream = MockTextReceiveStream([buffered_line]) # type: ignore[assignment]
transport._stderr_stream = MockTextReceiveStream([]) # type: ignore[assignment]
messages: list[Any] = []
async for msg in transport.read_messages():
messages.append(msg)
assert len(messages) == 2
assert messages[0]["type"] == "message"
assert messages[0]["id"] == "msg1"
assert messages[0]["content"] == "First message"
assert messages[1]["type"] == "result"
assert messages[1]["id"] == "res1"
assert messages[1]["status"] == "completed"
anyio.run(_test)
def test_json_with_embedded_newlines(self) -> None:
"""Test parsing JSON objects that contain newline characters in string values."""
async def _test() -> None:
json_obj1 = {"type": "message", "content": "Line 1\nLine 2\nLine 3"}
json_obj2 = {"type": "result", "data": "Some\nMultiline\nContent"}
buffered_line = json.dumps(json_obj1) + "\n" + json.dumps(json_obj2)
transport = SubprocessCLITransport(prompt="test", options=make_options())
mock_process = MagicMock()
mock_process.returncode = None
mock_process.wait = AsyncMock(return_value=None)
transport._process = mock_process
transport._stdout_stream = MockTextReceiveStream([buffered_line])
transport._stderr_stream = MockTextReceiveStream([])
messages: list[Any] = []
async for msg in transport.read_messages():
messages.append(msg)
assert len(messages) == 2
assert messages[0]["content"] == "Line 1\nLine 2\nLine 3"
assert messages[1]["data"] == "Some\nMultiline\nContent"
anyio.run(_test)
def test_multiple_newlines_between_objects(self) -> None:
"""Test parsing with multiple newlines between JSON objects."""
async def _test() -> None:
json_obj1 = {"type": "message", "id": "msg1"}
json_obj2 = {"type": "result", "id": "res1"}
buffered_line = json.dumps(json_obj1) + "\n\n\n" + json.dumps(json_obj2)
transport = SubprocessCLITransport(prompt="test", options=make_options())
mock_process = MagicMock()
mock_process.returncode = None
mock_process.wait = AsyncMock(return_value=None)
transport._process = mock_process
transport._stdout_stream = MockTextReceiveStream([buffered_line])
transport._stderr_stream = MockTextReceiveStream([])
messages: list[Any] = []
async for msg in transport.read_messages():
messages.append(msg)
assert len(messages) == 2
assert messages[0]["id"] == "msg1"
assert messages[1]["id"] == "res1"
anyio.run(_test)
def test_split_json_across_multiple_reads(self) -> None:
"""Test parsing when a single JSON object is split across multiple stream reads."""
async def _test() -> None:
json_obj = {
"type": "assistant",
"message": {
"content": [
{"type": "text", "text": "x" * 1000},
{
"type": "tool_use",
"id": "tool_123",
"name": "Read",
"input": {"file_path": "/test.txt"},
},
]
},
}
complete_json = json.dumps(json_obj)
part1 = complete_json[:100]
part2 = complete_json[100:250]
part3 = complete_json[250:]
transport = SubprocessCLITransport(prompt="test", options=make_options())
mock_process = MagicMock()
mock_process.returncode = None
mock_process.wait = AsyncMock(return_value=None)
transport._process = mock_process
transport._stdout_stream = MockTextReceiveStream([part1, part2, part3])
transport._stderr_stream = MockTextReceiveStream([])
messages: list[Any] = []
async for msg in transport.read_messages():
messages.append(msg)
assert len(messages) == 1
assert messages[0]["type"] == "assistant"
assert len(messages[0]["message"]["content"]) == 2
anyio.run(_test)
def test_large_minified_json(self) -> None:
"""Test parsing a large minified JSON (simulating the reported issue)."""
async def _test() -> None:
large_data = {"data": [{"id": i, "value": "x" * 100} for i in range(1000)]}
json_obj = {
"type": "user",
"message": {
"role": "user",
"content": [
{
"tool_use_id": "toolu_016fed1NhiaMLqnEvrj5NUaj",
"type": "tool_result",
"content": json.dumps(large_data),
}
],
},
}
complete_json = json.dumps(json_obj)
chunk_size = 64 * 1024
chunks = [
complete_json[i : i + chunk_size]
for i in range(0, len(complete_json), chunk_size)
]
transport = SubprocessCLITransport(prompt="test", options=make_options())
mock_process = MagicMock()
mock_process.returncode = None
mock_process.wait = AsyncMock(return_value=None)
transport._process = mock_process
transport._stdout_stream = MockTextReceiveStream(chunks)
transport._stderr_stream = MockTextReceiveStream([])
messages: list[Any] = []
async for msg in transport.read_messages():
messages.append(msg)
assert len(messages) == 1
assert messages[0]["type"] == "user"
assert (
messages[0]["message"]["content"][0]["tool_use_id"]
== "toolu_016fed1NhiaMLqnEvrj5NUaj"
)
anyio.run(_test)
def test_buffer_size_exceeded(self) -> None:
"""Test that exceeding buffer size raises an appropriate error."""
async def _test() -> None:
huge_incomplete = '{"data": "' + "x" * (_DEFAULT_MAX_BUFFER_SIZE + 1000)
transport = SubprocessCLITransport(prompt="test", options=make_options())
mock_process = MagicMock()
mock_process.returncode = None
mock_process.wait = AsyncMock(return_value=None)
transport._process = mock_process
transport._stdout_stream = MockTextReceiveStream([huge_incomplete])
transport._stderr_stream = MockTextReceiveStream([])
with pytest.raises(Exception) as exc_info:
messages: list[Any] = []
async for msg in transport.read_messages():
messages.append(msg)
assert isinstance(exc_info.value, CLIJSONDecodeError)
assert "exceeded maximum buffer size" in str(exc_info.value)
anyio.run(_test)
def test_buffer_size_option(self) -> None:
"""Test that the configurable buffer size option is respected."""
async def _test() -> None:
custom_limit = 512
huge_incomplete = '{"data": "' + "x" * (custom_limit + 10)
transport = SubprocessCLITransport(
prompt="test",
options=make_options(max_buffer_size=custom_limit),
)
mock_process = MagicMock()
mock_process.returncode = None
mock_process.wait = AsyncMock(return_value=None)
transport._process = mock_process
transport._stdout_stream = MockTextReceiveStream([huge_incomplete])
transport._stderr_stream = MockTextReceiveStream([])
with pytest.raises(CLIJSONDecodeError) as exc_info:
async for _ in transport.read_messages():
pass
assert f"maximum buffer size of {custom_limit} bytes" in str(exc_info.value)
anyio.run(_test)
def test_mixed_complete_and_split_json(self) -> None:
"""Test handling a mix of complete and split JSON messages."""
async def _test() -> None:
msg1 = json.dumps({"type": "system", "subtype": "start"})
large_msg = {
"type": "assistant",
"message": {"content": [{"type": "text", "text": "y" * 5000}]},
}
large_json = json.dumps(large_msg)
msg3 = json.dumps({"type": "system", "subtype": "end"})
lines = [
msg1 + "\n",
large_json[:1000],
large_json[1000:3000],
large_json[3000:] + "\n" + msg3,
]
transport = SubprocessCLITransport(prompt="test", options=make_options())
mock_process = MagicMock()
mock_process.returncode = None
mock_process.wait = AsyncMock(return_value=None)
transport._process = mock_process
transport._stdout_stream = MockTextReceiveStream(lines)
transport._stderr_stream = MockTextReceiveStream([])
messages: list[Any] = []
async for msg in transport.read_messages():
messages.append(msg)
assert len(messages) == 3
assert messages[0]["type"] == "system"
assert messages[0]["subtype"] == "start"
assert messages[1]["type"] == "assistant"
assert len(messages[1]["message"]["content"][0]["text"]) == 5000
assert messages[2]["type"] == "system"
assert messages[2]["subtype"] == "end"
anyio.run(_test)