mirror of
https://github.com/anthropics/claude-code-sdk-python.git
synced 2025-07-07 14:45:00 +00:00

Removed redundant comments that simply restate what the code is doing. Kept only comments that provide valuable context or explain complex behavior. 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <noreply@anthropic.com>
308 lines
11 KiB
Python
308 lines
11 KiB
Python
"""Tests for subprocess transport buffering edge cases."""
|
|
|
|
import json
|
|
from collections.abc import AsyncIterator
|
|
from typing import Any
|
|
from unittest.mock import AsyncMock, MagicMock
|
|
|
|
import anyio
|
|
import pytest
|
|
|
|
from claude_code_sdk._errors import CLIJSONDecodeError
|
|
from claude_code_sdk._internal.transport.subprocess_cli import (
|
|
_MAX_BUFFER_SIZE,
|
|
SubprocessCLITransport,
|
|
)
|
|
from claude_code_sdk.types import ClaudeCodeOptions
|
|
|
|
|
|
class MockTextReceiveStream:
|
|
"""Mock TextReceiveStream for testing."""
|
|
|
|
def __init__(self, lines: list[str]) -> None:
|
|
self.lines = lines
|
|
self.index = 0
|
|
|
|
def __aiter__(self) -> AsyncIterator[str]:
|
|
return self
|
|
|
|
async def __anext__(self) -> str:
|
|
if self.index >= len(self.lines):
|
|
raise StopAsyncIteration
|
|
line = self.lines[self.index]
|
|
self.index += 1
|
|
return line
|
|
|
|
|
|
class TestSubprocessBuffering:
|
|
"""Test subprocess transport handling of buffered output."""
|
|
|
|
def test_multiple_json_objects_on_single_line(self) -> None:
|
|
"""Test parsing when multiple JSON objects are concatenated on a single line.
|
|
|
|
In some environments, stdout buffering can cause multiple distinct JSON
|
|
objects to be delivered as a single line with embedded newlines.
|
|
"""
|
|
|
|
async def _test() -> None:
|
|
json_obj1 = {"type": "message", "id": "msg1", "content": "First message"}
|
|
json_obj2 = {"type": "result", "id": "res1", "status": "completed"}
|
|
|
|
buffered_line = json.dumps(json_obj1) + "\n" + json.dumps(json_obj2)
|
|
|
|
transport = SubprocessCLITransport(
|
|
prompt="test", options=ClaudeCodeOptions(), cli_path="/usr/bin/claude"
|
|
)
|
|
|
|
mock_process = MagicMock()
|
|
mock_process.returncode = None
|
|
mock_process.wait = AsyncMock(return_value=None)
|
|
transport._process = mock_process
|
|
|
|
transport._stdout_stream = MockTextReceiveStream([buffered_line]) # type: ignore[assignment]
|
|
transport._stderr_stream = MockTextReceiveStream([]) # type: ignore[assignment]
|
|
|
|
messages: list[Any] = []
|
|
async for msg in transport.receive_messages():
|
|
messages.append(msg)
|
|
|
|
assert len(messages) == 2
|
|
assert messages[0]["type"] == "message"
|
|
assert messages[0]["id"] == "msg1"
|
|
assert messages[0]["content"] == "First message"
|
|
assert messages[1]["type"] == "result"
|
|
assert messages[1]["id"] == "res1"
|
|
assert messages[1]["status"] == "completed"
|
|
|
|
anyio.run(_test)
|
|
|
|
def test_json_with_embedded_newlines(self) -> None:
|
|
"""Test parsing JSON objects that contain newline characters in string values."""
|
|
|
|
async def _test() -> None:
|
|
json_obj1 = {"type": "message", "content": "Line 1\nLine 2\nLine 3"}
|
|
json_obj2 = {"type": "result", "data": "Some\nMultiline\nContent"}
|
|
|
|
buffered_line = json.dumps(json_obj1) + "\n" + json.dumps(json_obj2)
|
|
|
|
transport = SubprocessCLITransport(
|
|
prompt="test", options=ClaudeCodeOptions(), cli_path="/usr/bin/claude"
|
|
)
|
|
|
|
mock_process = MagicMock()
|
|
mock_process.returncode = None
|
|
mock_process.wait = AsyncMock(return_value=None)
|
|
transport._process = mock_process
|
|
transport._stdout_stream = MockTextReceiveStream([buffered_line])
|
|
transport._stderr_stream = MockTextReceiveStream([])
|
|
|
|
messages: list[Any] = []
|
|
async for msg in transport.receive_messages():
|
|
messages.append(msg)
|
|
|
|
assert len(messages) == 2
|
|
assert messages[0]["content"] == "Line 1\nLine 2\nLine 3"
|
|
assert messages[1]["data"] == "Some\nMultiline\nContent"
|
|
|
|
anyio.run(_test)
|
|
|
|
def test_multiple_newlines_between_objects(self) -> None:
|
|
"""Test parsing with multiple newlines between JSON objects."""
|
|
|
|
async def _test() -> None:
|
|
json_obj1 = {"type": "message", "id": "msg1"}
|
|
json_obj2 = {"type": "result", "id": "res1"}
|
|
|
|
buffered_line = json.dumps(json_obj1) + "\n\n\n" + json.dumps(json_obj2)
|
|
|
|
transport = SubprocessCLITransport(
|
|
prompt="test", options=ClaudeCodeOptions(), cli_path="/usr/bin/claude"
|
|
)
|
|
|
|
mock_process = MagicMock()
|
|
mock_process.returncode = None
|
|
mock_process.wait = AsyncMock(return_value=None)
|
|
transport._process = mock_process
|
|
transport._stdout_stream = MockTextReceiveStream([buffered_line])
|
|
transport._stderr_stream = MockTextReceiveStream([])
|
|
|
|
messages: list[Any] = []
|
|
async for msg in transport.receive_messages():
|
|
messages.append(msg)
|
|
|
|
assert len(messages) == 2
|
|
assert messages[0]["id"] == "msg1"
|
|
assert messages[1]["id"] == "res1"
|
|
|
|
anyio.run(_test)
|
|
|
|
def test_split_json_across_multiple_reads(self) -> None:
|
|
"""Test parsing when a single JSON object is split across multiple stream reads."""
|
|
|
|
async def _test() -> None:
|
|
json_obj = {
|
|
"type": "assistant",
|
|
"message": {
|
|
"content": [
|
|
{"type": "text", "text": "x" * 1000},
|
|
{
|
|
"type": "tool_use",
|
|
"id": "tool_123",
|
|
"name": "Read",
|
|
"input": {"file_path": "/test.txt"},
|
|
},
|
|
]
|
|
},
|
|
}
|
|
|
|
complete_json = json.dumps(json_obj)
|
|
|
|
part1 = complete_json[:100]
|
|
part2 = complete_json[100:250]
|
|
part3 = complete_json[250:]
|
|
|
|
transport = SubprocessCLITransport(
|
|
prompt="test", options=ClaudeCodeOptions(), cli_path="/usr/bin/claude"
|
|
)
|
|
|
|
mock_process = MagicMock()
|
|
mock_process.returncode = None
|
|
mock_process.wait = AsyncMock(return_value=None)
|
|
transport._process = mock_process
|
|
transport._stdout_stream = MockTextReceiveStream([part1, part2, part3])
|
|
transport._stderr_stream = MockTextReceiveStream([])
|
|
|
|
messages: list[Any] = []
|
|
async for msg in transport.receive_messages():
|
|
messages.append(msg)
|
|
|
|
assert len(messages) == 1
|
|
assert messages[0]["type"] == "assistant"
|
|
assert len(messages[0]["message"]["content"]) == 2
|
|
|
|
anyio.run(_test)
|
|
|
|
def test_large_minified_json(self) -> None:
|
|
"""Test parsing a large minified JSON (simulating the reported issue)."""
|
|
|
|
async def _test() -> None:
|
|
large_data = {"data": [{"id": i, "value": "x" * 100} for i in range(1000)]}
|
|
json_obj = {
|
|
"type": "user",
|
|
"message": {
|
|
"role": "user",
|
|
"content": [
|
|
{
|
|
"tool_use_id": "toolu_016fed1NhiaMLqnEvrj5NUaj",
|
|
"type": "tool_result",
|
|
"content": json.dumps(large_data),
|
|
}
|
|
],
|
|
},
|
|
}
|
|
|
|
complete_json = json.dumps(json_obj)
|
|
|
|
chunk_size = 64 * 1024
|
|
chunks = [
|
|
complete_json[i : i + chunk_size]
|
|
for i in range(0, len(complete_json), chunk_size)
|
|
]
|
|
|
|
transport = SubprocessCLITransport(
|
|
prompt="test", options=ClaudeCodeOptions(), cli_path="/usr/bin/claude"
|
|
)
|
|
|
|
mock_process = MagicMock()
|
|
mock_process.returncode = None
|
|
mock_process.wait = AsyncMock(return_value=None)
|
|
transport._process = mock_process
|
|
transport._stdout_stream = MockTextReceiveStream(chunks)
|
|
transport._stderr_stream = MockTextReceiveStream([])
|
|
|
|
messages: list[Any] = []
|
|
async for msg in transport.receive_messages():
|
|
messages.append(msg)
|
|
|
|
assert len(messages) == 1
|
|
assert messages[0]["type"] == "user"
|
|
assert (
|
|
messages[0]["message"]["content"][0]["tool_use_id"]
|
|
== "toolu_016fed1NhiaMLqnEvrj5NUaj"
|
|
)
|
|
|
|
anyio.run(_test)
|
|
|
|
def test_buffer_size_exceeded(self) -> None:
|
|
"""Test that exceeding buffer size raises an appropriate error."""
|
|
|
|
async def _test() -> None:
|
|
huge_incomplete = '{"data": "' + "x" * (_MAX_BUFFER_SIZE + 1000)
|
|
|
|
transport = SubprocessCLITransport(
|
|
prompt="test", options=ClaudeCodeOptions(), cli_path="/usr/bin/claude"
|
|
)
|
|
|
|
mock_process = MagicMock()
|
|
mock_process.returncode = None
|
|
mock_process.wait = AsyncMock(return_value=None)
|
|
transport._process = mock_process
|
|
transport._stdout_stream = MockTextReceiveStream([huge_incomplete])
|
|
transport._stderr_stream = MockTextReceiveStream([])
|
|
|
|
with pytest.raises(Exception) as exc_info:
|
|
messages: list[Any] = []
|
|
async for msg in transport.receive_messages():
|
|
messages.append(msg)
|
|
|
|
assert len(exc_info.value.exceptions) == 1
|
|
assert isinstance(exc_info.value.exceptions[0], CLIJSONDecodeError)
|
|
assert "exceeded maximum buffer size" in str(exc_info.value.exceptions[0])
|
|
|
|
anyio.run(_test)
|
|
|
|
def test_mixed_complete_and_split_json(self) -> None:
|
|
"""Test handling a mix of complete and split JSON messages."""
|
|
|
|
async def _test() -> None:
|
|
msg1 = json.dumps({"type": "system", "subtype": "start"})
|
|
|
|
large_msg = {
|
|
"type": "assistant",
|
|
"message": {"content": [{"type": "text", "text": "y" * 5000}]},
|
|
}
|
|
large_json = json.dumps(large_msg)
|
|
|
|
msg3 = json.dumps({"type": "system", "subtype": "end"})
|
|
|
|
lines = [
|
|
msg1 + "\n",
|
|
large_json[:1000],
|
|
large_json[1000:3000],
|
|
large_json[3000:] + "\n" + msg3,
|
|
]
|
|
|
|
transport = SubprocessCLITransport(
|
|
prompt="test", options=ClaudeCodeOptions(), cli_path="/usr/bin/claude"
|
|
)
|
|
|
|
mock_process = MagicMock()
|
|
mock_process.returncode = None
|
|
mock_process.wait = AsyncMock(return_value=None)
|
|
transport._process = mock_process
|
|
transport._stdout_stream = MockTextReceiveStream(lines)
|
|
transport._stderr_stream = MockTextReceiveStream([])
|
|
|
|
messages: list[Any] = []
|
|
async for msg in transport.receive_messages():
|
|
messages.append(msg)
|
|
|
|
assert len(messages) == 3
|
|
assert messages[0]["type"] == "system"
|
|
assert messages[0]["subtype"] == "start"
|
|
assert messages[1]["type"] == "assistant"
|
|
assert len(messages[1]["message"]["content"][0]["text"]) == 5000
|
|
assert messages[2]["type"] == "system"
|
|
assert messages[2]["subtype"] == "end"
|
|
|
|
anyio.run(_test)
|