Fix parsing failure for large JSON outputs split across buffer boundaries

This commit is contained in:
Reshab Das 2025-06-28 15:17:27 +05:30
parent b3c20bd7e3
commit a05469f031
2 changed files with 179 additions and 6 deletions

View file

@ -183,11 +183,22 @@ class SubprocessCLITransport(Transport):
tg.start_soon(read_stderr)
try:
# store incomplete buffered json output
incomplete_json_line_str = None
async for line in self._stdout_stream:
line_str = line.strip()
if not line_str:
continue
if incomplete_json_line_str:
assert not line_str.startswith("{"), (
"New line must complete incomplete JSON"
)
line_str = incomplete_json_line_str + line_str
# Store parsed JSON output for the line
parsed_json_outputs = []
# Split on newlines in case multiple JSON objects are buffered together
json_lines = line_str.split("\n")
@ -198,16 +209,31 @@ class SubprocessCLITransport(Transport):
try:
data = json.loads(json_line)
try:
yield data
except GeneratorExit:
# Handle generator cleanup gracefully
return
# line has been parsed, reseting incomplete_json_line_str
incomplete_json_line_str = None
# Yield later to avoid duplicates on incomplete line_str
parsed_json_outputs.append(data)
except json.JSONDecodeError as e:
if json_line.startswith("{") or json_line.startswith("["):
raise SDKJSONDecodeError(json_line, e) from e
incomplete_json_line_str = line_str
# raise error only if output is complete JSON but unable to parse
if json_line.endswith("}") or json_line.startswith("]"):
raise SDKJSONDecodeError(json_line, e) from e
continue
if incomplete_json_line_str:
continue
for json_output_data in parsed_json_outputs:
try:
yield json_output_data
except GeneratorExit:
# Handle generator cleanup gracefully
return
except anyio.ClosedResourceError:
pass

View file

@ -6,7 +6,9 @@ from typing import Any
from unittest.mock import AsyncMock, MagicMock
import anyio
import pytest
from claude_code_sdk._errors import CLIJSONDecodeError
from claude_code_sdk._internal.transport.subprocess_cli import SubprocessCLITransport
from claude_code_sdk.types import ClaudeCodeOptions
@ -139,3 +141,148 @@ class TestSubprocessBuffering:
assert messages[1]["id"] == "res1"
anyio.run(_test)
def test_incomplete_json_across_multiple_lines(self) -> None:
"""Test parsing when JSON is split across multiple lines due to buffering."""
async def _test() -> None:
# Large JSON that gets split across multiple lines
json_obj = {
"type": "assistant",
"message": {
"content": [
{
"type": "text",
"text": "This is a very long response that might get split",
},
{
"type": "tool_use",
"id": "tool_123",
"name": "Read",
"input": {"file_path": "/very/long/path/to/file.py"},
},
]
},
}
complete_json = json.dumps(json_obj)
# Split the JSON at an arbitrary point to simulate buffering
split_point = len(complete_json) // 2
first_part = complete_json[:split_point]
second_part = complete_json[split_point:]
transport = SubprocessCLITransport(
prompt="test", options=ClaudeCodeOptions(), cli_path="/usr/bin/claude"
)
mock_process = MagicMock()
mock_process.returncode = None
mock_process.wait = AsyncMock(return_value=None)
transport._process = mock_process
# Simulate receiving the JSON in two parts
transport._stdout_stream = MockTextReceiveStream([first_part, second_part])
transport._stderr_stream = MockTextReceiveStream([])
messages: list[Any] = []
async for msg in transport.receive_messages():
messages.append(msg)
# Should parse as one complete message
assert len(messages) == 1
assert messages[0]["type"] == "assistant"
assert len(messages[0]["message"]["content"]) == 2
anyio.run(_test)
def test_malformed_complete_json_raises_error(self) -> None:
"""Test that malformed but seemingly complete JSON raises an error."""
async def _test() -> None:
# JSON that looks complete but is malformed
malformed_json = '{"type": "message", "invalid": unquoted_value}'
transport = SubprocessCLITransport(
prompt="test", options=ClaudeCodeOptions(), cli_path="/usr/bin/claude"
)
mock_process = MagicMock()
mock_process.returncode = None
mock_process.wait = AsyncMock(return_value=None)
transport._process = mock_process
transport._stdout_stream = MockTextReceiveStream([malformed_json])
transport._stderr_stream = MockTextReceiveStream([])
# Should raise CLIJSONDecodeError for malformed complete JSON
# The exception will be wrapped in an ExceptionGroup due to anyio task group
with pytest.raises(Exception) as exc_info:
messages: list[Any] = []
async for msg in transport.receive_messages():
messages.append(msg)
# Verify the actual exception is CLIJSONDecodeError
assert len(exc_info.value.exceptions) == 1
assert isinstance(exc_info.value.exceptions[0], CLIJSONDecodeError)
anyio.run(_test)
def test_no_duplicate_output_when_incomplete_json_followed_by_valid(self) -> None:
"""Test that we don't duplicate output when incomplete JSON is followed by valid JSON."""
async def _test() -> None:
# First valid JSON
valid_json1 = {"type": "user", "message": "first message"}
# Second valid JSON
valid_json2 = {"type": "assistant", "message": "second message"}
# Large JSON that will be incomplete
large_json = {
"type": "result",
"data": "Very large data " * 200, # Make it large enough to split
"status": "completed",
}
valid_str1 = json.dumps(valid_json1)
valid_str2 = json.dumps(valid_json2)
large_str = json.dumps(large_json)
# Split the large JSON
split_point = len(large_str) // 2
large_part1 = large_str[:split_point]
large_part2 = large_str[split_point:]
# Create line that has: valid JSON + newline + valid JSON + newline + incomplete large JSON
combined_line = valid_str1 + "\n" + valid_str2 + "\n" + large_part1
lines = [
combined_line, # First line: 2 valid JSONs + start of large JSON
large_part2, # Second line: completion of large JSON
]
transport = SubprocessCLITransport(
prompt="test", options=ClaudeCodeOptions(), cli_path="/usr/bin/claude"
)
mock_process = MagicMock()
mock_process.returncode = None
mock_process.wait = AsyncMock(return_value=None)
transport._process = mock_process
transport._stdout_stream = MockTextReceiveStream(lines)
transport._stderr_stream = MockTextReceiveStream([])
messages: list[Any] = []
async for msg in transport.receive_messages():
messages.append(msg)
# Should have exactly 3 messages: 2 from first line + 1 completed large JSON
assert len(messages) == 3
assert messages[0]["type"] == "user"
assert messages[0]["message"] == "first message"
assert messages[1]["type"] == "assistant"
assert messages[1]["message"] == "second message"
assert messages[2]["type"] == "result"
assert messages[2]["status"] == "completed"
anyio.run(_test)