From 24295417ac9028258e151d41310f1a6b3d8573da Mon Sep 17 00:00:00 2001 From: Lina Tawfik Date: Mon, 30 Jun 2025 21:55:06 -0700 Subject: [PATCH 1/5] fix: handle JSON messages split across multiple stream reads MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds buffering to accumulate incomplete JSON messages that are split across multiple stream reads due to asyncio's 64KB default buffer limit. - Implement 1MB buffer to accumulate partial JSON - Clear buffer on successful parse or size limit exceeded - Add comprehensive tests for split JSON scenarios Fixes CLIJSONDecodeError when reading large minified JSON files. 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- .../_internal/transport/subprocess_cli.py | 24 ++- tests/test_subprocess_buffering.py | 178 +++++++++++++++++- 2 files changed, 197 insertions(+), 5 deletions(-) diff --git a/src/claude_code_sdk/_internal/transport/subprocess_cli.py b/src/claude_code_sdk/_internal/transport/subprocess_cli.py index d403ced..6d76f38 100644 --- a/src/claude_code_sdk/_internal/transport/subprocess_cli.py +++ b/src/claude_code_sdk/_internal/transport/subprocess_cli.py @@ -17,6 +17,8 @@ from ..._errors import CLIJSONDecodeError as SDKJSONDecodeError from ...types import ClaudeCodeOptions from . import Transport +_MAX_BUFFER_SIZE = 1024 * 1024 # 1MB buffer limit + class SubprocessCLITransport(Transport): """Subprocess transport using Claude Code CLI.""" @@ -182,6 +184,9 @@ class SubprocessCLITransport(Transport): async with anyio.create_task_group() as tg: tg.start_soon(read_stderr) + # Buffer for incomplete JSON + json_buffer = "" + try: async for line in self._stdout_stream: line_str = line.strip() @@ -196,16 +201,27 @@ class SubprocessCLITransport(Transport): if not json_line: continue + # Add to buffer + json_buffer += json_line + + # Check buffer size + if len(json_buffer) > _MAX_BUFFER_SIZE: + json_buffer = "" # Clear buffer to prevent repeated errors + raise SDKJSONDecodeError( + f"JSON message exceeded maximum buffer size of {_MAX_BUFFER_SIZE} bytes", + None + ) + try: - data = json.loads(json_line) + data = json.loads(json_buffer) + json_buffer = "" # Clear buffer on successful parse try: yield data except GeneratorExit: # Handle generator cleanup gracefully return - except json.JSONDecodeError as e: - if json_line.startswith("{") or json_line.startswith("["): - raise SDKJSONDecodeError(json_line, e) from e + except json.JSONDecodeError: + # Continue accumulating in buffer continue except anyio.ClosedResourceError: diff --git a/tests/test_subprocess_buffering.py b/tests/test_subprocess_buffering.py index 06886df..4e6dab3 100644 --- a/tests/test_subprocess_buffering.py +++ b/tests/test_subprocess_buffering.py @@ -6,8 +6,13 @@ from typing import Any from unittest.mock import AsyncMock, MagicMock import anyio +import pytest -from claude_code_sdk._internal.transport.subprocess_cli import SubprocessCLITransport +from claude_code_sdk._errors import CLIJSONDecodeError +from claude_code_sdk._internal.transport.subprocess_cli import ( + _MAX_BUFFER_SIZE, + SubprocessCLITransport, +) from claude_code_sdk.types import ClaudeCodeOptions @@ -139,3 +144,174 @@ class TestSubprocessBuffering: assert messages[1]["id"] == "res1" anyio.run(_test) + + def test_split_json_across_multiple_reads(self) -> None: + """Test parsing when a single JSON object is split across multiple stream reads.""" + + async def _test() -> None: + # Large JSON object that simulates being split + json_obj = { + 
"type": "assistant", + "message": { + "content": [ + {"type": "text", "text": "x" * 1000}, + {"type": "tool_use", "id": "tool_123", "name": "Read", "input": {"file_path": "/test.txt"}} + ] + } + } + + complete_json = json.dumps(json_obj) + + # Split at arbitrary points to simulate stream chunking + part1 = complete_json[:100] + part2 = complete_json[100:250] + part3 = complete_json[250:] + + transport = SubprocessCLITransport( + prompt="test", options=ClaudeCodeOptions(), cli_path="/usr/bin/claude" + ) + + mock_process = MagicMock() + mock_process.returncode = None + mock_process.wait = AsyncMock(return_value=None) + transport._process = mock_process + transport._stdout_stream = MockTextReceiveStream([part1, part2, part3]) + transport._stderr_stream = MockTextReceiveStream([]) + + messages: list[Any] = [] + async for msg in transport.receive_messages(): + messages.append(msg) + + # Should reconstruct the complete JSON + assert len(messages) == 1 + assert messages[0]["type"] == "assistant" + assert len(messages[0]["message"]["content"]) == 2 + + anyio.run(_test) + + def test_large_minified_json(self) -> None: + """Test parsing a large minified JSON (simulating the reported issue).""" + + async def _test() -> None: + # Create a large minified JSON similar to what caused the issue + large_data = {"data": [{"id": i, "value": "x" * 100} for i in range(1000)]} + json_obj = { + "type": "user", + "message": { + "role": "user", + "content": [ + { + "tool_use_id": "toolu_016fed1NhiaMLqnEvrj5NUaj", + "type": "tool_result", + "content": json.dumps(large_data) + } + ] + } + } + + complete_json = json.dumps(json_obj) + + # Split into chunks simulating 64KB buffer limit + chunk_size = 64 * 1024 # 64KB + chunks = [complete_json[i:i+chunk_size] for i in range(0, len(complete_json), chunk_size)] + + transport = SubprocessCLITransport( + prompt="test", options=ClaudeCodeOptions(), cli_path="/usr/bin/claude" + ) + + mock_process = MagicMock() + mock_process.returncode = None + mock_process.wait = AsyncMock(return_value=None) + transport._process = mock_process + transport._stdout_stream = MockTextReceiveStream(chunks) + transport._stderr_stream = MockTextReceiveStream([]) + + messages: list[Any] = [] + async for msg in transport.receive_messages(): + messages.append(msg) + + assert len(messages) == 1 + assert messages[0]["type"] == "user" + assert messages[0]["message"]["content"][0]["tool_use_id"] == "toolu_016fed1NhiaMLqnEvrj5NUaj" + + anyio.run(_test) + + def test_buffer_size_exceeded(self) -> None: + """Test that exceeding buffer size raises an appropriate error.""" + + async def _test() -> None: + # Create incomplete JSON larger than buffer limit + huge_incomplete = '{"data": "' + "x" * (_MAX_BUFFER_SIZE + 1000) + + transport = SubprocessCLITransport( + prompt="test", options=ClaudeCodeOptions(), cli_path="/usr/bin/claude" + ) + + mock_process = MagicMock() + mock_process.returncode = None + mock_process.wait = AsyncMock(return_value=None) + transport._process = mock_process + transport._stdout_stream = MockTextReceiveStream([huge_incomplete]) + transport._stderr_stream = MockTextReceiveStream([]) + + with pytest.raises(Exception) as exc_info: + messages: list[Any] = [] + async for msg in transport.receive_messages(): + messages.append(msg) + + # The exception is wrapped in ExceptionGroup by anyio + assert len(exc_info.value.exceptions) == 1 + assert isinstance(exc_info.value.exceptions[0], CLIJSONDecodeError) + assert "exceeded maximum buffer size" in str(exc_info.value.exceptions[0]) + + 
anyio.run(_test) + + def test_mixed_complete_and_split_json(self) -> None: + """Test handling a mix of complete and split JSON messages.""" + + async def _test() -> None: + # First: complete JSON + msg1 = json.dumps({"type": "system", "subtype": "start"}) + + # Second: large JSON split across reads + large_msg = { + "type": "assistant", + "message": {"content": [{"type": "text", "text": "y" * 5000}]} + } + large_json = json.dumps(large_msg) + + # Third: another complete JSON + msg3 = json.dumps({"type": "system", "subtype": "end"}) + + # Simulate streaming with mixed complete and partial messages + lines = [ + msg1 + "\n", + large_json[:1000], # First part of large message + large_json[1000:3000], # Middle part + large_json[3000:] + "\n" + msg3 # End of large message + complete message + ] + + transport = SubprocessCLITransport( + prompt="test", options=ClaudeCodeOptions(), cli_path="/usr/bin/claude" + ) + + mock_process = MagicMock() + mock_process.returncode = None + mock_process.wait = AsyncMock(return_value=None) + transport._process = mock_process + transport._stdout_stream = MockTextReceiveStream(lines) + transport._stderr_stream = MockTextReceiveStream([]) + + messages: list[Any] = [] + async for msg in transport.receive_messages(): + messages.append(msg) + + assert len(messages) == 3 + assert messages[0]["type"] == "system" + assert messages[0]["subtype"] == "start" + assert messages[1]["type"] == "assistant" + assert len(messages[1]["message"]["content"][0]["text"]) == 5000 + assert messages[2]["type"] == "system" + assert messages[2]["subtype"] == "end" + + anyio.run(_test) From 977bf6438bd0226ae309a841d62c3b24e64f1740 Mon Sep 17 00:00:00 2001 From: Lina Tawfik Date: Mon, 30 Jun 2025 21:58:37 -0700 Subject: [PATCH 2/5] style: apply ruff formatting --- .../_internal/transport/subprocess_cli.py | 2 +- tests/test_subprocess_buffering.py | 31 +++++++++++++------ 2 files changed, 23 insertions(+), 10 deletions(-) diff --git a/src/claude_code_sdk/_internal/transport/subprocess_cli.py b/src/claude_code_sdk/_internal/transport/subprocess_cli.py index 6d76f38..530b5c5 100644 --- a/src/claude_code_sdk/_internal/transport/subprocess_cli.py +++ b/src/claude_code_sdk/_internal/transport/subprocess_cli.py @@ -209,7 +209,7 @@ class SubprocessCLITransport(Transport): json_buffer = "" # Clear buffer to prevent repeated errors raise SDKJSONDecodeError( f"JSON message exceeded maximum buffer size of {_MAX_BUFFER_SIZE} bytes", - None + None, ) try: diff --git a/tests/test_subprocess_buffering.py b/tests/test_subprocess_buffering.py index 4e6dab3..a9e43cc 100644 --- a/tests/test_subprocess_buffering.py +++ b/tests/test_subprocess_buffering.py @@ -155,9 +155,14 @@ class TestSubprocessBuffering: "message": { "content": [ {"type": "text", "text": "x" * 1000}, - {"type": "tool_use", "id": "tool_123", "name": "Read", "input": {"file_path": "/test.txt"}} + { + "type": "tool_use", + "id": "tool_123", + "name": "Read", + "input": {"file_path": "/test.txt"}, + }, ] - } + }, } complete_json = json.dumps(json_obj) @@ -203,17 +208,20 @@ class TestSubprocessBuffering: { "tool_use_id": "toolu_016fed1NhiaMLqnEvrj5NUaj", "type": "tool_result", - "content": json.dumps(large_data) + "content": json.dumps(large_data), } - ] - } + ], + }, } complete_json = json.dumps(json_obj) # Split into chunks simulating 64KB buffer limit chunk_size = 64 * 1024 # 64KB - chunks = [complete_json[i:i+chunk_size] for i in range(0, len(complete_json), chunk_size)] + chunks = [ + complete_json[i : i + chunk_size] + for i in range(0, 
len(complete_json), chunk_size) + ] transport = SubprocessCLITransport( prompt="test", options=ClaudeCodeOptions(), cli_path="/usr/bin/claude" @@ -232,7 +240,10 @@ class TestSubprocessBuffering: assert len(messages) == 1 assert messages[0]["type"] == "user" - assert messages[0]["message"]["content"][0]["tool_use_id"] == "toolu_016fed1NhiaMLqnEvrj5NUaj" + assert ( + messages[0]["message"]["content"][0]["tool_use_id"] + == "toolu_016fed1NhiaMLqnEvrj5NUaj" + ) anyio.run(_test) @@ -276,7 +287,7 @@ class TestSubprocessBuffering: # Second: large JSON split across reads large_msg = { "type": "assistant", - "message": {"content": [{"type": "text", "text": "y" * 5000}]} + "message": {"content": [{"type": "text", "text": "y" * 5000}]}, } large_json = json.dumps(large_msg) @@ -288,7 +299,9 @@ class TestSubprocessBuffering: msg1 + "\n", large_json[:1000], # First part of large message large_json[1000:3000], # Middle part - large_json[3000:] + "\n" + msg3 # End of large message + complete message + large_json[3000:] + + "\n" + + msg3, # End of large message + complete message ] transport = SubprocessCLITransport( From 3ab62b617d714325d375f8ced8253b4ecadc34bc Mon Sep 17 00:00:00 2001 From: Lina Tawfik Date: Mon, 30 Jun 2025 22:02:02 -0700 Subject: [PATCH 3/5] fix: Pass proper Exception to CLIJSONDecodeError MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The CLIJSONDecodeError constructor expects an Exception as the second argument, not None. Changed to pass a ValueError with details about the buffer size limit being exceeded. 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- src/claude_code_sdk/_internal/transport/subprocess_cli.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/claude_code_sdk/_internal/transport/subprocess_cli.py b/src/claude_code_sdk/_internal/transport/subprocess_cli.py index 530b5c5..c1223d2 100644 --- a/src/claude_code_sdk/_internal/transport/subprocess_cli.py +++ b/src/claude_code_sdk/_internal/transport/subprocess_cli.py @@ -209,7 +209,9 @@ class SubprocessCLITransport(Transport): json_buffer = "" # Clear buffer to prevent repeated errors raise SDKJSONDecodeError( f"JSON message exceeded maximum buffer size of {_MAX_BUFFER_SIZE} bytes", - None, + ValueError( + f"Buffer size {len(json_buffer)} exceeds limit {_MAX_BUFFER_SIZE}" + ), ) try: From 1791031d20dadff68b5054ba2de85000d50e98af Mon Sep 17 00:00:00 2001 From: Lina Tawfik Date: Mon, 30 Jun 2025 22:27:48 -0700 Subject: [PATCH 4/5] chore: Remove obvious comments from code MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Removed redundant comments that simply restate what the code is doing. Kept only comments that provide valuable context or explain complex behavior. 
🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- .../_internal/transport/subprocess_cli.py | 10 ++----- tests/test_subprocess_buffering.py | 30 +++---------------- 2 files changed, 6 insertions(+), 34 deletions(-) diff --git a/src/claude_code_sdk/_internal/transport/subprocess_cli.py b/src/claude_code_sdk/_internal/transport/subprocess_cli.py index c1223d2..982a85a 100644 --- a/src/claude_code_sdk/_internal/transport/subprocess_cli.py +++ b/src/claude_code_sdk/_internal/transport/subprocess_cli.py @@ -184,7 +184,6 @@ class SubprocessCLITransport(Transport): async with anyio.create_task_group() as tg: tg.start_soon(read_stderr) - # Buffer for incomplete JSON json_buffer = "" try: @@ -193,7 +192,6 @@ class SubprocessCLITransport(Transport): if not line_str: continue - # Split on newlines in case multiple JSON objects are buffered together json_lines = line_str.split("\n") for json_line in json_lines: @@ -201,12 +199,10 @@ class SubprocessCLITransport(Transport): if not json_line: continue - # Add to buffer json_buffer += json_line - # Check buffer size if len(json_buffer) > _MAX_BUFFER_SIZE: - json_buffer = "" # Clear buffer to prevent repeated errors + json_buffer = "" raise SDKJSONDecodeError( f"JSON message exceeded maximum buffer size of {_MAX_BUFFER_SIZE} bytes", ValueError( @@ -216,14 +212,12 @@ class SubprocessCLITransport(Transport): try: data = json.loads(json_buffer) - json_buffer = "" # Clear buffer on successful parse + json_buffer = "" try: yield data except GeneratorExit: - # Handle generator cleanup gracefully return except json.JSONDecodeError: - # Continue accumulating in buffer continue except anyio.ClosedResourceError: diff --git a/tests/test_subprocess_buffering.py b/tests/test_subprocess_buffering.py index a9e43cc..3f9b0be 100644 --- a/tests/test_subprocess_buffering.py +++ b/tests/test_subprocess_buffering.py @@ -45,34 +45,27 @@ class TestSubprocessBuffering: """ async def _test() -> None: - # Two valid JSON objects separated by a newline character json_obj1 = {"type": "message", "id": "msg1", "content": "First message"} json_obj2 = {"type": "result", "id": "res1", "status": "completed"} - # Simulate buffered output where both objects appear on one line buffered_line = json.dumps(json_obj1) + "\n" + json.dumps(json_obj2) - # Create transport transport = SubprocessCLITransport( prompt="test", options=ClaudeCodeOptions(), cli_path="/usr/bin/claude" ) - # Mock the process and streams mock_process = MagicMock() mock_process.returncode = None mock_process.wait = AsyncMock(return_value=None) transport._process = mock_process - # Create mock stream that returns the buffered line transport._stdout_stream = MockTextReceiveStream([buffered_line]) # type: ignore[assignment] transport._stderr_stream = MockTextReceiveStream([]) # type: ignore[assignment] - # Collect all messages messages: list[Any] = [] async for msg in transport.receive_messages(): messages.append(msg) - # Verify both JSON objects were successfully parsed assert len(messages) == 2 assert messages[0]["type"] == "message" assert messages[0]["id"] == "msg1" @@ -87,7 +80,6 @@ class TestSubprocessBuffering: """Test parsing JSON objects that contain newline characters in string values.""" async def _test() -> None: - # JSON objects with newlines in string values json_obj1 = {"type": "message", "content": "Line 1\nLine 2\nLine 3"} json_obj2 = {"type": "result", "data": "Some\nMultiline\nContent"} @@ -121,7 +113,6 @@ class TestSubprocessBuffering: json_obj1 = {"type": "message", "id": 
"msg1"} json_obj2 = {"type": "result", "id": "res1"} - # Multiple newlines between objects buffered_line = json.dumps(json_obj1) + "\n\n\n" + json.dumps(json_obj2) transport = SubprocessCLITransport( @@ -149,7 +140,6 @@ class TestSubprocessBuffering: """Test parsing when a single JSON object is split across multiple stream reads.""" async def _test() -> None: - # Large JSON object that simulates being split json_obj = { "type": "assistant", "message": { @@ -167,7 +157,6 @@ class TestSubprocessBuffering: complete_json = json.dumps(json_obj) - # Split at arbitrary points to simulate stream chunking part1 = complete_json[:100] part2 = complete_json[100:250] part3 = complete_json[250:] @@ -187,7 +176,6 @@ class TestSubprocessBuffering: async for msg in transport.receive_messages(): messages.append(msg) - # Should reconstruct the complete JSON assert len(messages) == 1 assert messages[0]["type"] == "assistant" assert len(messages[0]["message"]["content"]) == 2 @@ -198,7 +186,6 @@ class TestSubprocessBuffering: """Test parsing a large minified JSON (simulating the reported issue).""" async def _test() -> None: - # Create a large minified JSON similar to what caused the issue large_data = {"data": [{"id": i, "value": "x" * 100} for i in range(1000)]} json_obj = { "type": "user", @@ -216,8 +203,7 @@ class TestSubprocessBuffering: complete_json = json.dumps(json_obj) - # Split into chunks simulating 64KB buffer limit - chunk_size = 64 * 1024 # 64KB + chunk_size = 64 * 1024 chunks = [ complete_json[i : i + chunk_size] for i in range(0, len(complete_json), chunk_size) @@ -251,7 +237,6 @@ class TestSubprocessBuffering: """Test that exceeding buffer size raises an appropriate error.""" async def _test() -> None: - # Create incomplete JSON larger than buffer limit huge_incomplete = '{"data": "' + "x" * (_MAX_BUFFER_SIZE + 1000) transport = SubprocessCLITransport( @@ -270,7 +255,6 @@ class TestSubprocessBuffering: async for msg in transport.receive_messages(): messages.append(msg) - # The exception is wrapped in ExceptionGroup by anyio assert len(exc_info.value.exceptions) == 1 assert isinstance(exc_info.value.exceptions[0], CLIJSONDecodeError) assert "exceeded maximum buffer size" in str(exc_info.value.exceptions[0]) @@ -281,27 +265,21 @@ class TestSubprocessBuffering: """Test handling a mix of complete and split JSON messages.""" async def _test() -> None: - # First: complete JSON msg1 = json.dumps({"type": "system", "subtype": "start"}) - # Second: large JSON split across reads large_msg = { "type": "assistant", "message": {"content": [{"type": "text", "text": "y" * 5000}]}, } large_json = json.dumps(large_msg) - # Third: another complete JSON msg3 = json.dumps({"type": "system", "subtype": "end"}) - # Simulate streaming with mixed complete and partial messages lines = [ msg1 + "\n", - large_json[:1000], # First part of large message - large_json[1000:3000], # Middle part - large_json[3000:] - + "\n" - + msg3, # End of large message + complete message + large_json[:1000], + large_json[1000:3000], + large_json[3000:] + "\n" + msg3, ] transport = SubprocessCLITransport( From 82335331487c97f81f6a513eec2d72658f2d9945 Mon Sep 17 00:00:00 2001 From: Lina Tawfik Date: Mon, 30 Jun 2025 23:00:54 -0700 Subject: [PATCH 5/5] Add comment explaining JSON buffer accumulation logic --- src/claude_code_sdk/_internal/transport/subprocess_cli.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/claude_code_sdk/_internal/transport/subprocess_cli.py b/src/claude_code_sdk/_internal/transport/subprocess_cli.py index 
982a85a..288b2e7 100644 --- a/src/claude_code_sdk/_internal/transport/subprocess_cli.py +++ b/src/claude_code_sdk/_internal/transport/subprocess_cli.py @@ -199,6 +199,7 @@ class SubprocessCLITransport(Transport): if not json_line: continue + # Keep accumulating partial JSON until we can parse it json_buffer += json_line if len(json_buffer) > _MAX_BUFFER_SIZE:
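
For reference, below is a minimal, SDK-independent sketch of the accumulation loop this series converges on: partial reads are concatenated into a buffer until json.loads succeeds, with a 1MB cap that clears the buffer before raising so an oversized payload cannot re-trigger the error on every later read. The names here (parse_stream, reads) are illustrative, not the SDK's API, and capturing the buffer length before clearing is this sketch's own detail so the error message can report a nonzero size.

import json

_MAX_BUFFER_SIZE = 1024 * 1024  # 1MB cap, matching the patch series


def parse_stream(reads):
    """Yield JSON objects from an iterable of partial string reads."""
    buffer = ""
    for read in reads:
        for piece in read.split("\n"):
            piece = piece.strip()
            if not piece:
                continue
            buffer += piece
            if len(buffer) > _MAX_BUFFER_SIZE:
                size = len(buffer)  # capture before clearing so the error reports it
                buffer = ""
                raise ValueError(
                    f"JSON message of {size} bytes exceeded the {_MAX_BUFFER_SIZE} byte limit"
                )
            try:
                obj = json.loads(buffer)
            except json.JSONDecodeError:
                continue  # incomplete object; keep accumulating
            buffer = ""  # parsed successfully; reset for the next object
            yield obj


# One object split across three reads, then a complete object arriving on the
# same read -- mirrors test_mixed_complete_and_split_json above.
big = json.dumps({"type": "assistant", "text": "x" * 200})
reads = [big[:50], big[50:120], big[120:] + "\n" + json.dumps({"type": "system"})]
assert [obj["type"] for obj in parse_stream(reads)] == ["assistant", "system"]

Splitting on raw newlines is safe here because json.dumps escapes newlines inside string values, so a literal "\n" in the stream can only separate objects, never split one mid-string.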