diff --git a/src/claude_code_sdk/_internal/transport/subprocess_cli.py b/src/claude_code_sdk/_internal/transport/subprocess_cli.py
index d403ced..6d76f38 100644
--- a/src/claude_code_sdk/_internal/transport/subprocess_cli.py
+++ b/src/claude_code_sdk/_internal/transport/subprocess_cli.py
@@ -17,6 +17,8 @@ from ..._errors import CLIJSONDecodeError as SDKJSONDecodeError
 from ...types import ClaudeCodeOptions
 from . import Transport
 
+_MAX_BUFFER_SIZE = 1024 * 1024  # 1MB buffer limit
+
 
 class SubprocessCLITransport(Transport):
     """Subprocess transport using Claude Code CLI."""
@@ -182,6 +184,9 @@ class SubprocessCLITransport(Transport):
         async with anyio.create_task_group() as tg:
             tg.start_soon(read_stderr)
 
+            # Buffer for incomplete JSON
+            json_buffer = ""
+
             try:
                 async for line in self._stdout_stream:
                     line_str = line.strip()
@@ -196,16 +201,27 @@ class SubprocessCLITransport(Transport):
                         if not json_line:
                             continue
 
+                        # Add to buffer
+                        json_buffer += json_line
+
+                        # Check buffer size
+                        if len(json_buffer) > _MAX_BUFFER_SIZE:
+                            json_buffer = ""  # Clear buffer to prevent repeated errors
+                            raise SDKJSONDecodeError(
+                                f"JSON message exceeded maximum buffer size of {_MAX_BUFFER_SIZE} bytes",
+                                None
+                            )
+
                         try:
-                            data = json.loads(json_line)
+                            data = json.loads(json_buffer)
+                            json_buffer = ""  # Clear buffer on successful parse
                             try:
                                 yield data
                             except GeneratorExit:
                                 # Handle generator cleanup gracefully
                                 return
-                        except json.JSONDecodeError as e:
-                            if json_line.startswith("{") or json_line.startswith("["):
-                                raise SDKJSONDecodeError(json_line, e) from e
+                        except json.JSONDecodeError:
+                            # Continue accumulating in buffer
                             continue
 
             except anyio.ClosedResourceError:
diff --git a/tests/test_subprocess_buffering.py b/tests/test_subprocess_buffering.py
index 06886df..4e6dab3 100644
--- a/tests/test_subprocess_buffering.py
+++ b/tests/test_subprocess_buffering.py
@@ -6,8 +6,13 @@ from typing import Any
 from unittest.mock import AsyncMock, MagicMock
 
 import anyio
+import pytest
 
-from claude_code_sdk._internal.transport.subprocess_cli import SubprocessCLITransport
+from claude_code_sdk._errors import CLIJSONDecodeError
+from claude_code_sdk._internal.transport.subprocess_cli import (
+    _MAX_BUFFER_SIZE,
+    SubprocessCLITransport,
+)
 from claude_code_sdk.types import ClaudeCodeOptions
 
 
@@ -139,3 +144,174 @@ class TestSubprocessBuffering:
             assert messages[1]["id"] == "res1"
 
         anyio.run(_test)
+
+    def test_split_json_across_multiple_reads(self) -> None:
+        """Test parsing when a single JSON object is split across multiple stream reads."""
+
+        async def _test() -> None:
+            # Large JSON object that simulates being split
+            json_obj = {
+                "type": "assistant",
+                "message": {
+                    "content": [
+                        {"type": "text", "text": "x" * 1000},
+                        {"type": "tool_use", "id": "tool_123", "name": "Read", "input": {"file_path": "/test.txt"}}
+                    ]
+                }
+            }
+
+            complete_json = json.dumps(json_obj)
+
+            # Split at arbitrary points to simulate stream chunking
+            part1 = complete_json[:100]
+            part2 = complete_json[100:250]
+            part3 = complete_json[250:]
+
+            transport = SubprocessCLITransport(
+                prompt="test", options=ClaudeCodeOptions(), cli_path="/usr/bin/claude"
+            )
+
+            mock_process = MagicMock()
+            mock_process.returncode = None
+            mock_process.wait = AsyncMock(return_value=None)
+            transport._process = mock_process
+            transport._stdout_stream = MockTextReceiveStream([part1, part2, part3])
+            transport._stderr_stream = MockTextReceiveStream([])
+
+            messages: list[Any] = []
+            async for msg in transport.receive_messages():
+                messages.append(msg)
+
+            # Should reconstruct the complete JSON
+            assert len(messages) == 1
+            assert messages[0]["type"] == "assistant"
+            assert len(messages[0]["message"]["content"]) == 2
+
+        anyio.run(_test)
+
+    def test_large_minified_json(self) -> None:
+        """Test parsing a large minified JSON (simulating the reported issue)."""
+
+        async def _test() -> None:
+            # Create a large minified JSON similar to what caused the issue
+            large_data = {"data": [{"id": i, "value": "x" * 100} for i in range(1000)]}
+            json_obj = {
+                "type": "user",
+                "message": {
+                    "role": "user",
+                    "content": [
+                        {
+                            "tool_use_id": "toolu_016fed1NhiaMLqnEvrj5NUaj",
+                            "type": "tool_result",
+                            "content": json.dumps(large_data)
+                        }
+                    ]
+                }
+            }
+
+            complete_json = json.dumps(json_obj)
+
+            # Split into chunks simulating 64KB buffer limit
+            chunk_size = 64 * 1024  # 64KB
+            chunks = [complete_json[i:i+chunk_size] for i in range(0, len(complete_json), chunk_size)]
+
+            transport = SubprocessCLITransport(
+                prompt="test", options=ClaudeCodeOptions(), cli_path="/usr/bin/claude"
+            )
+
+            mock_process = MagicMock()
+            mock_process.returncode = None
+            mock_process.wait = AsyncMock(return_value=None)
+            transport._process = mock_process
+            transport._stdout_stream = MockTextReceiveStream(chunks)
+            transport._stderr_stream = MockTextReceiveStream([])
+
+            messages: list[Any] = []
+            async for msg in transport.receive_messages():
+                messages.append(msg)
+
+            assert len(messages) == 1
+            assert messages[0]["type"] == "user"
+            assert messages[0]["message"]["content"][0]["tool_use_id"] == "toolu_016fed1NhiaMLqnEvrj5NUaj"
+
+        anyio.run(_test)
+
+    def test_buffer_size_exceeded(self) -> None:
+        """Test that exceeding buffer size raises an appropriate error."""
+
+        async def _test() -> None:
+            # Create incomplete JSON larger than buffer limit
+            huge_incomplete = '{"data": "' + "x" * (_MAX_BUFFER_SIZE + 1000)
+
+            transport = SubprocessCLITransport(
+                prompt="test", options=ClaudeCodeOptions(), cli_path="/usr/bin/claude"
+            )
+
+            mock_process = MagicMock()
+            mock_process.returncode = None
+            mock_process.wait = AsyncMock(return_value=None)
+            transport._process = mock_process
+            transport._stdout_stream = MockTextReceiveStream([huge_incomplete])
+            transport._stderr_stream = MockTextReceiveStream([])
+
+            with pytest.raises(Exception) as exc_info:
+                messages: list[Any] = []
+                async for msg in transport.receive_messages():
+                    messages.append(msg)
+
+            # The exception is wrapped in ExceptionGroup by anyio
+            assert len(exc_info.value.exceptions) == 1
+            assert isinstance(exc_info.value.exceptions[0], CLIJSONDecodeError)
+            assert "exceeded maximum buffer size" in str(exc_info.value.exceptions[0])
+
+        anyio.run(_test)
+
+    def test_mixed_complete_and_split_json(self) -> None:
+        """Test handling a mix of complete and split JSON messages."""
+
+        async def _test() -> None:
+            # First: complete JSON
+            msg1 = json.dumps({"type": "system", "subtype": "start"})
+
+            # Second: large JSON split across reads
+            large_msg = {
+                "type": "assistant",
+                "message": {"content": [{"type": "text", "text": "y" * 5000}]}
+            }
+            large_json = json.dumps(large_msg)
+
+            # Third: another complete JSON
+            msg3 = json.dumps({"type": "system", "subtype": "end"})
+
+            # Simulate streaming with mixed complete and partial messages
+            lines = [
+                msg1 + "\n",
+                large_json[:1000],  # First part of large message
+                large_json[1000:3000],  # Middle part
+                large_json[3000:] + "\n" + msg3  # End of large message + complete message
+            ]
+
+            transport = SubprocessCLITransport(
+                prompt="test", options=ClaudeCodeOptions(), cli_path="/usr/bin/claude"
+            )
+
+            mock_process = MagicMock()
+            mock_process.returncode = None
+            mock_process.wait = AsyncMock(return_value=None)
+            transport._process = mock_process
+            transport._stdout_stream = MockTextReceiveStream(lines)
+            transport._stderr_stream = MockTextReceiveStream([])
+
+            messages: list[Any] = []
+            async for msg in transport.receive_messages():
+                messages.append(msg)
+
+            assert len(messages) == 3
+            assert messages[0]["type"] == "system"
+            assert messages[0]["subtype"] == "start"
+            assert messages[1]["type"] == "assistant"
+            assert len(messages[1]["message"]["content"][0]["text"]) == 5000
+            assert messages[2]["type"] == "system"
+            assert messages[2]["subtype"] == "end"
+
+        anyio.run(_test)
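The core of the change above is an accumulate-then-parse loop: each partial line is appended to json_buffer, a parse is attempted on every read, and the buffer is only cleared once json.loads succeeds or the 1MB cap is exceeded. The sketch below restates that loop outside the transport so it can be run in isolation; the iter_json_messages helper and MAX_BUFFER constant are illustrative names, not part of the SDK, and the real transport additionally splits each read on newlines before buffering, which is omitted here for brevity.

import json
from collections.abc import Iterable, Iterator
from typing import Any

MAX_BUFFER = 1024 * 1024  # illustrative cap, mirroring the intent of _MAX_BUFFER_SIZE


def iter_json_messages(chunks: Iterable[str]) -> Iterator[Any]:
    """Reassemble JSON objects that may arrive split across stream reads."""
    buffer = ""
    for chunk in chunks:
        buffer += chunk
        if not buffer.strip():
            continue
        if len(buffer) > MAX_BUFFER:
            raise ValueError(f"JSON message exceeded {MAX_BUFFER} characters")
        try:
            data = json.loads(buffer)
        except json.JSONDecodeError:
            # Incomplete so far; keep accumulating until the object closes.
            continue
        buffer = ""  # Complete object parsed; start fresh for the next message.
        yield data


# One object split across three reads comes back as a single message.
parts = ['{"type": "assis', 'tant", "message": {"conte', 'nt": []}}']
assert list(iter_json_messages(parts)) == [
    {"type": "assistant", "message": {"content": []}}
]

This is the same behavior that test_split_json_across_multiple_reads and test_mixed_complete_and_split_json exercise against the transport through MockTextReceiveStream: arbitrary slicing of one JSON object still yields exactly one message.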