From 404c50bce0fa6917a2988ad200f5465e26ea481b Mon Sep 17 00:00:00 2001 From: Lina Tawfik Date: Tue, 1 Jul 2025 00:04:50 -0700 Subject: [PATCH 1/5] Fix FastAPI SSE streaming compatibility (fixes #4) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Remove anyio.create_task_group() from receive_messages() to fix the RuntimeError "Attempted to exit cancel scope in a different task than it was entered in" when using the SDK with FastAPI's SSE streaming. The issue occurred because FastAPI can move async generators between different asyncio tasks during the streaming lifecycle, which conflicts with anyio's cancel scope tracking. Changes: - Remove task group usage from receive_messages() - Read stderr sequentially after stdout completes - Add test to ensure no task groups are used - Fix existing test expectation for buffer overflow This is a minimal fix that maintains compatibility while solving the core issue. The trade-off is that stderr is read after stdout instead of concurrently, but this is unlikely to cause issues in practice. 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- .../_internal/transport/subprocess_cli.py | 82 +++++++++---------- tests/test_fastapi_streaming_compatibility.py | 25 ++++++ tests/test_subprocess_buffering.py | 5 +- 3 files changed, 66 insertions(+), 46 deletions(-) create mode 100644 tests/test_fastapi_streaming_compatibility.py diff --git a/src/claude_code_sdk/_internal/transport/subprocess_cli.py b/src/claude_code_sdk/_internal/transport/subprocess_cli.py index 288b2e7..c6da949 100644 --- a/src/claude_code_sdk/_internal/transport/subprocess_cli.py +++ b/src/claude_code_sdk/_internal/transport/subprocess_cli.py @@ -170,57 +170,53 @@ class SubprocessCLITransport(Transport): if not self._process or not self._stdout_stream: raise CLIConnectionError("Not connected") - stderr_lines = [] + json_buffer = "" + + # Process stdout messages first + try: + async for line in self._stdout_stream: + line_str = line.strip() + if not line_str: + continue - async def read_stderr() -> None: - """Read stderr in background.""" - if self._stderr_stream: - try: - async for line in self._stderr_stream: - stderr_lines.append(line.strip()) - except anyio.ClosedResourceError: - pass + json_lines = line_str.split("\n") - async with anyio.create_task_group() as tg: - tg.start_soon(read_stderr) - - json_buffer = "" - - try: - async for line in self._stdout_stream: - line_str = line.strip() - if not line_str: + for json_line in json_lines: + json_line = json_line.strip() + if not json_line: continue - json_lines = line_str.split("\n") + # Keep accumulating partial JSON until we can parse it + json_buffer += json_line - for json_line in json_lines: - json_line = json_line.strip() - if not json_line: - continue - - # Keep accumulating partial JSON until we can parse it - json_buffer += json_line - - if len(json_buffer) > _MAX_BUFFER_SIZE: - json_buffer = "" - raise SDKJSONDecodeError( - f"JSON message exceeded maximum buffer size of {_MAX_BUFFER_SIZE} bytes", - ValueError( - f"Buffer size {len(json_buffer)} exceeds limit {_MAX_BUFFER_SIZE}" - ), - ) + if len(json_buffer) > _MAX_BUFFER_SIZE: + json_buffer = "" + raise SDKJSONDecodeError( + f"JSON message exceeded maximum buffer size of {_MAX_BUFFER_SIZE} bytes", + ValueError( + f"Buffer size {len(json_buffer)} exceeds limit {_MAX_BUFFER_SIZE}" + ), + ) + try: + data = json.loads(json_buffer) + json_buffer = "" try: - data = json.loads(json_buffer) - json_buffer = "" - try: - yield data - except GeneratorExit: - return - except json.JSONDecodeError: - continue + yield data + except GeneratorExit: + return + except json.JSONDecodeError: + continue + except anyio.ClosedResourceError: + pass + + # Read stderr after stdout completes (no concurrent task group) + stderr_lines = [] + if self._stderr_stream: + try: + async for line in self._stderr_stream: + stderr_lines.append(line.strip()) except anyio.ClosedResourceError: pass diff --git a/tests/test_fastapi_streaming_compatibility.py b/tests/test_fastapi_streaming_compatibility.py new file mode 100644 index 0000000..47a678c --- /dev/null +++ b/tests/test_fastapi_streaming_compatibility.py @@ -0,0 +1,25 @@ +"""Test FastAPI streaming compatibility (issue #4 fix).""" + +import inspect + +from claude_code_sdk._internal.transport.subprocess_cli import SubprocessCLITransport + + +def test_no_task_groups_in_receive_messages(): + """Verify receive_messages doesn't use task groups (fixes FastAPI issue #4).""" + # Get the source code of receive_messages + source = inspect.getsource(SubprocessCLITransport.receive_messages) + + # The fix: ensure no task group or task creation + assert "create_task_group" not in source, ( + "receive_messages must not use create_task_group to avoid " + "RuntimeError with FastAPI streaming" + ) + assert "asyncio.create_task" not in source, ( + "receive_messages must not create tasks to maintain " + "compatibility with FastAPI's generator handling" + ) + + # Verify stderr is still being read (sequential approach) + assert "_stderr_stream" in source, "Should still read stderr" + assert "stderr_lines" in source, "Should collect stderr output" \ No newline at end of file diff --git a/tests/test_subprocess_buffering.py b/tests/test_subprocess_buffering.py index 3f9b0be..426d42e 100644 --- a/tests/test_subprocess_buffering.py +++ b/tests/test_subprocess_buffering.py @@ -255,9 +255,8 @@ class TestSubprocessBuffering: async for msg in transport.receive_messages(): messages.append(msg) - assert len(exc_info.value.exceptions) == 1 - assert isinstance(exc_info.value.exceptions[0], CLIJSONDecodeError) - assert "exceeded maximum buffer size" in str(exc_info.value.exceptions[0]) + assert isinstance(exc_info.value, CLIJSONDecodeError) + assert "exceeded maximum buffer size" in str(exc_info.value) anyio.run(_test) From 9ef9da9ab67981848fa5c017d1336ae0b18238fb Mon Sep 17 00:00:00 2001 From: Lina Tawfik Date: Tue, 1 Jul 2025 15:12:26 -0700 Subject: [PATCH 2/5] Fix linting issues --- src/claude_code_sdk/_internal/transport/subprocess_cli.py | 2 +- tests/test_fastapi_streaming_compatibility.py | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/claude_code_sdk/_internal/transport/subprocess_cli.py b/src/claude_code_sdk/_internal/transport/subprocess_cli.py index c6da949..ab7de7e 100644 --- a/src/claude_code_sdk/_internal/transport/subprocess_cli.py +++ b/src/claude_code_sdk/_internal/transport/subprocess_cli.py @@ -171,7 +171,7 @@ class SubprocessCLITransport(Transport): raise CLIConnectionError("Not connected") json_buffer = "" - + # Process stdout messages first try: async for line in self._stdout_stream: diff --git a/tests/test_fastapi_streaming_compatibility.py b/tests/test_fastapi_streaming_compatibility.py index 47a678c..20a6ad5 100644 --- a/tests/test_fastapi_streaming_compatibility.py +++ b/tests/test_fastapi_streaming_compatibility.py @@ -9,7 +9,7 @@ def test_no_task_groups_in_receive_messages(): """Verify receive_messages doesn't use task groups (fixes FastAPI issue #4).""" # Get the source code of receive_messages source = inspect.getsource(SubprocessCLITransport.receive_messages) - + # The fix: ensure no task group or task creation assert "create_task_group" not in source, ( "receive_messages must not use create_task_group to avoid " @@ -19,7 +19,7 @@ def test_no_task_groups_in_receive_messages(): "receive_messages must not create tasks to maintain " "compatibility with FastAPI's generator handling" ) - + # Verify stderr is still being read (sequential approach) assert "_stderr_stream" in source, "Should still read stderr" - assert "stderr_lines" in source, "Should collect stderr output" \ No newline at end of file + assert "stderr_lines" in source, "Should collect stderr output" From 67193350e25a773ed7ca13dbabd98c8a6c08eb85 Mon Sep 17 00:00:00 2001 From: Lina Tawfik Date: Tue, 1 Jul 2025 15:23:55 -0700 Subject: [PATCH 3/5] Remove implementation-specific test The test was checking source code for specific strings rather than testing behavior. The existing subprocess tests already verify the functionality works correctly. --- tests/test_fastapi_streaming_compatibility.py | 25 ------------------- 1 file changed, 25 deletions(-) delete mode 100644 tests/test_fastapi_streaming_compatibility.py diff --git a/tests/test_fastapi_streaming_compatibility.py b/tests/test_fastapi_streaming_compatibility.py deleted file mode 100644 index 20a6ad5..0000000 --- a/tests/test_fastapi_streaming_compatibility.py +++ /dev/null @@ -1,25 +0,0 @@ -"""Test FastAPI streaming compatibility (issue #4 fix).""" - -import inspect - -from claude_code_sdk._internal.transport.subprocess_cli import SubprocessCLITransport - - -def test_no_task_groups_in_receive_messages(): - """Verify receive_messages doesn't use task groups (fixes FastAPI issue #4).""" - # Get the source code of receive_messages - source = inspect.getsource(SubprocessCLITransport.receive_messages) - - # The fix: ensure no task group or task creation - assert "create_task_group" not in source, ( - "receive_messages must not use create_task_group to avoid " - "RuntimeError with FastAPI streaming" - ) - assert "asyncio.create_task" not in source, ( - "receive_messages must not create tasks to maintain " - "compatibility with FastAPI's generator handling" - ) - - # Verify stderr is still being read (sequential approach) - assert "_stderr_stream" in source, "Should still read stderr" - assert "stderr_lines" in source, "Should collect stderr output" From 4460dfbe3b7efefd35015b3c23d8be0a69bc87c5 Mon Sep 17 00:00:00 2001 From: Lina Tawfik Date: Tue, 1 Jul 2025 16:22:49 -0700 Subject: [PATCH 4/5] Improve FastAPI SSE streaming fix with safety limits MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add critical safety mechanisms to the sequential stderr reading approach: - 10MB memory limit on stderr collection with truncation message - 30 second timeout protection to prevent hanging - Proper exit code based error detection instead of string matching - Debug logging for stderr on successful runs - Better exception handling for edge cases This addresses potential production issues while maintaining FastAPI compatibility. 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- .../_internal/transport/subprocess_cli.py | 66 +++++++++++++++---- 1 file changed, 54 insertions(+), 12 deletions(-) diff --git a/src/claude_code_sdk/_internal/transport/subprocess_cli.py b/src/claude_code_sdk/_internal/transport/subprocess_cli.py index ab7de7e..8c70691 100644 --- a/src/claude_code_sdk/_internal/transport/subprocess_cli.py +++ b/src/claude_code_sdk/_internal/transport/subprocess_cli.py @@ -1,6 +1,7 @@ """Subprocess transport implementation using Claude Code CLI.""" import json +import logging import os import shutil from collections.abc import AsyncIterator @@ -17,6 +18,8 @@ from ..._errors import CLIJSONDecodeError as SDKJSONDecodeError from ...types import ClaudeCodeOptions from . import Transport +logger = logging.getLogger(__name__) + _MAX_BUFFER_SIZE = 1024 * 1024 # 1MB buffer limit @@ -170,6 +173,10 @@ class SubprocessCLITransport(Transport): if not self._process or not self._stdout_stream: raise CLIConnectionError("Not connected") + # Safety constants + MAX_STDERR_SIZE = 10 * 1024 * 1024 # 10MB + STDERR_TIMEOUT = 30.0 # 30 seconds + json_buffer = "" # Process stdout messages first @@ -210,25 +217,60 @@ class SubprocessCLITransport(Transport): except anyio.ClosedResourceError: pass + except GeneratorExit: + # Client disconnected - still need to clean up + pass - # Read stderr after stdout completes (no concurrent task group) + # Process stderr with safety limits stderr_lines = [] + stderr_size = 0 + if self._stderr_stream: try: - async for line in self._stderr_stream: - stderr_lines.append(line.strip()) + # Use timeout to prevent hanging + with anyio.fail_after(STDERR_TIMEOUT): + async for line in self._stderr_stream: + line_text = line.strip() + line_size = len(line_text) + + # Enforce memory limit + if stderr_size + line_size > MAX_STDERR_SIZE: + stderr_lines.append( + f"[stderr truncated after {stderr_size} bytes]" + ) + # Drain rest of stream without storing + async for _ in self._stderr_stream: + pass + break + + stderr_lines.append(line_text) + stderr_size += line_size + + except TimeoutError: + stderr_lines.append( + f"[stderr collection timed out after {STDERR_TIMEOUT}s]" + ) except anyio.ClosedResourceError: pass - await self._process.wait() - if self._process.returncode is not None and self._process.returncode != 0: - stderr_output = "\n".join(stderr_lines) - if stderr_output and "error" in stderr_output.lower(): - raise ProcessError( - "CLI process failed", - exit_code=self._process.returncode, - stderr=stderr_output, - ) + # Check process completion and handle errors + try: + returncode = await self._process.wait() + except Exception: + returncode = -1 + + stderr_output = "\n".join(stderr_lines) if stderr_lines else "" + + # Use exit code for error detection, not string matching + if returncode != 0: + raise ProcessError( + f"Command failed with exit code {returncode}", + exit_code=returncode, + stderr=stderr_output, + ) + elif stderr_output: + # Log stderr for debugging but don't fail on non-zero exit + logger.debug(f"Process stderr: {stderr_output}") def is_connected(self) -> bool: """Check if subprocess is running.""" From be040c6161af230fbe9c6d127e36add385f1ae70 Mon Sep 17 00:00:00 2001 From: Lina Tawfik Date: Tue, 1 Jul 2025 16:27:32 -0700 Subject: [PATCH 5/5] Fix linting issues - use lowercase variable names --- .../_internal/transport/subprocess_cli.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/claude_code_sdk/_internal/transport/subprocess_cli.py b/src/claude_code_sdk/_internal/transport/subprocess_cli.py index 8c70691..f4fbc58 100644 --- a/src/claude_code_sdk/_internal/transport/subprocess_cli.py +++ b/src/claude_code_sdk/_internal/transport/subprocess_cli.py @@ -174,8 +174,8 @@ class SubprocessCLITransport(Transport): raise CLIConnectionError("Not connected") # Safety constants - MAX_STDERR_SIZE = 10 * 1024 * 1024 # 10MB - STDERR_TIMEOUT = 30.0 # 30 seconds + max_stderr_size = 10 * 1024 * 1024 # 10MB + stderr_timeout = 30.0 # 30 seconds json_buffer = "" @@ -228,13 +228,13 @@ class SubprocessCLITransport(Transport): if self._stderr_stream: try: # Use timeout to prevent hanging - with anyio.fail_after(STDERR_TIMEOUT): + with anyio.fail_after(stderr_timeout): async for line in self._stderr_stream: line_text = line.strip() line_size = len(line_text) # Enforce memory limit - if stderr_size + line_size > MAX_STDERR_SIZE: + if stderr_size + line_size > max_stderr_size: stderr_lines.append( f"[stderr truncated after {stderr_size} bytes]" ) @@ -248,7 +248,7 @@ class SubprocessCLITransport(Transport): except TimeoutError: stderr_lines.append( - f"[stderr collection timed out after {STDERR_TIMEOUT}s]" + f"[stderr collection timed out after {stderr_timeout}s]" ) except anyio.ClosedResourceError: pass @@ -262,7 +262,7 @@ class SubprocessCLITransport(Transport): stderr_output = "\n".join(stderr_lines) if stderr_lines else "" # Use exit code for error detection, not string matching - if returncode != 0: + if returncode is not None and returncode != 0: raise ProcessError( f"Command failed with exit code {returncode}", exit_code=returncode,