diff --git a/src/claude_agent_sdk/_internal/transport/subprocess_cli.py b/src/claude_agent_sdk/_internal/transport/subprocess_cli.py index 567b32d..91e740c 100644 --- a/src/claude_agent_sdk/_internal/transport/subprocess_cli.py +++ b/src/claude_agent_sdk/_internal/transport/subprocess_cli.py @@ -24,7 +24,7 @@ from . import Transport logger = logging.getLogger(__name__) -_MAX_BUFFER_SIZE = 1024 * 1024 # 1MB buffer limit +_DEFAULT_MAX_BUFFER_SIZE = 1024 * 1024 # 1MB buffer limit class SubprocessCLITransport(Transport): @@ -48,6 +48,11 @@ class SubprocessCLITransport(Transport): self._stderr_task_group: anyio.abc.TaskGroup | None = None self._ready = False self._exit_error: Exception | None = None # Track process exit errors + self._max_buffer_size = ( + options.max_buffer_size + if options.max_buffer_size is not None + else _DEFAULT_MAX_BUFFER_SIZE + ) def _find_cli(self) -> str: """Find Claude Code CLI binary.""" @@ -402,12 +407,13 @@ class SubprocessCLITransport(Transport): # Keep accumulating partial JSON until we can parse it json_buffer += json_line - if len(json_buffer) > _MAX_BUFFER_SIZE: + if len(json_buffer) > self._max_buffer_size: + buffer_length = len(json_buffer) json_buffer = "" raise SDKJSONDecodeError( - f"JSON message exceeded maximum buffer size of {_MAX_BUFFER_SIZE} bytes", + f"JSON message exceeded maximum buffer size of {self._max_buffer_size} bytes", ValueError( - f"Buffer size {len(json_buffer)} exceeds limit {_MAX_BUFFER_SIZE}" + f"Buffer size {buffer_length} exceeds limit {self._max_buffer_size}" ), ) @@ -418,7 +424,7 @@ class SubprocessCLITransport(Transport): except json.JSONDecodeError: # We are speculatively decoding the buffer until we get # a full JSON object. If there is an actual issue, we - # raise an error after _MAX_BUFFER_SIZE. + # raise an error after exceeding the configured limit. continue except anyio.ClosedResourceError: diff --git a/src/claude_agent_sdk/types.py b/src/claude_agent_sdk/types.py index 69c16e7..862606e 100644 --- a/src/claude_agent_sdk/types.py +++ b/src/claude_agent_sdk/types.py @@ -320,6 +320,7 @@ class ClaudeAgentOptions: extra_args: dict[str, str | None] = field( default_factory=dict ) # Pass arbitrary CLI flags + max_buffer_size: int | None = None # Max bytes when buffering CLI stdout debug_stderr: Any = ( sys.stderr ) # Deprecated: File-like object for debug output. Use stderr callback instead. diff --git a/tests/test_subprocess_buffering.py b/tests/test_subprocess_buffering.py index 9c62557..9437e02 100644 --- a/tests/test_subprocess_buffering.py +++ b/tests/test_subprocess_buffering.py @@ -10,7 +10,7 @@ import pytest from claude_agent_sdk._errors import CLIJSONDecodeError from claude_agent_sdk._internal.transport.subprocess_cli import ( - _MAX_BUFFER_SIZE, + _DEFAULT_MAX_BUFFER_SIZE, SubprocessCLITransport, ) from claude_agent_sdk.types import ClaudeAgentOptions @@ -237,7 +237,7 @@ class TestSubprocessBuffering: """Test that exceeding buffer size raises an appropriate error.""" async def _test() -> None: - huge_incomplete = '{"data": "' + "x" * (_MAX_BUFFER_SIZE + 1000) + huge_incomplete = '{"data": "' + "x" * (_DEFAULT_MAX_BUFFER_SIZE + 1000) transport = SubprocessCLITransport( prompt="test", options=ClaudeAgentOptions(), cli_path="/usr/bin/claude" @@ -260,6 +260,34 @@ class TestSubprocessBuffering: anyio.run(_test) + def test_buffer_size_option(self) -> None: + """Test that the configurable buffer size option is respected.""" + + async def _test() -> None: + custom_limit = 512 + huge_incomplete = '{"data": "' + "x" * (custom_limit + 10) + + transport = SubprocessCLITransport( + prompt="test", + options=ClaudeAgentOptions(max_buffer_size=custom_limit), + cli_path="/usr/bin/claude", + ) + + mock_process = MagicMock() + mock_process.returncode = None + mock_process.wait = AsyncMock(return_value=None) + transport._process = mock_process + transport._stdout_stream = MockTextReceiveStream([huge_incomplete]) + transport._stderr_stream = MockTextReceiveStream([]) + + with pytest.raises(CLIJSONDecodeError) as exc_info: + async for _ in transport.read_messages(): + pass + + assert f"maximum buffer size of {custom_limit} bytes" in str(exc_info.value) + + anyio.run(_test) + def test_mixed_complete_and_split_json(self) -> None: """Test handling a mix of complete and split JSON messages."""