diff --git a/src/claude_code_sdk/_internal/transport/subprocess_cli.py b/src/claude_code_sdk/_internal/transport/subprocess_cli.py
index d403ced..288b2e7 100644
--- a/src/claude_code_sdk/_internal/transport/subprocess_cli.py
+++ b/src/claude_code_sdk/_internal/transport/subprocess_cli.py
@@ -17,6 +17,8 @@ from ..._errors import CLIJSONDecodeError as SDKJSONDecodeError
 from ...types import ClaudeCodeOptions
 from . import Transport
 
+_MAX_BUFFER_SIZE = 1024 * 1024  # 1MB buffer limit
+
 
 class SubprocessCLITransport(Transport):
     """Subprocess transport using Claude Code CLI."""
@@ -182,13 +184,14 @@ class SubprocessCLITransport(Transport):
         async with anyio.create_task_group() as tg:
             tg.start_soon(read_stderr)
 
+            json_buffer = ""
+
             try:
                 async for line in self._stdout_stream:
                     line_str = line.strip()
                     if not line_str:
                         continue
 
-                    # Split on newlines in case multiple JSON objects are buffered together
                     json_lines = line_str.split("\n")
 
                     for json_line in json_lines:
@@ -196,16 +199,30 @@ class SubprocessCLITransport(Transport):
                         if not json_line:
                             continue
 
+                        # Keep accumulating partial JSON until we can parse it
+                        json_buffer += json_line
+
+                        if len(json_buffer) > _MAX_BUFFER_SIZE:
+                            # Record the size before clearing the buffer so the
+                            # error reports the real length rather than 0.
+                            buffer_size = len(json_buffer)
+                            json_buffer = ""
+                            raise SDKJSONDecodeError(
+                                f"JSON message exceeded maximum buffer size of {_MAX_BUFFER_SIZE} bytes",
+                                ValueError(
+                                    f"Buffer size {buffer_size} exceeds limit {_MAX_BUFFER_SIZE}"
+                                ),
+                            )
+
                         try:
-                            data = json.loads(json_line)
+                            data = json.loads(json_buffer)
+                            json_buffer = ""
                             try:
                                 yield data
                             except GeneratorExit:
-                                # Handle generator cleanup gracefully
                                 return
-                        except json.JSONDecodeError as e:
-                            if json_line.startswith("{") or json_line.startswith("["):
-                                raise SDKJSONDecodeError(json_line, e) from e
+                        except json.JSONDecodeError:
+                            # Not parseable yet; keep the buffer and wait for more data
                             continue
 
             except anyio.ClosedResourceError:
diff --git a/tests/test_subprocess_buffering.py b/tests/test_subprocess_buffering.py
index 06886df..3f9b0be 100644
--- a/tests/test_subprocess_buffering.py
+++ b/tests/test_subprocess_buffering.py
@@ -6,8 +6,13 @@ from typing import Any
 from unittest.mock import AsyncMock, MagicMock
 
 import anyio
+import pytest
 
-from claude_code_sdk._internal.transport.subprocess_cli import SubprocessCLITransport
+from claude_code_sdk._errors import CLIJSONDecodeError
+from claude_code_sdk._internal.transport.subprocess_cli import (
+    _MAX_BUFFER_SIZE,
+    SubprocessCLITransport,
+)
 from claude_code_sdk.types import ClaudeCodeOptions
 
 
