Merge pull request #53 from anthropics/fix/json-streaming-buffer
Some checks are pending
Lint / lint (push) Waiting to run
Test / test (3.10) (push) Waiting to run
Test / test (3.11) (push) Waiting to run
Test / test (3.12) (push) Waiting to run
Test / test (3.13) (push) Waiting to run

fix: handle JSON messages split across multiple stream reads
This commit is contained in:
Lina Tawfik 2025-06-30 23:03:27 -07:00 committed by GitHub
commit 4af210ee8f
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 196 additions and 16 deletions

View file

@ -17,6 +17,8 @@ from ..._errors import CLIJSONDecodeError as SDKJSONDecodeError
from ...types import ClaudeCodeOptions
from . import Transport
_MAX_BUFFER_SIZE = 1024 * 1024 # 1MB buffer limit
class SubprocessCLITransport(Transport):
"""Subprocess transport using Claude Code CLI."""
@ -182,13 +184,14 @@ class SubprocessCLITransport(Transport):
async with anyio.create_task_group() as tg:
tg.start_soon(read_stderr)
json_buffer = ""
try:
async for line in self._stdout_stream:
line_str = line.strip()
if not line_str:
continue
# Split on newlines in case multiple JSON objects are buffered together
json_lines = line_str.split("\n")
for json_line in json_lines:
@ -196,16 +199,26 @@ class SubprocessCLITransport(Transport):
if not json_line:
continue
# Keep accumulating partial JSON until we can parse it
json_buffer += json_line
if len(json_buffer) > _MAX_BUFFER_SIZE:
json_buffer = ""
raise SDKJSONDecodeError(
f"JSON message exceeded maximum buffer size of {_MAX_BUFFER_SIZE} bytes",
ValueError(
f"Buffer size {len(json_buffer)} exceeds limit {_MAX_BUFFER_SIZE}"
),
)
try:
data = json.loads(json_line)
data = json.loads(json_buffer)
json_buffer = ""
try:
yield data
except GeneratorExit:
# Handle generator cleanup gracefully
return
except json.JSONDecodeError as e:
if json_line.startswith("{") or json_line.startswith("["):
raise SDKJSONDecodeError(json_line, e) from e
except json.JSONDecodeError:
continue
except anyio.ClosedResourceError:

View file

