From 87e2699f6a59b57986bab2a6586925372b8f0f6c Mon Sep 17 00:00:00 2001 From: Bradley-Butcher Date: Fri, 13 Jun 2025 19:36:14 +0100 Subject: [PATCH 1/4] fix multi-line buffering issue Signed-off-by: Bradley-Butcher --- .../_internal/transport/subprocess_cli.py | 28 ++++--- tests/test_subprocess_buffering.py | 79 +++++++++++++++++++ 2 files changed, 97 insertions(+), 10 deletions(-) create mode 100644 tests/test_subprocess_buffering.py diff --git a/src/claude_code_sdk/_internal/transport/subprocess_cli.py b/src/claude_code_sdk/_internal/transport/subprocess_cli.py index f63732a..d403ced 100644 --- a/src/claude_code_sdk/_internal/transport/subprocess_cli.py +++ b/src/claude_code_sdk/_internal/transport/subprocess_cli.py @@ -188,17 +188,25 @@ class SubprocessCLITransport(Transport): if not line_str: continue - try: - data = json.loads(line_str) + # Split on newlines in case multiple JSON objects are buffered together + json_lines = line_str.split("\n") + + for json_line in json_lines: + json_line = json_line.strip() + if not json_line: + continue + try: - yield data - except GeneratorExit: - # Handle generator cleanup gracefully - return - except json.JSONDecodeError as e: - if line_str.startswith("{") or line_str.startswith("["): - raise SDKJSONDecodeError(line_str, e) from e - continue + data = json.loads(json_line) + try: + yield data + except GeneratorExit: + # Handle generator cleanup gracefully + return + except json.JSONDecodeError as e: + if json_line.startswith("{") or json_line.startswith("["): + raise SDKJSONDecodeError(json_line, e) from e + continue except anyio.ClosedResourceError: pass diff --git a/tests/test_subprocess_buffering.py b/tests/test_subprocess_buffering.py new file mode 100644 index 0000000..0b7d476 --- /dev/null +++ b/tests/test_subprocess_buffering.py @@ -0,0 +1,79 @@ +"""Tests for subprocess transport buffering edge cases.""" + +import json +from collections.abc import AsyncIterator +from typing import Any +from unittest.mock import AsyncMock, MagicMock + +import pytest + +from claude_code_sdk._errors import CLIJSONDecodeError +from claude_code_sdk._internal.transport.subprocess_cli import SubprocessCLITransport +from claude_code_sdk.types import ClaudeCodeOptions + + +class MockTextReceiveStream: + """Mock TextReceiveStream for testing.""" + + def __init__(self, lines: list[str]) -> None: + self.lines = lines + self.index = 0 + + def __aiter__(self) -> AsyncIterator[str]: + return self + + async def __anext__(self) -> str: + if self.index >= len(self.lines): + raise StopAsyncIteration + line = self.lines[self.index] + self.index += 1 + return line + + +class TestSubprocessBuffering: + """Test subprocess transport handling of buffered output.""" + + @pytest.mark.asyncio + async def test_multiple_json_objects_on_single_line(self) -> None: + """Test parsing when multiple JSON objects are concatenated on a single line. + + In some environments, stdout buffering can cause multiple distinct JSON + objects to be delivered as a single line with embedded newlines. + """ + # Two valid JSON objects separated by a newline character + json_obj1 = {"type": "message", "id": "msg1", "content": "First message"} + json_obj2 = {"type": "result", "id": "res1", "status": "completed"} + + # Simulate buffered output where both objects appear on one line + buffered_line = json.dumps(json_obj1) + '\n' + json.dumps(json_obj2) + + # Create transport + transport = SubprocessCLITransport( + prompt="test", + options=ClaudeCodeOptions(), + cli_path="/usr/bin/claude" + ) + + # Mock the process and streams + mock_process = MagicMock() + mock_process.returncode = None + mock_process.wait = AsyncMock(return_value=None) + transport._process = mock_process + + # Create mock stream that returns the buffered line + transport._stdout_stream = MockTextReceiveStream([buffered_line]) # type: ignore[assignment] + transport._stderr_stream = MockTextReceiveStream([]) # type: ignore[assignment] + + # Collect all messages + messages: list[Any] = [] + async for msg in transport.receive_messages(): + messages.append(msg) + + # Verify both JSON objects were successfully parsed + assert len(messages) == 2 + assert messages[0]["type"] == "message" + assert messages[0]["id"] == "msg1" + assert messages[0]["content"] == "First message" + assert messages[1]["type"] == "result" + assert messages[1]["id"] == "res1" + assert messages[1]["status"] == "completed" From 3f25c3bb83226dec2ac327c6d3f1419aa696dc56 Mon Sep 17 00:00:00 2001 From: Lina Tawfik Date: Fri, 27 Jun 2025 14:29:23 -0700 Subject: [PATCH 2/4] Fix test to use anyio.run() pattern and add edge case tests MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Convert async test to use anyio.run() pattern consistent with other tests - Remove unused CLIJSONDecodeError import - Add tests for JSON with embedded newlines - Add tests for multiple newlines between objects - All tests passing 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- tests/test_subprocess_buffering.py | 135 +++++++++++++++++++++-------- 1 file changed, 100 insertions(+), 35 deletions(-) diff --git a/tests/test_subprocess_buffering.py b/tests/test_subprocess_buffering.py index 0b7d476..4e26e90 100644 --- a/tests/test_subprocess_buffering.py +++ b/tests/test_subprocess_buffering.py @@ -5,9 +5,8 @@ from collections.abc import AsyncIterator from typing import Any from unittest.mock import AsyncMock, MagicMock -import pytest +import anyio -from claude_code_sdk._errors import CLIJSONDecodeError from claude_code_sdk._internal.transport.subprocess_cli import SubprocessCLITransport from claude_code_sdk.types import ClaudeCodeOptions @@ -33,47 +32,113 @@ class MockTextReceiveStream: class TestSubprocessBuffering: """Test subprocess transport handling of buffered output.""" - @pytest.mark.asyncio - async def test_multiple_json_objects_on_single_line(self) -> None: + def test_multiple_json_objects_on_single_line(self) -> None: """Test parsing when multiple JSON objects are concatenated on a single line. In some environments, stdout buffering can cause multiple distinct JSON objects to be delivered as a single line with embedded newlines. """ - # Two valid JSON objects separated by a newline character - json_obj1 = {"type": "message", "id": "msg1", "content": "First message"} - json_obj2 = {"type": "result", "id": "res1", "status": "completed"} + async def _test() -> None: + # Two valid JSON objects separated by a newline character + json_obj1 = {"type": "message", "id": "msg1", "content": "First message"} + json_obj2 = {"type": "result", "id": "res1", "status": "completed"} - # Simulate buffered output where both objects appear on one line - buffered_line = json.dumps(json_obj1) + '\n' + json.dumps(json_obj2) + # Simulate buffered output where both objects appear on one line + buffered_line = json.dumps(json_obj1) + '\n' + json.dumps(json_obj2) - # Create transport - transport = SubprocessCLITransport( - prompt="test", - options=ClaudeCodeOptions(), - cli_path="/usr/bin/claude" - ) + # Create transport + transport = SubprocessCLITransport( + prompt="test", + options=ClaudeCodeOptions(), + cli_path="/usr/bin/claude" + ) - # Mock the process and streams - mock_process = MagicMock() - mock_process.returncode = None - mock_process.wait = AsyncMock(return_value=None) - transport._process = mock_process + # Mock the process and streams + mock_process = MagicMock() + mock_process.returncode = None + mock_process.wait = AsyncMock(return_value=None) + transport._process = mock_process - # Create mock stream that returns the buffered line - transport._stdout_stream = MockTextReceiveStream([buffered_line]) # type: ignore[assignment] - transport._stderr_stream = MockTextReceiveStream([]) # type: ignore[assignment] + # Create mock stream that returns the buffered line + transport._stdout_stream = MockTextReceiveStream([buffered_line]) # type: ignore[assignment] + transport._stderr_stream = MockTextReceiveStream([]) # type: ignore[assignment] - # Collect all messages - messages: list[Any] = [] - async for msg in transport.receive_messages(): - messages.append(msg) + # Collect all messages + messages: list[Any] = [] + async for msg in transport.receive_messages(): + messages.append(msg) - # Verify both JSON objects were successfully parsed - assert len(messages) == 2 - assert messages[0]["type"] == "message" - assert messages[0]["id"] == "msg1" - assert messages[0]["content"] == "First message" - assert messages[1]["type"] == "result" - assert messages[1]["id"] == "res1" - assert messages[1]["status"] == "completed" + # Verify both JSON objects were successfully parsed + assert len(messages) == 2 + assert messages[0]["type"] == "message" + assert messages[0]["id"] == "msg1" + assert messages[0]["content"] == "First message" + assert messages[1]["type"] == "result" + assert messages[1]["id"] == "res1" + assert messages[1]["status"] == "completed" + + anyio.run(_test) + + def test_json_with_embedded_newlines(self) -> None: + """Test parsing JSON objects that contain newline characters in string values.""" + async def _test() -> None: + # JSON objects with newlines in string values + json_obj1 = {"type": "message", "content": "Line 1\nLine 2\nLine 3"} + json_obj2 = {"type": "result", "data": "Some\nMultiline\nContent"} + + buffered_line = json.dumps(json_obj1) + '\n' + json.dumps(json_obj2) + + transport = SubprocessCLITransport( + prompt="test", + options=ClaudeCodeOptions(), + cli_path="/usr/bin/claude" + ) + + mock_process = MagicMock() + mock_process.returncode = None + mock_process.wait = AsyncMock(return_value=None) + transport._process = mock_process + transport._stdout_stream = MockTextReceiveStream([buffered_line]) + transport._stderr_stream = MockTextReceiveStream([]) + + messages: list[Any] = [] + async for msg in transport.receive_messages(): + messages.append(msg) + + assert len(messages) == 2 + assert messages[0]["content"] == "Line 1\nLine 2\nLine 3" + assert messages[1]["data"] == "Some\nMultiline\nContent" + + anyio.run(_test) + + def test_multiple_newlines_between_objects(self) -> None: + """Test parsing with multiple newlines between JSON objects.""" + async def _test() -> None: + json_obj1 = {"type": "message", "id": "msg1"} + json_obj2 = {"type": "result", "id": "res1"} + + # Multiple newlines between objects + buffered_line = json.dumps(json_obj1) + '\n\n\n' + json.dumps(json_obj2) + + transport = SubprocessCLITransport( + prompt="test", + options=ClaudeCodeOptions(), + cli_path="/usr/bin/claude" + ) + + mock_process = MagicMock() + mock_process.returncode = None + mock_process.wait = AsyncMock(return_value=None) + transport._process = mock_process + transport._stdout_stream = MockTextReceiveStream([buffered_line]) + transport._stderr_stream = MockTextReceiveStream([]) + + messages: list[Any] = [] + async for msg in transport.receive_messages(): + messages.append(msg) + + assert len(messages) == 2 + assert messages[0]["id"] == "msg1" + assert messages[1]["id"] == "res1" + + anyio.run(_test) From ecad93a0c01c0ec8079f13e9b00292e38a306ca2 Mon Sep 17 00:00:00 2001 From: Lina Tawfik Date: Fri, 27 Jun 2025 15:04:20 -0700 Subject: [PATCH 3/4] Apply ruff linting fixes MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Fixed formatting in test_subprocess_buffering.py - No functional changes, only code style improvements 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- tests/test_subprocess_buffering.py | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/tests/test_subprocess_buffering.py b/tests/test_subprocess_buffering.py index 4e26e90..9642808 100644 --- a/tests/test_subprocess_buffering.py +++ b/tests/test_subprocess_buffering.py @@ -85,26 +85,26 @@ class TestSubprocessBuffering: # JSON objects with newlines in string values json_obj1 = {"type": "message", "content": "Line 1\nLine 2\nLine 3"} json_obj2 = {"type": "result", "data": "Some\nMultiline\nContent"} - + buffered_line = json.dumps(json_obj1) + '\n' + json.dumps(json_obj2) - + transport = SubprocessCLITransport( prompt="test", options=ClaudeCodeOptions(), cli_path="/usr/bin/claude" ) - + mock_process = MagicMock() mock_process.returncode = None mock_process.wait = AsyncMock(return_value=None) transport._process = mock_process transport._stdout_stream = MockTextReceiveStream([buffered_line]) transport._stderr_stream = MockTextReceiveStream([]) - + messages: list[Any] = [] async for msg in transport.receive_messages(): messages.append(msg) - + assert len(messages) == 2 assert messages[0]["content"] == "Line 1\nLine 2\nLine 3" assert messages[1]["data"] == "Some\nMultiline\nContent" @@ -116,27 +116,27 @@ class TestSubprocessBuffering: async def _test() -> None: json_obj1 = {"type": "message", "id": "msg1"} json_obj2 = {"type": "result", "id": "res1"} - + # Multiple newlines between objects buffered_line = json.dumps(json_obj1) + '\n\n\n' + json.dumps(json_obj2) - + transport = SubprocessCLITransport( prompt="test", options=ClaudeCodeOptions(), cli_path="/usr/bin/claude" ) - + mock_process = MagicMock() mock_process.returncode = None mock_process.wait = AsyncMock(return_value=None) transport._process = mock_process transport._stdout_stream = MockTextReceiveStream([buffered_line]) transport._stderr_stream = MockTextReceiveStream([]) - + messages: list[Any] = [] async for msg in transport.receive_messages(): messages.append(msg) - + assert len(messages) == 2 assert messages[0]["id"] == "msg1" assert messages[1]["id"] == "res1" From 761079180323ec9077aaa5f841b346cfc7b0d65a Mon Sep 17 00:00:00 2001 From: Lina Tawfik Date: Fri, 27 Jun 2025 15:08:20 -0700 Subject: [PATCH 4/4] Apply ruff formatting MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Fixed quotes to use double quotes consistently - Adjusted line breaks per ruff formatter rules - No functional changes 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- tests/test_subprocess_buffering.py | 21 +++++++++------------ 1 file changed, 9 insertions(+), 12 deletions(-) diff --git a/tests/test_subprocess_buffering.py b/tests/test_subprocess_buffering.py index 9642808..06886df 100644 --- a/tests/test_subprocess_buffering.py +++ b/tests/test_subprocess_buffering.py @@ -38,19 +38,18 @@ class TestSubprocessBuffering: In some environments, stdout buffering can cause multiple distinct JSON objects to be delivered as a single line with embedded newlines. """ + async def _test() -> None: # Two valid JSON objects separated by a newline character json_obj1 = {"type": "message", "id": "msg1", "content": "First message"} json_obj2 = {"type": "result", "id": "res1", "status": "completed"} # Simulate buffered output where both objects appear on one line - buffered_line = json.dumps(json_obj1) + '\n' + json.dumps(json_obj2) + buffered_line = json.dumps(json_obj1) + "\n" + json.dumps(json_obj2) # Create transport transport = SubprocessCLITransport( - prompt="test", - options=ClaudeCodeOptions(), - cli_path="/usr/bin/claude" + prompt="test", options=ClaudeCodeOptions(), cli_path="/usr/bin/claude" ) # Mock the process and streams @@ -81,17 +80,16 @@ class TestSubprocessBuffering: def test_json_with_embedded_newlines(self) -> None: """Test parsing JSON objects that contain newline characters in string values.""" + async def _test() -> None: # JSON objects with newlines in string values json_obj1 = {"type": "message", "content": "Line 1\nLine 2\nLine 3"} json_obj2 = {"type": "result", "data": "Some\nMultiline\nContent"} - buffered_line = json.dumps(json_obj1) + '\n' + json.dumps(json_obj2) + buffered_line = json.dumps(json_obj1) + "\n" + json.dumps(json_obj2) transport = SubprocessCLITransport( - prompt="test", - options=ClaudeCodeOptions(), - cli_path="/usr/bin/claude" + prompt="test", options=ClaudeCodeOptions(), cli_path="/usr/bin/claude" ) mock_process = MagicMock() @@ -113,17 +111,16 @@ class TestSubprocessBuffering: def test_multiple_newlines_between_objects(self) -> None: """Test parsing with multiple newlines between JSON objects.""" + async def _test() -> None: json_obj1 = {"type": "message", "id": "msg1"} json_obj2 = {"type": "result", "id": "res1"} # Multiple newlines between objects - buffered_line = json.dumps(json_obj1) + '\n\n\n' + json.dumps(json_obj2) + buffered_line = json.dumps(json_obj1) + "\n\n\n" + json.dumps(json_obj2) transport = SubprocessCLITransport( - prompt="test", - options=ClaudeCodeOptions(), - cli_path="/usr/bin/claude" + prompt="test", options=ClaudeCodeOptions(), cli_path="/usr/bin/claude" ) mock_process = MagicMock()