Fix test to use anyio.run() pattern and add edge case tests

- Convert async test to use anyio.run() pattern consistent with other tests
- Remove unused CLIJSONDecodeError import
- Add tests for JSON with embedded newlines
- Add tests for multiple newlines between objects
- All tests passing

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
Lina Tawfik 2025-06-27 14:29:23 -07:00
parent 87e2699f6a
commit 3f25c3bb83
No known key found for this signature in database

View file

@ -5,9 +5,8 @@ from collections.abc import AsyncIterator
from typing import Any from typing import Any
from unittest.mock import AsyncMock, MagicMock from unittest.mock import AsyncMock, MagicMock
import pytest import anyio
from claude_code_sdk._errors import CLIJSONDecodeError
from claude_code_sdk._internal.transport.subprocess_cli import SubprocessCLITransport from claude_code_sdk._internal.transport.subprocess_cli import SubprocessCLITransport
from claude_code_sdk.types import ClaudeCodeOptions from claude_code_sdk.types import ClaudeCodeOptions
@ -33,47 +32,113 @@ class MockTextReceiveStream:
class TestSubprocessBuffering: class TestSubprocessBuffering:
"""Test subprocess transport handling of buffered output.""" """Test subprocess transport handling of buffered output."""
@pytest.mark.asyncio def test_multiple_json_objects_on_single_line(self) -> None:
async def test_multiple_json_objects_on_single_line(self) -> None:
"""Test parsing when multiple JSON objects are concatenated on a single line. """Test parsing when multiple JSON objects are concatenated on a single line.
In some environments, stdout buffering can cause multiple distinct JSON In some environments, stdout buffering can cause multiple distinct JSON
objects to be delivered as a single line with embedded newlines. objects to be delivered as a single line with embedded newlines.
""" """
# Two valid JSON objects separated by a newline character async def _test() -> None:
json_obj1 = {"type": "message", "id": "msg1", "content": "First message"} # Two valid JSON objects separated by a newline character
json_obj2 = {"type": "result", "id": "res1", "status": "completed"} json_obj1 = {"type": "message", "id": "msg1", "content": "First message"}
json_obj2 = {"type": "result", "id": "res1", "status": "completed"}
# Simulate buffered output where both objects appear on one line # Simulate buffered output where both objects appear on one line
buffered_line = json.dumps(json_obj1) + '\n' + json.dumps(json_obj2) buffered_line = json.dumps(json_obj1) + '\n' + json.dumps(json_obj2)
# Create transport # Create transport
transport = SubprocessCLITransport( transport = SubprocessCLITransport(
prompt="test", prompt="test",
options=ClaudeCodeOptions(), options=ClaudeCodeOptions(),
cli_path="/usr/bin/claude" cli_path="/usr/bin/claude"
) )
# Mock the process and streams # Mock the process and streams
mock_process = MagicMock() mock_process = MagicMock()
mock_process.returncode = None mock_process.returncode = None
mock_process.wait = AsyncMock(return_value=None) mock_process.wait = AsyncMock(return_value=None)
transport._process = mock_process transport._process = mock_process
# Create mock stream that returns the buffered line # Create mock stream that returns the buffered line
transport._stdout_stream = MockTextReceiveStream([buffered_line]) # type: ignore[assignment] transport._stdout_stream = MockTextReceiveStream([buffered_line]) # type: ignore[assignment]
transport._stderr_stream = MockTextReceiveStream([]) # type: ignore[assignment] transport._stderr_stream = MockTextReceiveStream([]) # type: ignore[assignment]
# Collect all messages # Collect all messages
messages: list[Any] = [] messages: list[Any] = []
async for msg in transport.receive_messages(): async for msg in transport.receive_messages():
messages.append(msg) messages.append(msg)
# Verify both JSON objects were successfully parsed # Verify both JSON objects were successfully parsed
assert len(messages) == 2 assert len(messages) == 2
assert messages[0]["type"] == "message" assert messages[0]["type"] == "message"
assert messages[0]["id"] == "msg1" assert messages[0]["id"] == "msg1"
assert messages[0]["content"] == "First message" assert messages[0]["content"] == "First message"
assert messages[1]["type"] == "result" assert messages[1]["type"] == "result"
assert messages[1]["id"] == "res1" assert messages[1]["id"] == "res1"
assert messages[1]["status"] == "completed" assert messages[1]["status"] == "completed"
anyio.run(_test)
def test_json_with_embedded_newlines(self) -> None:
"""Test parsing JSON objects that contain newline characters in string values."""
async def _test() -> None:
# JSON objects with newlines in string values
json_obj1 = {"type": "message", "content": "Line 1\nLine 2\nLine 3"}
json_obj2 = {"type": "result", "data": "Some\nMultiline\nContent"}
buffered_line = json.dumps(json_obj1) + '\n' + json.dumps(json_obj2)
transport = SubprocessCLITransport(
prompt="test",
options=ClaudeCodeOptions(),
cli_path="/usr/bin/claude"
)
mock_process = MagicMock()
mock_process.returncode = None
mock_process.wait = AsyncMock(return_value=None)
transport._process = mock_process
transport._stdout_stream = MockTextReceiveStream([buffered_line])
transport._stderr_stream = MockTextReceiveStream([])
messages: list[Any] = []
async for msg in transport.receive_messages():
messages.append(msg)
assert len(messages) == 2
assert messages[0]["content"] == "Line 1\nLine 2\nLine 3"
assert messages[1]["data"] == "Some\nMultiline\nContent"
anyio.run(_test)
def test_multiple_newlines_between_objects(self) -> None:
"""Test parsing with multiple newlines between JSON objects."""
async def _test() -> None:
json_obj1 = {"type": "message", "id": "msg1"}
json_obj2 = {"type": "result", "id": "res1"}
# Multiple newlines between objects
buffered_line = json.dumps(json_obj1) + '\n\n\n' + json.dumps(json_obj2)
transport = SubprocessCLITransport(
prompt="test",
options=ClaudeCodeOptions(),
cli_path="/usr/bin/claude"
)
mock_process = MagicMock()
mock_process.returncode = None
mock_process.wait = AsyncMock(return_value=None)
transport._process = mock_process
transport._stdout_stream = MockTextReceiveStream([buffered_line])
transport._stderr_stream = MockTextReceiveStream([])
messages: list[Any] = []
async for msg in transport.receive_messages():
messages.append(msg)
assert len(messages) == 2
assert messages[0]["id"] == "msg1"
assert messages[1]["id"] == "res1"
anyio.run(_test)