@ -6,8 +6,13 @@ from typing import Any
from unittest.mock import AsyncMock, MagicMock
import anyio
import pytest
from claude_code_sdk._internal.transport.subprocess_cli import SubprocessCLITransport
from claude_code_sdk._errors import CLIJSONDecodeError
from claude_code_sdk._internal.transport.subprocess_cli import (
_MAX_BUFFER_SIZE,
SubprocessCLITransport,
)
from claude_code_sdk.types import ClaudeCodeOptions
@ -40,34 +45,27 @@ class TestSubprocessBuffering:
"""
async def _test() -> None:
# Two valid JSON objects separated by a newline character
json_obj1 = {"type": "message", "id": "msg1", "content": "First message"}
json_obj2 = {"type": "result", "id": "res1", "status": "completed"}
# Simulate buffered output where both objects appear on one line
buffered_line = json.dumps(json_obj1) + "\n" + json.dumps(json_obj2)
# Create transport
transport = SubprocessCLITransport(
prompt="test", options=ClaudeCodeOptions(), cli_path="/usr/bin/claude"
)
# Mock the process and streams
mock_process = MagicMock()
mock_process.returncode = None
mock_process.wait = AsyncMock(return_value=None)
transport._process = mock_process
# Create mock stream that returns the buffered line
transport._stdout_stream = MockTextReceiveStream([buffered_line]) # type: ignore[assignment]
transport._stderr_stream = MockTextReceiveStream([]) # type: ignore[assignment]
# Collect all messages
messages: list[Any] = []
async for msg in transport.receive_messages():
messages.append(msg)
# Verify both JSON objects were successfully parsed
assert len(messages) == 2
assert messages[0]["type"] == "message"
assert messages[0]["id"] == "msg1"
@ -82,7 +80,6 @@ class TestSubprocessBuffering:
"""Test parsing JSON objects that contain newline characters in string values."""
async def _test() -> None:
# JSON objects with newlines in string values
json_obj1 = {"type": "message", "content": "Line 1\nLine 2\nLine 3"}
json_obj2 = {"type": "result", "data": "Some\nMultiline\nContent"}
@ -116,7 +113,6 @@ class TestSubprocessBuffering:
json_obj1 = {"type": "message", "id": "msg1"}
json_obj2 = {"type": "result", "id": "res1"}
# Multiple newlines between objects
buffered_line = json.dumps(json_obj1) + "\n\n\n" + json.dumps(json_obj2)
transport = SubprocessCLITransport(
@ -139,3 +135,174 @@ class TestSubprocessBuffering:
assert messages[1]["id"] == "res1"
anyio.run(_test)
def test_split_json_across_multiple_reads(self) -> None:
"""Test parsing when a single JSON object is split across multiple stream reads."""
async def _test() -> None:
json_obj = {
"type": "assistant",
"message": {
"content": [
{"type": "text", "text": "x" * 1000},
{
"type": "tool_use",
"id": "tool_123",
"name": "Read",
"input": {"file_path": "/test.txt"},
},
]
},
}
complete_json = json.dumps(json_obj)
part1 = complete_json[:100]
part2 = complete_json[100:250]
part3 = complete_json[250:]
transport = SubprocessCLITransport(
prompt="test", options=ClaudeCodeOptions(), cli_path="/usr/bin/claude"
)
mock_process = MagicMock()
mock_process.returncode = None
mock_process.wait = AsyncMock(return_value=None)
transport._process = mock_process
transport._stdout_stream = MockTextReceiveStream([part1, part2, part3])
transport._stderr_stream = MockTextReceiveStream([])
messages: list[Any] = []
async for msg in transport.receive_messages():
messages.append(msg)
assert len(messages) == 1
assert messages[0]["type"] == "assistant"
assert len(messages[0]["message"]["content"]) == 2
anyio.run(_test)
def test_large_minified_json(self) -> None:
"""Test parsing a large minified JSON (simulating the reported issue)."""
async def _test() -> None:
large_data = {"data": [{"id": i, "value": "x" * 100} for i in range(1000)]}
json_obj = {
"type": "user",
"message": {
"role": "user",
"content": [
{
"tool_use_id": "toolu_016fed1NhiaMLqnEvrj5NUaj",
"type": "tool_result",
"content": json.dumps(large_data),
}
],
},
}
complete_json = json.dumps(json_obj)
chunk_size = 64 * 1024
chunks = [
complete_json[i : i + chunk_size]
for i in range(0, len(complete_json), chunk_size)
]
transport = SubprocessCLITransport(
prompt="test", options=ClaudeCodeOptions(), cli_path="/usr/bin/claude"
)
mock_process = MagicMock()
mock_process.returncode = None
mock_process.wait = AsyncMock(return_value=None)
transport._process = mock_process
transport._stdout_stream = MockTextReceiveStream(chunks)
transport._stderr_stream = MockTextReceiveStream([])
messages: list[Any] = []
async for msg in transport.receive_messages():
messages.append(msg)
assert len(messages) == 1
assert messages[0]["type"] == "user"
assert (
messages[0]["message"]["content"][0]["tool_use_id"]
== "toolu_016fed1NhiaMLqnEvrj5NUaj"
)
anyio.run(_test)
def test_buffer_size_exceeded(self) -> None:
"""Test that exceeding buffer size raises an appropriate error."""
async def _test() -> None:
huge_incomplete = '{"data": "' + "x" * (_MAX_BUFFER_SIZE + 1000)
transport = SubprocessCLITransport(
prompt="test", options=ClaudeCodeOptions(), cli_path="/usr/bin/claude"
)
mock_process = MagicMock()
mock_process.returncode = None
mock_process.wait = AsyncMock(return_value=None)
transport._process = mock_process
transport._stdout_stream = MockTextReceiveStream([huge_incomplete])
transport._stderr_stream = MockTextReceiveStream([])
with pytest.raises(Exception) as exc_info:
messages: list[Any] = []
async for msg in transport.receive_messages():
messages.append(msg)
assert len(exc_info.value.exceptions) == 1
assert isinstance(exc_info.value.exceptions[0], CLIJSONDecodeError)
assert "exceeded maximum buffer size" in str(exc_info.value.exceptions[0])
anyio.run(_test)
def test_mixed_complete_and_split_json(self) -> None:
"""Test handling a mix of complete and split JSON messages."""
async def _test() -> None:
msg1 = json.dumps({"type": "system", "subtype": "start"})
large_msg = {
"type": "assistant",
"message": {"content": [{"type": "text", "text": "y" * 5000}]},
}
large_json = json.dumps(large_msg)
msg3 = json.dumps({"type": "system", "subtype": "end"})
lines = [
msg1 + "\n",
large_json[:1000],
large_json[1000:3000],
large_json[3000:] + "\n" + msg3,
]
transport = SubprocessCLITransport(
prompt="test", options=ClaudeCodeOptions(), cli_path="/usr/bin/claude"
)
mock_process = MagicMock()
mock_process.returncode = None
mock_process.wait = AsyncMock(return_value=None)
transport._process = mock_process
transport._stdout_stream = MockTextReceiveStream(lines)
transport._stderr_stream = MockTextReceiveStream([])
messages: list[Any] = []
async for msg in transport.receive_messages():
messages.append(msg)
assert len(messages) == 3
assert messages[0]["type"] == "system"
assert messages[0]["subtype"] == "start"
assert messages[1]["type"] == "assistant"
assert len(messages[1]["message"]["content"][0]["text"]) == 5000
assert messages[2]["type"] == "system"
assert messages[2]["subtype"] == "end"
anyio.run(_test)