@@ -40,34 +45,27 @@ class TestSubprocessBuffering:
         """
 
         async def _test() -> None:
-            # Two valid JSON objects separated by a newline character
             json_obj1 = {"type": "message", "id": "msg1", "content": "First message"}
             json_obj2 = {"type": "result", "id": "res1", "status": "completed"}
 
-            # Simulate buffered output where both objects appear on one line
             buffered_line = json.dumps(json_obj1) + "\n" + json.dumps(json_obj2)
 
-            # Create transport
             transport = SubprocessCLITransport(
                 prompt="test", options=ClaudeCodeOptions(), cli_path="/usr/bin/claude"
             )
 
-            # Mock the process and streams
             mock_process = MagicMock()
             mock_process.returncode = None
             mock_process.wait = AsyncMock(return_value=None)
             transport._process = mock_process
 
-            # Create mock stream that returns the buffered line
             transport._stdout_stream = MockTextReceiveStream([buffered_line])  # type: ignore[assignment]
             transport._stderr_stream = MockTextReceiveStream([])  # type: ignore[assignment]
 
-            # Collect all messages
             messages: list[Any] = []
             async for msg in transport.receive_messages():
                 messages.append(msg)
 
-            # Verify both JSON objects were successfully parsed
             assert len(messages) == 2
             assert messages[0]["type"] == "message"
             assert messages[0]["id"] == "msg1"
@@ -82,7 +80,6 @@ class TestSubprocessBuffering:
         """Test parsing JSON objects that contain newline characters in string values."""
 
         async def _test() -> None:
-            # JSON objects with newlines in string values
             json_obj1 = {"type": "message", "content": "Line 1\nLine 2\nLine 3"}
             json_obj2 = {"type": "result", "data": "Some\nMultiline\nContent"}
 
@@ -116,7 +113,6 @@ class TestSubprocessBuffering:
             json_obj1 = {"type": "message", "id": "msg1"}
             json_obj2 = {"type": "result", "id": "res1"}
 
-            # Multiple newlines between objects
             buffered_line = json.dumps(json_obj1) + "\n\n\n" + json.dumps(json_obj2)
 
             transport = SubprocessCLITransport(
@@ -139,3 +135,174 @@ class TestSubprocessBuffering:
             assert messages[1]["id"] == "res1"
 
         anyio.run(_test)
+
+    def test_split_json_across_multiple_reads(self) -> None:
+        """Test parsing when a single JSON object is split across multiple stream reads."""
+
+        async def _test() -> None:
+            json_obj = {
+                "type": "assistant",
+                "message": {
+                    "content": [
+                        {"type": "text", "text": "x" * 1000},
+                        {
+                            "type": "tool_use",
+                            "id": "tool_123",
+                            "name": "Read",
+                            "input": {"file_path": "/test.txt"},
+                        },
+                    ]
+                },
+            }
+
+            complete_json = json.dumps(json_obj)
+
+            part1 = complete_json[:100]
+            part2 = complete_json[100:250]
+            part3 = complete_json[250:]
+
+            transport = SubprocessCLITransport(
+                prompt="test", options=ClaudeCodeOptions(), cli_path="/usr/bin/claude"
+            )
+
+            mock_process = MagicMock()
+            mock_process.returncode = None
+            mock_process.wait = AsyncMock(return_value=None)
+            transport._process = mock_process
+            transport._stdout_stream = MockTextReceiveStream([part1, part2, part3])
+            transport._stderr_stream = MockTextReceiveStream([])
+
+            messages: list[Any] = []
+            async for msg in transport.receive_messages():
+                messages.append(msg)
+
+            assert len(messages) == 1
+            assert messages[0]["type"] == "assistant"
+            assert len(messages[0]["message"]["content"]) == 2
+
+        anyio.run(_test)
+
+    def test_large_minified_json(self) -> None:
+        """Test parsing a large minified JSON (simulating the reported issue)."""
+
+        async def _test() -> None:
+            large_data = {"data": [{"id": i, "value": "x" * 100} for i in range(1000)]}
+            json_obj = {
+                "type": "user",
+                "message": {
+                    "role": "user",
+                    "content": [
+                        {
+                            "tool_use_id": "toolu_016fed1NhiaMLqnEvrj5NUaj",
+                            "type": "tool_result",
+                            "content": json.dumps(large_data),
+                        }
+                    ],
+                },
+            }
+
+            complete_json = json.dumps(json_obj)
+
+            chunk_size = 64 * 1024
+            chunks = [
+                complete_json[i : i + chunk_size]
+                for i in range(0, len(complete_json), chunk_size)
+            ]
+
+            transport = SubprocessCLITransport(
+                prompt="test", options=ClaudeCodeOptions(), cli_path="/usr/bin/claude"
+            )
+
+            mock_process = MagicMock()
+            mock_process.returncode = None
+            mock_process.wait = AsyncMock(return_value=None)
+            transport._process = mock_process
+            transport._stdout_stream = MockTextReceiveStream(chunks)
+            transport._stderr_stream = MockTextReceiveStream([])
+
+            messages: list[Any] = []
+            async for msg in transport.receive_messages():
+                messages.append(msg)
+
+            assert len(messages) == 1
+            assert messages[0]["type"] == "user"
+            assert (
+                messages[0]["message"]["content"][0]["tool_use_id"]
+                == "toolu_016fed1NhiaMLqnEvrj5NUaj"
+            )
+
+        anyio.run(_test)
+
+    def test_buffer_size_exceeded(self) -> None:
+        """Test that exceeding buffer size raises an appropriate error."""
+
+        async def _test() -> None:
+            huge_incomplete = '{"data": "' + "x" * (_MAX_BUFFER_SIZE + 1000)
+
+            transport = SubprocessCLITransport(
+                prompt="test", options=ClaudeCodeOptions(), cli_path="/usr/bin/claude"
+            )
+
+            mock_process = MagicMock()
+            mock_process.returncode = None
+            mock_process.wait = AsyncMock(return_value=None)
+            transport._process = mock_process
+            transport._stdout_stream = MockTextReceiveStream([huge_incomplete])
+            transport._stderr_stream = MockTextReceiveStream([])
+
+            with pytest.raises(Exception) as exc_info:
+                messages: list[Any] = []
+                async for msg in transport.receive_messages():
+                    messages.append(msg)
+
+            assert len(exc_info.value.exceptions) == 1
+            assert isinstance(exc_info.value.exceptions[0], CLIJSONDecodeError)
+            assert "exceeded maximum buffer size" in str(exc_info.value.exceptions[0])
+
+        anyio.run(_test)
+
+    def test_mixed_complete_and_split_json(self) -> None:
+        """Test handling a mix of complete and split JSON messages."""
+
+        async def _test() -> None:
+            msg1 = json.dumps({"type": "system", "subtype": "start"})
+
+            large_msg = {
+                "type": "assistant",
+                "message": {"content": [{"type": "text", "text": "y" * 5000}]},
+            }
+            large_json = json.dumps(large_msg)
+
+            msg3 = json.dumps({"type": "system", "subtype": "end"})
+
+            lines = [
+                msg1 + "\n",
+                large_json[:1000],
+                large_json[1000:3000],
+                large_json[3000:] + "\n" + msg3,
+            ]
+
+            transport = SubprocessCLITransport(
+                prompt="test", options=ClaudeCodeOptions(), cli_path="/usr/bin/claude"
+            )
+
+            mock_process = MagicMock()
+            mock_process.returncode = None
+            mock_process.wait = AsyncMock(return_value=None)
+            transport._process = mock_process
+            transport._stdout_stream = MockTextReceiveStream(lines)
+            transport._stderr_stream = MockTextReceiveStream([])
+
+            messages: list[Any] = []
+            async for msg in transport.receive_messages():
+                messages.append(msg)
+
+            assert len(messages) == 3
+            assert messages[0]["type"] == "system"
+            assert messages[0]["subtype"] == "start"
+            assert messages[1]["type"] == "assistant"
+            assert len(messages[1]["message"]["content"][0]["text"]) == 5000
+            assert messages[2]["type"] == "system"
+            assert messages[2]["subtype"] == "end"
+
+        anyio.run(_test)