Fix test to use anyio.run() pattern and add edge case tests

- Convert async test to use anyio.run() pattern consistent with other tests
- Remove unused CLIJSONDecodeError import
- Add tests for JSON with embedded newlines
- Add tests for multiple newlines between objects
- All tests passing

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
Lina Tawfik 2025-06-27 14:29:23 -07:00
parent 87e2699f6a
commit b54f5df0c8
No known key found for this signature in database

View file

@ -5,9 +5,8 @@ from collections.abc import AsyncIterator
from typing import Any
from unittest.mock import AsyncMock, MagicMock
import pytest
import anyio
from claude_code_sdk._errors import CLIJSONDecodeError
from claude_code_sdk._internal.transport.subprocess_cli import SubprocessCLITransport
from claude_code_sdk.types import ClaudeCodeOptions
@ -33,47 +32,113 @@ class MockTextReceiveStream:
class TestSubprocessBuffering:
"""Test subprocess transport handling of buffered output."""
@pytest.mark.asyncio
async def test_multiple_json_objects_on_single_line(self) -> None:
def test_multiple_json_objects_on_single_line(self) -> None:
"""Test parsing when multiple JSON objects are concatenated on a single line.
In some environments, stdout buffering can cause multiple distinct JSON
objects to be delivered as a single line with embedded newlines.
"""
# Two valid JSON objects separated by a newline character
json_obj1 = {"type": "message", "id": "msg1", "content": "First message"}
json_obj2 = {"type": "result", "id": "res1", "status": "completed"}
async def _test() -> None:
# Two valid JSON objects separated by a newline character
json_obj1 = {"type": "message", "id": "msg1", "content": "First message"}
json_obj2 = {"type": "result", "id": "res1", "status": "completed"}
# Simulate buffered output where both objects appear on one line
buffered_line = json.dumps(json_obj1) + '\n' + json.dumps(json_obj2)
# Simulate buffered output where both objects appear on one line
buffered_line = json.dumps(json_obj1) + '\n' + json.dumps(json_obj2)
# Create transport
transport = SubprocessCLITransport(
prompt="test",
options=ClaudeCodeOptions(),
cli_path="/usr/bin/claude"
)
# Create transport
transport = SubprocessCLITransport(
prompt="test",
options=ClaudeCodeOptions(),
cli_path="/usr/bin/claude"
)
# Mock the process and streams
mock_process = MagicMock()
mock_process.returncode = None
mock_process.wait = AsyncMock(return_value=None)
transport._process = mock_process
# Mock the process and streams
mock_process = MagicMock()
mock_process.returncode = None
mock_process.wait = AsyncMock(return_value=None)
transport._process = mock_process
# Create mock stream that returns the buffered line
transport._stdout_stream = MockTextReceiveStream([buffered_line]) # type: ignore[assignment]
transport._stderr_stream = MockTextReceiveStream([]) # type: ignore[assignment]
# Create mock stream that returns the buffered line
transport._stdout_stream = MockTextReceiveStream([buffered_line]) # type: ignore[assignment]
transport._stderr_stream = MockTextReceiveStream([]) # type: ignore[assignment]
# Collect all messages
messages: list[Any] = []
async for msg in transport.receive_messages():
messages.append(msg)
# Collect all messages
messages: list[Any] = []
async for msg in transport.receive_messages():
messages.append(msg)
# Verify both JSON objects were successfully parsed
assert len(messages) == 2
assert messages[0]["type"] == "message"
assert messages[0]["id"] == "msg1"
assert messages[0]["content"] == "First message"
assert messages[1]["type"] == "result"
assert messages[1]["id"] == "res1"
assert messages[1]["status"] == "completed"
# Verify both JSON objects were successfully parsed
assert len(messages) == 2
assert messages[0]["type"] == "message"
assert messages[0]["id"] == "msg1"
assert messages[0]["content"] == "First message"
assert messages[1]["type"] == "result"
assert messages[1]["id"] == "res1"
assert messages[1]["status"] == "completed"
anyio.run(_test)
def test_json_with_embedded_newlines(self) -> None:
"""Test parsing JSON objects that contain newline characters in string values."""
async def _test() -> None:
# JSON objects with newlines in string values
json_obj1 = {"type": "message", "content": "Line 1\nLine 2\nLine 3"}
json_obj2 = {"type": "result", "data": "Some\nMultiline\nContent"}
buffered_line = json.dumps(json_obj1) + '\n' + json.dumps(json_obj2)
transport = SubprocessCLITransport(
prompt="test",
options=ClaudeCodeOptions(),
cli_path="/usr/bin/claude"
)
mock_process = MagicMock()
mock_process.returncode = None
mock_process.wait = AsyncMock(return_value=None)
transport._process = mock_process
transport._stdout_stream = MockTextReceiveStream([buffered_line])
transport._stderr_stream = MockTextReceiveStream([])
messages: list[Any] = []
async for msg in transport.receive_messages():
messages.append(msg)
assert len(messages) == 2
assert messages[0]["content"] == "Line 1\nLine 2\nLine 3"
assert messages[1]["data"] == "Some\nMultiline\nContent"
anyio.run(_test)
def test_multiple_newlines_between_objects(self) -> None:
"""Test parsing with multiple newlines between JSON objects."""
async def _test() -> None:
json_obj1 = {"type": "message", "id": "msg1"}
json_obj2 = {"type": "result", "id": "res1"}
# Multiple newlines between objects
buffered_line = json.dumps(json_obj1) + '\n\n\n' + json.dumps(json_obj2)
transport = SubprocessCLITransport(
prompt="test",
options=ClaudeCodeOptions(),
cli_path="/usr/bin/claude"
)
mock_process = MagicMock()
mock_process.returncode = None
mock_process.wait = AsyncMock(return_value=None)
transport._process = mock_process
transport._stdout_stream = MockTextReceiveStream([buffered_line])
transport._stderr_stream = MockTextReceiveStream([])
messages: list[Any] = []
async for msg in transport.receive_messages():
messages.append(msg)
assert len(messages) == 2
assert messages[0]["id"] == "msg1"
assert messages[1]["id"] == "res1"
anyio.run(_test)