fix: handle JSON messages split across multiple stream reads

Adds buffering to accumulate incomplete JSON messages that are split
across multiple stream reads due to asyncio's 64KB default buffer limit.

- Implement 1MB buffer to accumulate partial JSON
- Clear buffer on successful parse or size limit exceeded
- Add comprehensive tests for split JSON scenarios

Fixes CLIJSONDecodeError when reading large minified JSON files.

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
Lina Tawfik 2025-06-30 21:55:06 -07:00
parent 012e1d1f09
commit 24295417ac
No known key found for this signature in database
2 changed files with 197 additions and 5 deletions

View file

@ -17,6 +17,8 @@ from ..._errors import CLIJSONDecodeError as SDKJSONDecodeError
from ...types import ClaudeCodeOptions from ...types import ClaudeCodeOptions
from . import Transport from . import Transport
_MAX_BUFFER_SIZE = 1024 * 1024 # 1MB buffer limit
class SubprocessCLITransport(Transport): class SubprocessCLITransport(Transport):
"""Subprocess transport using Claude Code CLI.""" """Subprocess transport using Claude Code CLI."""
@ -182,6 +184,9 @@ class SubprocessCLITransport(Transport):
async with anyio.create_task_group() as tg: async with anyio.create_task_group() as tg:
tg.start_soon(read_stderr) tg.start_soon(read_stderr)
# Buffer for incomplete JSON
json_buffer = ""
try: try:
async for line in self._stdout_stream: async for line in self._stdout_stream:
line_str = line.strip() line_str = line.strip()
@ -196,16 +201,27 @@ class SubprocessCLITransport(Transport):
if not json_line: if not json_line:
continue continue
# Add to buffer
json_buffer += json_line
# Check buffer size
if len(json_buffer) > _MAX_BUFFER_SIZE:
json_buffer = "" # Clear buffer to prevent repeated errors
raise SDKJSONDecodeError(
f"JSON message exceeded maximum buffer size of {_MAX_BUFFER_SIZE} bytes",
None
)
try: try:
data = json.loads(json_line) data = json.loads(json_buffer)
json_buffer = "" # Clear buffer on successful parse
try: try:
yield data yield data
except GeneratorExit: except GeneratorExit:
# Handle generator cleanup gracefully # Handle generator cleanup gracefully
return return
except json.JSONDecodeError as e: except json.JSONDecodeError:
if json_line.startswith("{") or json_line.startswith("["): # Continue accumulating in buffer
raise SDKJSONDecodeError(json_line, e) from e
continue continue
except anyio.ClosedResourceError: except anyio.ClosedResourceError:

View file

