Improve FastAPI SSE streaming fix with safety limits

Add critical safety mechanisms to the sequential stderr reading approach:
- 10MB memory limit on stderr collection with truncation message
- 30-second timeout on stderr collection to prevent hanging
- Exit-code-based error detection instead of matching the string "error" in stderr
- Debug logging for stderr on successful runs
- Better exception handling for edge cases

This addresses potential production issues while maintaining FastAPI compatibility.

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
Lina Tawfik 2025-07-01 16:22:49 -07:00
parent 67193350e2
commit 4460dfbe3b
No known key found for this signature in database

View file

@ -1,6 +1,7 @@
"""Subprocess transport implementation using Claude Code CLI.""" """Subprocess transport implementation using Claude Code CLI."""
import json import json
import logging
import os import os
import shutil import shutil
from collections.abc import AsyncIterator from collections.abc import AsyncIterator
@ -17,6 +18,8 @@ from ..._errors import CLIJSONDecodeError as SDKJSONDecodeError
from ...types import ClaudeCodeOptions from ...types import ClaudeCodeOptions
from . import Transport from . import Transport
logger = logging.getLogger(__name__)
_MAX_BUFFER_SIZE = 1024 * 1024 # 1MB buffer limit _MAX_BUFFER_SIZE = 1024 * 1024 # 1MB buffer limit
@ -170,6 +173,10 @@ class SubprocessCLITransport(Transport):
if not self._process or not self._stdout_stream: if not self._process or not self._stdout_stream:
raise CLIConnectionError("Not connected") raise CLIConnectionError("Not connected")
# Safety constants
MAX_STDERR_SIZE = 10 * 1024 * 1024 # 10MB
STDERR_TIMEOUT = 30.0 # 30 seconds
json_buffer = "" json_buffer = ""
# Process stdout messages first # Process stdout messages first
@ -210,25 +217,60 @@ class SubprocessCLITransport(Transport):
except anyio.ClosedResourceError: except anyio.ClosedResourceError:
pass pass
except GeneratorExit:
# Client disconnected - still need to clean up
pass
# Read stderr after stdout completes (no concurrent task group) # Process stderr with safety limits
stderr_lines = [] stderr_lines = []
stderr_size = 0
if self._stderr_stream: if self._stderr_stream:
try: try:
async for line in self._stderr_stream: # Use timeout to prevent hanging
stderr_lines.append(line.strip()) with anyio.fail_after(STDERR_TIMEOUT):
async for line in self._stderr_stream:
line_text = line.strip()
line_size = len(line_text)
# Enforce memory limit
if stderr_size + line_size > MAX_STDERR_SIZE:
stderr_lines.append(
f"[stderr truncated after {stderr_size} bytes]"
)
# Drain rest of stream without storing
async for _ in self._stderr_stream:
pass
break
stderr_lines.append(line_text)
stderr_size += line_size
except TimeoutError:
stderr_lines.append(
f"[stderr collection timed out after {STDERR_TIMEOUT}s]"
)
except anyio.ClosedResourceError: except anyio.ClosedResourceError:
pass pass
await self._process.wait() # Check process completion and handle errors
if self._process.returncode is not None and self._process.returncode != 0: try:
stderr_output = "\n".join(stderr_lines) returncode = await self._process.wait()
if stderr_output and "error" in stderr_output.lower(): except Exception:
raise ProcessError( returncode = -1
"CLI process failed",
exit_code=self._process.returncode, stderr_output = "\n".join(stderr_lines) if stderr_lines else ""
stderr=stderr_output,
) # Use exit code for error detection, not string matching
if returncode != 0:
raise ProcessError(
f"Command failed with exit code {returncode}",
exit_code=returncode,
stderr=stderr_output,
)
elif stderr_output:
# Exit code was zero: log stderr for debugging only, it is not a failure
logger.debug(f"Process stderr: {stderr_output}")
def is_connected(self) -> bool: def is_connected(self) -> bool:
"""Check if subprocess is running.""" """Check if subprocess is running."""