From a05469f0317b7df2586229d33ffd17a0061e196b Mon Sep 17 00:00:00 2001 From: Reshab Das Date: Sat, 28 Jun 2025 15:17:27 +0530 Subject: [PATCH] Fix parsing failure for large JSON outputs split across buffer boundaries --- .../_internal/transport/subprocess_cli.py | 38 ++++- tests/test_subprocess_buffering.py | 147 ++++++++++++++++++ 2 files changed, 179 insertions(+), 6 deletions(-) diff --git a/src/claude_code_sdk/_internal/transport/subprocess_cli.py b/src/claude_code_sdk/_internal/transport/subprocess_cli.py index d403ced..e6085ba 100644 --- a/src/claude_code_sdk/_internal/transport/subprocess_cli.py +++ b/src/claude_code_sdk/_internal/transport/subprocess_cli.py @@ -183,11 +183,22 @@ class SubprocessCLITransport(Transport): tg.start_soon(read_stderr) try: + # store incomplete buffered json output + incomplete_json_line_str = None async for line in self._stdout_stream: line_str = line.strip() if not line_str: continue + if incomplete_json_line_str: + # NOTE(review): the continuation of a split JSON line may begin with + # any character, including "{" (the split point is arbitrary), so + # concatenate unconditionally instead of asserting on its prefix. + line_str = incomplete_json_line_str + line_str + + # Store parsed JSON output for the line + parsed_json_outputs = [] + # Split on newlines in case multiple JSON objects are buffered together json_lines = line_str.split("\n") @@ -198,16 +209,31 @@ class SubprocessCLITransport(Transport): try: data = json.loads(json_line) + + # line has been parsed, resetting incomplete_json_line_str + incomplete_json_line_str = None + + # Yield later to avoid duplicates on incomplete line_str + parsed_json_outputs.append(data) except json.JSONDecodeError as e: if json_line.startswith("{") or json_line.startswith("["): - raise SDKJSONDecodeError(json_line, e) from e + incomplete_json_line_str = line_str + + # raise error only if output is complete JSON but unable to parse + if json_line.endswith("}") or json_line.endswith("]"): + raise
SDKJSONDecodeError(json_line, e) from e continue + if incomplete_json_line_str: + continue + + for json_output_data in parsed_json_outputs: + try: + yield json_output_data + except GeneratorExit: + # Handle generator cleanup gracefully + return + except anyio.ClosedResourceError: pass diff --git a/tests/test_subprocess_buffering.py b/tests/test_subprocess_buffering.py index 06886df..bfe2e7b 100644 --- a/tests/test_subprocess_buffering.py +++ b/tests/test_subprocess_buffering.py @@ -6,7 +6,9 @@ from typing import Any from unittest.mock import AsyncMock, MagicMock import anyio +import pytest +from claude_code_sdk._errors import CLIJSONDecodeError from claude_code_sdk._internal.transport.subprocess_cli import SubprocessCLITransport from claude_code_sdk.types import ClaudeCodeOptions @@ -139,3 +141,148 @@ class TestSubprocessBuffering: assert messages[1]["id"] == "res1" anyio.run(_test) + + def test_incomplete_json_across_multiple_lines(self) -> None: + """Test parsing when JSON is split across multiple lines due to buffering.""" + + async def _test() -> None: + # Large JSON that gets split across multiple lines + json_obj = { + "type": "assistant", + "message": { + "content": [ + { + "type": "text", + "text": "This is a very long response that might get split", + }, + { + "type": "tool_use", + "id": "tool_123", + "name": "Read", + "input": {"file_path": "/very/long/path/to/file.py"}, + }, + ] + }, + } + + complete_json = json.dumps(json_obj) + + # Split the JSON at an arbitrary point to simulate buffering + split_point = len(complete_json) // 2 + first_part = complete_json[:split_point] + second_part = complete_json[split_point:] + + transport = SubprocessCLITransport( + prompt="test", options=ClaudeCodeOptions(), cli_path="/usr/bin/claude" + ) + + mock_process = MagicMock() + mock_process.returncode = None + mock_process.wait = AsyncMock(return_value=None) + transport._process = mock_process + + # Simulate receiving the JSON in two parts + transport._stdout_stream = 
MockTextReceiveStream([first_part, second_part]) + transport._stderr_stream = MockTextReceiveStream([]) + + messages: list[Any] = [] + async for msg in transport.receive_messages(): + messages.append(msg) + + # Should parse as one complete message + assert len(messages) == 1 + assert messages[0]["type"] == "assistant" + assert len(messages[0]["message"]["content"]) == 2 + + anyio.run(_test) + + def test_malformed_complete_json_raises_error(self) -> None: + """Test that malformed but seemingly complete JSON raises an error.""" + + async def _test() -> None: + # JSON that looks complete but is malformed + malformed_json = '{"type": "message", "invalid": unquoted_value}' + + transport = SubprocessCLITransport( + prompt="test", options=ClaudeCodeOptions(), cli_path="/usr/bin/claude" + ) + + mock_process = MagicMock() + mock_process.returncode = None + mock_process.wait = AsyncMock(return_value=None) + transport._process = mock_process + transport._stdout_stream = MockTextReceiveStream([malformed_json]) + transport._stderr_stream = MockTextReceiveStream([]) + + # Should raise CLIJSONDecodeError for malformed complete JSON + # The exception will be wrapped in an ExceptionGroup due to anyio task group + with pytest.raises(Exception) as exc_info: + messages: list[Any] = [] + async for msg in transport.receive_messages(): + messages.append(msg) + + # Verify the actual exception is CLIJSONDecodeError + assert len(exc_info.value.exceptions) == 1 + assert isinstance(exc_info.value.exceptions[0], CLIJSONDecodeError) + + anyio.run(_test) + + def test_no_duplicate_output_when_incomplete_json_followed_by_valid(self) -> None: + """Test that we don't duplicate output when incomplete JSON is followed by valid JSON.""" + + async def _test() -> None: + # First valid JSON + valid_json1 = {"type": "user", "message": "first message"} + + # Second valid JSON + valid_json2 = {"type": "assistant", "message": "second message"} + + # Large JSON that will be incomplete + large_json = { + 
"type": "result", + "data": "Very large data " * 200, # Make it large enough to split + "status": "completed", + } + + valid_str1 = json.dumps(valid_json1) + valid_str2 = json.dumps(valid_json2) + large_str = json.dumps(large_json) + + # Split the large JSON + split_point = len(large_str) // 2 + large_part1 = large_str[:split_point] + large_part2 = large_str[split_point:] + + # Create line that has: valid JSON + newline + valid JSON + newline + incomplete large JSON + combined_line = valid_str1 + "\n" + valid_str2 + "\n" + large_part1 + + lines = [ + combined_line, # First line: 2 valid JSONs + start of large JSON + large_part2, # Second line: completion of large JSON + ] + + transport = SubprocessCLITransport( + prompt="test", options=ClaudeCodeOptions(), cli_path="/usr/bin/claude" + ) + + mock_process = MagicMock() + mock_process.returncode = None + mock_process.wait = AsyncMock(return_value=None) + transport._process = mock_process + transport._stdout_stream = MockTextReceiveStream(lines) + transport._stderr_stream = MockTextReceiveStream([]) + + messages: list[Any] = [] + async for msg in transport.receive_messages(): + messages.append(msg) + + # Should have exactly 3 messages: 2 from first line + 1 completed large JSON + assert len(messages) == 3 + assert messages[0]["type"] == "user" + assert messages[0]["message"] == "first message" + assert messages[1]["type"] == "assistant" + assert messages[1]["message"] == "second message" + assert messages[2]["type"] == "result" + assert messages[2]["status"] == "completed" + + anyio.run(_test)