Fix FastAPI SSE streaming compatibility (fixes #4)

Remove anyio.create_task_group() from receive_messages() to fix the
RuntimeError "Attempted to exit cancel scope in a different task than
it was entered in" when using the SDK with FastAPI's SSE streaming.

The issue occurred because FastAPI can move async generators between
different asyncio tasks during the streaming lifecycle, which conflicts
with anyio's cancel scope tracking.

Changes:
- Remove task group usage from receive_messages()
- Read stderr sequentially after stdout completes
- Add test to ensure no task groups are used
- Fix existing test expectation for buffer overflow

This is a minimal fix that maintains compatibility while solving the
core issue. The trade-off is that stderr is read after stdout instead
of concurrently, but this is unlikely to cause issues in practice.

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
Lina Tawfik 2025-07-01 00:04:50 -07:00
parent 4af210ee8f
commit 404c50bce0
No known key found for this signature in database
3 changed files with 66 additions and 46 deletions

View file

@ -170,57 +170,53 @@ class SubprocessCLITransport(Transport):
if not self._process or not self._stdout_stream: if not self._process or not self._stdout_stream:
raise CLIConnectionError("Not connected") raise CLIConnectionError("Not connected")
stderr_lines = [] json_buffer = ""
# Process stdout messages first
try:
async for line in self._stdout_stream:
line_str = line.strip()
if not line_str:
continue
async def read_stderr() -> None: json_lines = line_str.split("\n")
"""Read stderr in background."""
if self._stderr_stream:
try:
async for line in self._stderr_stream:
stderr_lines.append(line.strip())
except anyio.ClosedResourceError:
pass
async with anyio.create_task_group() as tg: for json_line in json_lines:
tg.start_soon(read_stderr) json_line = json_line.strip()
if not json_line:
json_buffer = ""
try:
async for line in self._stdout_stream:
line_str = line.strip()
if not line_str:
continue continue
json_lines = line_str.split("\n") # Keep accumulating partial JSON until we can parse it
json_buffer += json_line
for json_line in json_lines: if len(json_buffer) > _MAX_BUFFER_SIZE:
json_line = json_line.strip() json_buffer = ""
if not json_line: raise SDKJSONDecodeError(
continue f"JSON message exceeded maximum buffer size of {_MAX_BUFFER_SIZE} bytes",
ValueError(
# Keep accumulating partial JSON until we can parse it f"Buffer size {len(json_buffer)} exceeds limit {_MAX_BUFFER_SIZE}"
json_buffer += json_line ),
)
if len(json_buffer) > _MAX_BUFFER_SIZE:
json_buffer = ""
raise SDKJSONDecodeError(
f"JSON message exceeded maximum buffer size of {_MAX_BUFFER_SIZE} bytes",
ValueError(
f"Buffer size {len(json_buffer)} exceeds limit {_MAX_BUFFER_SIZE}"
),
)
try:
data = json.loads(json_buffer)
json_buffer = ""
try: try:
data = json.loads(json_buffer) yield data
json_buffer = "" except GeneratorExit:
try: return
yield data except json.JSONDecodeError:
except GeneratorExit: continue
return
except json.JSONDecodeError:
continue
except anyio.ClosedResourceError:
pass
# Read stderr after stdout completes (no concurrent task group)
stderr_lines = []
if self._stderr_stream:
try:
async for line in self._stderr_stream:
stderr_lines.append(line.strip())
except anyio.ClosedResourceError: except anyio.ClosedResourceError:
pass pass

View file

@ -0,0 +1,25 @@
"""Test FastAPI streaming compatibility (issue #4 fix)."""
import inspect
from claude_code_sdk._internal.transport.subprocess_cli import SubprocessCLITransport
def test_no_task_groups_in_receive_messages():
"""Verify receive_messages doesn't use task groups (fixes FastAPI issue #4)."""
# Get the source code of receive_messages
source = inspect.getsource(SubprocessCLITransport.receive_messages)
# The fix: ensure no task group or task creation
assert "create_task_group" not in source, (
"receive_messages must not use create_task_group to avoid "
"RuntimeError with FastAPI streaming"
)
assert "asyncio.create_task" not in source, (
"receive_messages must not create tasks to maintain "
"compatibility with FastAPI's generator handling"
)
# Verify stderr is still being read (sequential approach)
assert "_stderr_stream" in source, "Should still read stderr"
assert "stderr_lines" in source, "Should collect stderr output"

View file

@ -255,9 +255,8 @@ class TestSubprocessBuffering:
async for msg in transport.receive_messages(): async for msg in transport.receive_messages():
messages.append(msg) messages.append(msg)
assert len(exc_info.value.exceptions) == 1 assert isinstance(exc_info.value, CLIJSONDecodeError)
assert isinstance(exc_info.value.exceptions[0], CLIJSONDecodeError) assert "exceeded maximum buffer size" in str(exc_info.value)
assert "exceeded maximum buffer size" in str(exc_info.value.exceptions[0])
anyio.run(_test) anyio.run(_test)