Bug fix: Handle multiline json

This commit is contained in:
Arun Parthiban 2025-06-19 07:29:10 -04:00
parent 7efa8b3987
commit 9d7a2a781e
No known key found for this signature in database
2 changed files with 163 additions and 9 deletions

View file

@ -182,23 +182,54 @@ class SubprocessCLITransport(Transport):
async with anyio.create_task_group() as tg:
tg.start_soon(read_stderr)
buffer = ""
is_multiline_json = False
try:
async for line in self._stdout_stream:
line_str = line.strip()
if not line_str:
continue
try:
data = json.loads(line_str)
if is_multiline_json:
# We're collecting lines for multiline JSON
buffer += line_str
try:
yield data
except GeneratorExit:
# Handle generator cleanup gracefully
return
data = json.loads(buffer)
# Success! Yield and reset
try:
yield data
except GeneratorExit:
return
buffer = ""
is_multiline_json = False
except json.JSONDecodeError:
# Still not valid, keep collecting
continue
else:
# Try to parse line as complete JSON first
try:
data = json.loads(line_str)
try:
yield data
except GeneratorExit:
return
except json.JSONDecodeError as e:
# Failed to parse - check if it looks like start of JSON
if line_str.startswith("{") or line_str.startswith("["):
# Start buffering for potential multiline JSON
buffer = line_str
is_multiline_json = True
# If it doesn't look like JSON, just skip the line
continue
# Handle any remaining buffer after stream ends
if buffer and is_multiline_json:
try:
data = json.loads(buffer)
yield data
except json.JSONDecodeError as e:
if line_str.startswith("{") or line_str.startswith("["):
raise SDKJSONDecodeError(line_str, e) from e
continue
raise SDKJSONDecodeError(buffer, e) from e
except anyio.ClosedResourceError:
pass

View file

@ -1,11 +1,13 @@
"""Tests for Claude SDK transport layer."""
import json
from unittest.mock import AsyncMock, MagicMock, patch
import anyio
import pytest
from claude_code_sdk._internal.transport.subprocess_cli import SubprocessCLITransport
from claude_code_sdk._errors import CLIJSONDecodeError as SDKJSONDecodeError
from claude_code_sdk.types import ClaudeCodeOptions
@ -132,3 +134,124 @@ class TestSubprocessCLITransport:
# So we just verify the transport can be created and basic structure is correct
assert transport._prompt == "test"
assert transport._cli_path == "/usr/bin/claude"
def test_multiline_json_parsing(self):
"""Test parsing JSON that works both single-line and with buffering logic."""
async def _test():
# Mock process and streams
mock_process = MagicMock()
mock_process.returncode = 0
mock_process.wait = AsyncMock(return_value=0)
# Test data: valid single-line JSON only
test_lines = [
'{"type": "single", "data": "complete"}', # Valid single line JSON
'{"type": "valid_long", "data": {"nested": "value"}, "complete": true}', # Another valid single line
]
# Create async iterator from test lines
class MockTextReceiveStream:
def __init__(self, lines):
self.lines = iter(lines)
def __aiter__(self):
return self
async def __anext__(self):
try:
return next(self.lines)
except StopIteration:
raise StopAsyncIteration
mock_stdout_stream = MockTextReceiveStream(test_lines)
mock_stderr_stream = MockTextReceiveStream([])
with patch("anyio.open_process") as mock_open_process:
mock_open_process.return_value = mock_process
transport = SubprocessCLITransport(
prompt="test",
options=ClaudeCodeOptions(),
cli_path="/usr/bin/claude",
)
# Manually set up the streams for testing
transport._process = mock_process
transport._stdout_stream = mock_stdout_stream # type: ignore
transport._stderr_stream = mock_stderr_stream # type: ignore
# Collect all yielded messages
messages = []
async for message in transport.receive_messages():
messages.append(message)
# Verify we got the expected valid JSON messages
assert len(messages) == 2
# Check first single line JSON
assert messages[0] == {"type": "single", "data": "complete"}
# Check second single line JSON
assert messages[1] == {
"type": "valid_long",
"data": {"nested": "value"},
"complete": True
}
anyio.run(_test)
def test_multiline_json_no_error_on_valid_completion(self):
"""Test that valid multiline JSON doesn't raise error."""
async def _test():
mock_process = MagicMock()
mock_process.returncode = 0
mock_process.wait = AsyncMock(return_value=0)
# Test multiline JSON that completes properly
test_lines = [
'{"type": "multiline",',
'"data": "test",',
'"complete": true}',
]
class MockTextReceiveStream:
def __init__(self, lines):
self.lines = iter(lines)
def __aiter__(self):
return self
async def __anext__(self):
try:
return next(self.lines)
except StopIteration:
raise StopAsyncIteration
mock_stdout_stream = MockTextReceiveStream(test_lines)
mock_stderr_stream = MockTextReceiveStream([])
with patch("anyio.open_process") as mock_open_process:
mock_open_process.return_value = mock_process
transport = SubprocessCLITransport(
prompt="test",
options=ClaudeCodeOptions(),
cli_path="/usr/bin/claude",
)
transport._process = mock_process
transport._stdout_stream = mock_stdout_stream # type: ignore
transport._stderr_stream = mock_stderr_stream # type: ignore
messages = []
async for message in transport.receive_messages():
messages.append(message)
# Should get exactly one properly parsed message
assert len(messages) == 1
expected = {"type": "multiline", "data": "test", "complete": True}
assert messages[0] == expected
anyio.run(_test)