From 4460dfbe3b7efefd35015b3c23d8be0a69bc87c5 Mon Sep 17 00:00:00 2001 From: Lina Tawfik Date: Tue, 1 Jul 2025 16:22:49 -0700 Subject: [PATCH] Improve FastAPI SSE streaming fix with safety limits MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add critical safety mechanisms to the sequential stderr reading approach: - 10MB memory limit on stderr collection with truncation message - 30 second timeout protection to prevent hanging - Proper exit code based error detection instead of string matching - Debug logging for stderr on successful runs - Better exception handling for edge cases This addresses potential production issues while maintaining FastAPI compatibility. 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- .../_internal/transport/subprocess_cli.py | 66 +++++++++++++++---- 1 file changed, 54 insertions(+), 12 deletions(-) diff --git a/src/claude_code_sdk/_internal/transport/subprocess_cli.py b/src/claude_code_sdk/_internal/transport/subprocess_cli.py index ab7de7e..8c70691 100644 --- a/src/claude_code_sdk/_internal/transport/subprocess_cli.py +++ b/src/claude_code_sdk/_internal/transport/subprocess_cli.py @@ -1,6 +1,7 @@ """Subprocess transport implementation using Claude Code CLI.""" import json +import logging import os import shutil from collections.abc import AsyncIterator @@ -17,6 +18,8 @@ from ..._errors import CLIJSONDecodeError as SDKJSONDecodeError from ...types import ClaudeCodeOptions from . import Transport +logger = logging.getLogger(__name__) + _MAX_BUFFER_SIZE = 1024 * 1024 # 1MB buffer limit @@ -170,6 +173,10 @@ class SubprocessCLITransport(Transport): if not self._process or not self._stdout_stream: raise CLIConnectionError("Not connected") + # Safety constants + MAX_STDERR_SIZE = 10 * 1024 * 1024 # 10MB + STDERR_TIMEOUT = 30.0 # 30 seconds + json_buffer = "" # Process stdout messages first @@ -210,25 +217,60 @@ class SubprocessCLITransport(Transport): except anyio.ClosedResourceError: pass + except GeneratorExit: + # Client disconnected - still need to clean up + pass - # Read stderr after stdout completes (no concurrent task group) + # Process stderr with safety limits stderr_lines = [] + stderr_size = 0 + if self._stderr_stream: try: - async for line in self._stderr_stream: - stderr_lines.append(line.strip()) + # Use timeout to prevent hanging + with anyio.fail_after(STDERR_TIMEOUT): + async for line in self._stderr_stream: + line_text = line.strip() + line_size = len(line_text) + + # Enforce memory limit + if stderr_size + line_size > MAX_STDERR_SIZE: + stderr_lines.append( + f"[stderr truncated after {stderr_size} bytes]" + ) + # Drain rest of stream without storing + async for _ in self._stderr_stream: + pass + break + + stderr_lines.append(line_text) + stderr_size += line_size + + except TimeoutError: + stderr_lines.append( + f"[stderr collection timed out after {STDERR_TIMEOUT}s]" + ) except anyio.ClosedResourceError: pass - await self._process.wait() - if self._process.returncode is not None and self._process.returncode != 0: - stderr_output = "\n".join(stderr_lines) - if stderr_output and "error" in stderr_output.lower(): - raise ProcessError( - "CLI process failed", - exit_code=self._process.returncode, - stderr=stderr_output, - ) + # Check process completion and handle errors + try: + returncode = await self._process.wait() + except Exception: + returncode = -1 + + stderr_output = "\n".join(stderr_lines) if stderr_lines else "" + + # Use exit code for error detection, not string matching + if returncode != 0: + raise ProcessError( + f"Command failed with exit code {returncode}", + exit_code=returncode, + stderr=stderr_output, + ) + elif stderr_output: + # Log stderr for debugging but don't fail on non-zero exit + logger.debug(f"Process stderr: {stderr_output}") def is_connected(self) -> bool: """Check if subprocess is running."""