Merge pull request #5 from Bradley-Butcher/fix/subprocess-buffering-issue

fix multi-line buffering issue
This commit is contained in:
Lina Tawfik 2025-06-27 15:33:29 -07:00 committed by GitHub
commit 9bda4e8982
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 159 additions and 10 deletions

View file

@ -188,17 +188,25 @@ class SubprocessCLITransport(Transport):
if not line_str: if not line_str:
continue continue
try: # Split on newlines in case multiple JSON objects are buffered together
data = json.loads(line_str) json_lines = line_str.split("\n")
for json_line in json_lines:
json_line = json_line.strip()
if not json_line:
continue
try: try:
yield data data = json.loads(json_line)
except GeneratorExit: try:
# Handle generator cleanup gracefully yield data
return except GeneratorExit:
except json.JSONDecodeError as e: # Handle generator cleanup gracefully
if line_str.startswith("{") or line_str.startswith("["): return
raise SDKJSONDecodeError(line_str, e) from e except json.JSONDecodeError as e:
continue if json_line.startswith("{") or json_line.startswith("["):
raise SDKJSONDecodeError(json_line, e) from e
continue
except anyio.ClosedResourceError: except anyio.ClosedResourceError:
pass pass

View file

@ -0,0 +1,141 @@
"""Tests for subprocess transport buffering edge cases."""
import json
from collections.abc import AsyncIterator
from typing import Any
from unittest.mock import AsyncMock, MagicMock
import anyio
from claude_code_sdk._internal.transport.subprocess_cli import SubprocessCLITransport
from claude_code_sdk.types import ClaudeCodeOptions
class MockTextReceiveStream:
    """Mock TextReceiveStream for testing.

    Acts as its own async iterator: each ``__anext__`` call returns the next
    entry of ``lines`` in order, and ``StopAsyncIteration`` is raised once
    every entry has been handed out.
    """

    def __init__(self, lines: list[str]) -> None:
        """Store the chunks to replay and reset the read cursor.

        Args:
            lines: Strings to hand out, one per iteration step. The list is
                stored by reference, not copied.
        """
        self.lines = lines
        # Position of the next entry of ``lines`` to return.
        self.index = 0

    def __aiter__(self) -> AsyncIterator[str]:
        """Return ``self``; the stream object is also the iterator."""
        return self

    async def __anext__(self) -> str:
        """Return the next stored chunk.

        Raises:
            StopAsyncIteration: When all stored chunks have been returned.
        """
        if self.index >= len(self.lines):
            raise StopAsyncIteration
        line = self.lines[self.index]
        self.index += 1
        return line
class TestSubprocessBuffering:
    """Test subprocess transport handling of buffered output."""

    @staticmethod
    def _receive_all(stdout_chunks: list[str]) -> list[Any]:
        """Run ``receive_messages`` over mocked stdout and collect the results.

        Builds a ``SubprocessCLITransport`` whose process is a ``MagicMock``
        and whose stdout/stderr streams are ``MockTextReceiveStream``
        instances, then drains ``receive_messages()`` inside ``anyio.run``.

        Args:
            stdout_chunks: Strings the mocked stdout stream delivers, one per
                iteration step of the stream.

        Returns:
            Every message yielded by ``receive_messages()``, in order.
        """

        async def _collect() -> list[Any]:
            # Create transport
            transport = SubprocessCLITransport(
                prompt="test", options=ClaudeCodeOptions(), cli_path="/usr/bin/claude"
            )

            # Mock the process and streams
            mock_process = MagicMock()
            mock_process.returncode = None
            mock_process.wait = AsyncMock(return_value=None)
            transport._process = mock_process

            # The mock streams are not the transport's declared stream types,
            # hence the type-ignore on both assignments.
            transport._stdout_stream = MockTextReceiveStream(stdout_chunks)  # type: ignore[assignment]
            transport._stderr_stream = MockTextReceiveStream([])  # type: ignore[assignment]

            # Collect all messages
            messages: list[Any] = []
            async for msg in transport.receive_messages():
                messages.append(msg)
            return messages

        return anyio.run(_collect)

    def test_multiple_json_objects_on_single_line(self) -> None:
        """Test parsing when multiple JSON objects are concatenated on a single line.

        In some environments, stdout buffering can cause multiple distinct JSON
        objects to be delivered as a single line with embedded newlines.
        """
        # Two valid JSON objects separated by a newline character
        json_obj1 = {"type": "message", "id": "msg1", "content": "First message"}
        json_obj2 = {"type": "result", "id": "res1", "status": "completed"}

        # Simulate buffered output where both objects appear on one line
        buffered_line = json.dumps(json_obj1) + "\n" + json.dumps(json_obj2)

        messages = self._receive_all([buffered_line])

        # Verify both JSON objects were successfully parsed
        assert len(messages) == 2
        assert messages[0]["type"] == "message"
        assert messages[0]["id"] == "msg1"
        assert messages[0]["content"] == "First message"
        assert messages[1]["type"] == "result"
        assert messages[1]["id"] == "res1"
        assert messages[1]["status"] == "completed"

    def test_json_with_embedded_newlines(self) -> None:
        """Test parsing JSON objects that contain newline characters in string values."""
        # JSON objects with newlines in string values; json.dumps escapes them,
        # so the only raw newline in the chunk is the separator between objects.
        json_obj1 = {"type": "message", "content": "Line 1\nLine 2\nLine 3"}
        json_obj2 = {"type": "result", "data": "Some\nMultiline\nContent"}
        buffered_line = json.dumps(json_obj1) + "\n" + json.dumps(json_obj2)

        messages = self._receive_all([buffered_line])

        assert len(messages) == 2
        assert messages[0]["content"] == "Line 1\nLine 2\nLine 3"
        assert messages[1]["data"] == "Some\nMultiline\nContent"

    def test_multiple_newlines_between_objects(self) -> None:
        """Test parsing with multiple newlines between JSON objects."""
        json_obj1 = {"type": "message", "id": "msg1"}
        json_obj2 = {"type": "result", "id": "res1"}

        # Multiple newlines between objects
        buffered_line = json.dumps(json_obj1) + "\n\n\n" + json.dumps(json_obj2)

        messages = self._receive_all([buffered_line])

        assert len(messages) == 2
        assert messages[0]["id"] == "msg1"
        assert messages[1]["id"] == "res1"