@ -6,8 +6,13 @@ from typing import Any
from unittest.mock import AsyncMock, MagicMock from unittest.mock import AsyncMock, MagicMock
import anyio import anyio
import pytest
from claude_code_sdk._internal.transport.subprocess_cli import SubprocessCLITransport from claude_code_sdk._errors import CLIJSONDecodeError
from claude_code_sdk._internal.transport.subprocess_cli import (
_MAX_BUFFER_SIZE,
SubprocessCLITransport,
)
from claude_code_sdk.types import ClaudeCodeOptions from claude_code_sdk.types import ClaudeCodeOptions
@ -139,3 +144,174 @@ class TestSubprocessBuffering:
assert messages[1]["id"] == "res1" assert messages[1]["id"] == "res1"
anyio.run(_test) anyio.run(_test)
def test_split_json_across_multiple_reads(self) -> None:
"""Test parsing when a single JSON object is split across multiple stream reads."""
async def _test() -> None:
# Large JSON object that simulates being split
json_obj = {
"type": "assistant",
"message": {
"content": [
{"type": "text", "text": "x" * 1000},
{"type": "tool_use", "id": "tool_123", "name": "Read", "input": {"file_path": "/test.txt"}}
]
}
}
complete_json = json.dumps(json_obj)
# Split at arbitrary points to simulate stream chunking
part1 = complete_json[:100]
part2 = complete_json[100:250]
part3 = complete_json[250:]
transport = SubprocessCLITransport(
prompt="test", options=ClaudeCodeOptions(), cli_path="/usr/bin/claude"
)
mock_process = MagicMock()
mock_process.returncode = None
mock_process.wait = AsyncMock(return_value=None)
transport._process = mock_process
transport._stdout_stream = MockTextReceiveStream([part1, part2, part3])
transport._stderr_stream = MockTextReceiveStream([])
messages: list[Any] = []
async for msg in transport.receive_messages():
messages.append(msg)
# Should reconstruct the complete JSON
assert len(messages) == 1
assert messages[0]["type"] == "assistant"
assert len(messages[0]["message"]["content"]) == 2
anyio.run(_test)
def test_large_minified_json(self) -> None:
"""Test parsing a large minified JSON (simulating the reported issue)."""
async def _test() -> None:
# Create a large minified JSON similar to what caused the issue
large_data = {"data": [{"id": i, "value": "x" * 100} for i in range(1000)]}
json_obj = {
"type": "user",
"message": {
"role": "user",
"content": [
{
"tool_use_id": "toolu_016fed1NhiaMLqnEvrj5NUaj",
"type": "tool_result",
"content": json.dumps(large_data)
}
]
}
}
complete_json = json.dumps(json_obj)
# Split into chunks simulating 64KB buffer limit
chunk_size = 64 * 1024 # 64KB
chunks = [complete_json[i:i+chunk_size] for i in range(0, len(complete_json), chunk_size)]
transport = SubprocessCLITransport(
prompt="test", options=ClaudeCodeOptions(), cli_path="/usr/bin/claude"
)
mock_process = MagicMock()
mock_process.returncode = None
mock_process.wait = AsyncMock(return_value=None)
transport._process = mock_process
transport._stdout_stream = MockTextReceiveStream(chunks)
transport._stderr_stream = MockTextReceiveStream([])
messages: list[Any] = []
async for msg in transport.receive_messages():
messages.append(msg)
assert len(messages) == 1
assert messages[0]["type"] == "user"
assert messages[0]["message"]["content"][0]["tool_use_id"] == "toolu_016fed1NhiaMLqnEvrj5NUaj"
anyio.run(_test)
def test_buffer_size_exceeded(self) -> None:
"""Test that exceeding buffer size raises an appropriate error."""
async def _test() -> None:
# Create incomplete JSON larger than buffer limit
huge_incomplete = '{"data": "' + "x" * (_MAX_BUFFER_SIZE + 1000)
transport = SubprocessCLITransport(
prompt="test", options=ClaudeCodeOptions(), cli_path="/usr/bin/claude"
)
mock_process = MagicMock()
mock_process.returncode = None
mock_process.wait = AsyncMock(return_value=None)
transport._process = mock_process
transport._stdout_stream = MockTextReceiveStream([huge_incomplete])
transport._stderr_stream = MockTextReceiveStream([])
with pytest.raises(Exception) as exc_info:
messages: list[Any] = []
async for msg in transport.receive_messages():
messages.append(msg)
# The exception is wrapped in ExceptionGroup by anyio
assert len(exc_info.value.exceptions) == 1
assert isinstance(exc_info.value.exceptions[0], CLIJSONDecodeError)
assert "exceeded maximum buffer size" in str(exc_info.value.exceptions[0])
anyio.run(_test)
def test_mixed_complete_and_split_json(self) -> None:
"""Test handling a mix of complete and split JSON messages."""
async def _test() -> None:
# First: complete JSON
msg1 = json.dumps({"type": "system", "subtype": "start"})
# Second: large JSON split across reads
large_msg = {
"type": "assistant",
"message": {"content": [{"type": "text", "text": "y" * 5000}]}
}
large_json = json.dumps(large_msg)
# Third: another complete JSON
msg3 = json.dumps({"type": "system", "subtype": "end"})
# Simulate streaming with mixed complete and partial messages
lines = [
msg1 + "\n",
large_json[:1000], # First part of large message
large_json[1000:3000], # Middle part
large_json[3000:] + "\n" + msg3 # End of large message + complete message
]
transport = SubprocessCLITransport(
prompt="test", options=ClaudeCodeOptions(), cli_path="/usr/bin/claude"
)
mock_process = MagicMock()
mock_process.returncode = None
mock_process.wait = AsyncMock(return_value=None)
transport._process = mock_process
transport._stdout_stream = MockTextReceiveStream(lines)
transport._stderr_stream = MockTextReceiveStream([])
messages: list[Any] = []
async for msg in transport.receive_messages():
messages.append(msg)
assert len(messages) == 3
assert messages[0]["type"] == "system"
assert messages[0]["subtype"] == "start"
assert messages[1]["type"] == "assistant"
assert len(messages[1]["message"]["content"][0]["text"]) == 5000
assert messages[2]["type"] == "system"
assert messages[2]["subtype"] == "end"
anyio.run(_test)