"""Tests for subprocess transport buffering edge cases.""" import json from collections.abc import AsyncIterator from typing import Any from unittest.mock import AsyncMock, MagicMock import anyio import pytest from claude_code_sdk._errors import CLIJSONDecodeError from claude_code_sdk._internal.transport.subprocess_cli import ( _MAX_BUFFER_SIZE, SubprocessCLITransport, ) from claude_code_sdk.types import ClaudeCodeOptions class MockTextReceiveStream: """Mock TextReceiveStream for testing.""" def __init__(self, lines: list[str]) -> None: self.lines = lines self.index = 0 def __aiter__(self) -> AsyncIterator[str]: return self async def __anext__(self) -> str: if self.index >= len(self.lines): raise StopAsyncIteration line = self.lines[self.index] self.index += 1 return line class TestSubprocessBuffering: """Test subprocess transport handling of buffered output.""" def test_multiple_json_objects_on_single_line(self) -> None: """Test parsing when multiple JSON objects are concatenated on a single line. In some environments, stdout buffering can cause multiple distinct JSON objects to be delivered as a single line with embedded newlines. """ async def _test() -> None: json_obj1 = {"type": "message", "id": "msg1", "content": "First message"} json_obj2 = {"type": "result", "id": "res1", "status": "completed"} buffered_line = json.dumps(json_obj1) + "\n" + json.dumps(json_obj2) transport = SubprocessCLITransport( prompt="test", options=ClaudeCodeOptions(), cli_path="/usr/bin/claude" ) mock_process = MagicMock() mock_process.returncode = None mock_process.wait = AsyncMock(return_value=None) transport._process = mock_process transport._stdout_stream = MockTextReceiveStream([buffered_line]) # type: ignore[assignment] transport._stderr_stream = MockTextReceiveStream([]) # type: ignore[assignment] messages: list[Any] = [] async for msg in transport.receive_messages(): messages.append(msg) assert len(messages) == 2 assert messages[0]["type"] == "message" assert messages[0]["id"] == "msg1" assert messages[0]["content"] == "First message" assert messages[1]["type"] == "result" assert messages[1]["id"] == "res1" assert messages[1]["status"] == "completed" anyio.run(_test) def test_json_with_embedded_newlines(self) -> None: """Test parsing JSON objects that contain newline characters in string values.""" async def _test() -> None: json_obj1 = {"type": "message", "content": "Line 1\nLine 2\nLine 3"} json_obj2 = {"type": "result", "data": "Some\nMultiline\nContent"} buffered_line = json.dumps(json_obj1) + "\n" + json.dumps(json_obj2) transport = SubprocessCLITransport( prompt="test", options=ClaudeCodeOptions(), cli_path="/usr/bin/claude" ) mock_process = MagicMock() mock_process.returncode = None mock_process.wait = AsyncMock(return_value=None) transport._process = mock_process transport._stdout_stream = MockTextReceiveStream([buffered_line]) transport._stderr_stream = MockTextReceiveStream([]) messages: list[Any] = [] async for msg in transport.receive_messages(): messages.append(msg) assert len(messages) == 2 assert messages[0]["content"] == "Line 1\nLine 2\nLine 3" assert messages[1]["data"] == "Some\nMultiline\nContent" anyio.run(_test) def test_multiple_newlines_between_objects(self) -> None: """Test parsing with multiple newlines between JSON objects.""" async def _test() -> None: json_obj1 = {"type": "message", "id": "msg1"} json_obj2 = {"type": "result", "id": "res1"} buffered_line = json.dumps(json_obj1) + "\n\n\n" + json.dumps(json_obj2) transport = SubprocessCLITransport( prompt="test", options=ClaudeCodeOptions(), cli_path="/usr/bin/claude" ) mock_process = MagicMock() mock_process.returncode = None mock_process.wait = AsyncMock(return_value=None) transport._process = mock_process transport._stdout_stream = MockTextReceiveStream([buffered_line]) transport._stderr_stream = MockTextReceiveStream([]) messages: list[Any] = [] async for msg in transport.receive_messages(): messages.append(msg) assert len(messages) == 2 assert messages[0]["id"] == "msg1" assert messages[1]["id"] == "res1" anyio.run(_test) def test_split_json_across_multiple_reads(self) -> None: """Test parsing when a single JSON object is split across multiple stream reads.""" async def _test() -> None: json_obj = { "type": "assistant", "message": { "content": [ {"type": "text", "text": "x" * 1000}, { "type": "tool_use", "id": "tool_123", "name": "Read", "input": {"file_path": "/test.txt"}, }, ] }, } complete_json = json.dumps(json_obj) part1 = complete_json[:100] part2 = complete_json[100:250] part3 = complete_json[250:] transport = SubprocessCLITransport( prompt="test", options=ClaudeCodeOptions(), cli_path="/usr/bin/claude" ) mock_process = MagicMock() mock_process.returncode = None mock_process.wait = AsyncMock(return_value=None) transport._process = mock_process transport._stdout_stream = MockTextReceiveStream([part1, part2, part3]) transport._stderr_stream = MockTextReceiveStream([]) messages: list[Any] = [] async for msg in transport.receive_messages(): messages.append(msg) assert len(messages) == 1 assert messages[0]["type"] == "assistant" assert len(messages[0]["message"]["content"]) == 2 anyio.run(_test) def test_large_minified_json(self) -> None: """Test parsing a large minified JSON (simulating the reported issue).""" async def _test() -> None: large_data = {"data": [{"id": i, "value": "x" * 100} for i in range(1000)]} json_obj = { "type": "user", "message": { "role": "user", "content": [ { "tool_use_id": "toolu_016fed1NhiaMLqnEvrj5NUaj", "type": "tool_result", "content": json.dumps(large_data), } ], }, } complete_json = json.dumps(json_obj) chunk_size = 64 * 1024 chunks = [ complete_json[i : i + chunk_size] for i in range(0, len(complete_json), chunk_size) ] transport = SubprocessCLITransport( prompt="test", options=ClaudeCodeOptions(), cli_path="/usr/bin/claude" ) mock_process = MagicMock() mock_process.returncode = None mock_process.wait = AsyncMock(return_value=None) transport._process = mock_process transport._stdout_stream = MockTextReceiveStream(chunks) transport._stderr_stream = MockTextReceiveStream([]) messages: list[Any] = [] async for msg in transport.receive_messages(): messages.append(msg) assert len(messages) == 1 assert messages[0]["type"] == "user" assert ( messages[0]["message"]["content"][0]["tool_use_id"] == "toolu_016fed1NhiaMLqnEvrj5NUaj" ) anyio.run(_test) def test_buffer_size_exceeded(self) -> None: """Test that exceeding buffer size raises an appropriate error.""" async def _test() -> None: huge_incomplete = '{"data": "' + "x" * (_MAX_BUFFER_SIZE + 1000) transport = SubprocessCLITransport( prompt="test", options=ClaudeCodeOptions(), cli_path="/usr/bin/claude" ) mock_process = MagicMock() mock_process.returncode = None mock_process.wait = AsyncMock(return_value=None) transport._process = mock_process transport._stdout_stream = MockTextReceiveStream([huge_incomplete]) transport._stderr_stream = MockTextReceiveStream([]) with pytest.raises(Exception) as exc_info: messages: list[Any] = [] async for msg in transport.receive_messages(): messages.append(msg) assert isinstance(exc_info.value, CLIJSONDecodeError) assert "exceeded maximum buffer size" in str(exc_info.value) anyio.run(_test) def test_mixed_complete_and_split_json(self) -> None: """Test handling a mix of complete and split JSON messages.""" async def _test() -> None: msg1 = json.dumps({"type": "system", "subtype": "start"}) large_msg = { "type": "assistant", "message": {"content": [{"type": "text", "text": "y" * 5000}]}, } large_json = json.dumps(large_msg) msg3 = json.dumps({"type": "system", "subtype": "end"}) lines = [ msg1 + "\n", large_json[:1000], large_json[1000:3000], large_json[3000:] + "\n" + msg3, ] transport = SubprocessCLITransport( prompt="test", options=ClaudeCodeOptions(), cli_path="/usr/bin/claude" ) mock_process = MagicMock() mock_process.returncode = None mock_process.wait = AsyncMock(return_value=None) transport._process = mock_process transport._stdout_stream = MockTextReceiveStream(lines) transport._stderr_stream = MockTextReceiveStream([]) messages: list[Any] = [] async for msg in transport.receive_messages(): messages.append(msg) assert len(messages) == 3 assert messages[0]["type"] == "system" assert messages[0]["subtype"] == "start" assert messages[1]["type"] == "assistant" assert len(messages[1]["message"]["content"][0]["text"]) == 5000 assert messages[2]["type"] == "system" assert messages[2]["subtype"] == "end" anyio.run(_test)