Address anyio.BrokenResourceError issue from #139 (#149)
Some checks are pending
Lint / lint (push) Waiting to run
Test / test (3.10) (push) Waiting to run
Test / test (3.11) (push) Waiting to run
Test / test (3.12) (push) Waiting to run
Test / test (3.13) (push) Waiting to run

## Summary

This PR addresses the `anyio.BrokenResourceError` issue from #139 and
aligns the Python SDK's error handling with the TypeScript SDK
implementation.

## Changes

### Fix stream closure issue in Query
- Removed the `async with` context manager from
`Query.receive_messages()` that was closing the stream after first use
- The stream now remains open for the entire session, allowing multiple
queries in streaming mode
- This fixes the `BrokenResourceError` that occurred during multi-turn
conversations

### Align subprocess transport with TypeScript SDK
Following the TypeScript `ProcessTransport` implementation pattern:

- **Added `_exit_error` tracking**: Captures and preserves process-level
errors for better error propagation
- **Enhanced `write()` method checks**: 
  - Validates transport readiness before writing
  - Checks if process is still alive (exit code)
  - Checks for stored exit errors before attempting writes
  - Marks transport as not ready on write failures
- **Improved error handling in `connect()`**: Stores errors as
`_exit_error` for later reference
- **Simplified `is_ready()` method**: Now just returns the `_ready`
flag, matching TypeScript's simpler approach

### Other improvements
- Added asyncio pytest plugin configuration (`-p asyncio` in
pyproject.toml)
- Added clarifying comment about TextReceiveStream line handling

## Testing

The multi-turn conversation example now works correctly:
```bash
python examples/streaming_mode.py multi_turn_conversation
```

## Related Issues

Fixes #139
This commit is contained in:
Dickson Tsai 2025-09-05 13:11:27 +09:00 committed by GitHub
parent 681f46c873
commit 2c8c7fd373
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 49 additions and 26 deletions

View file

@ -61,6 +61,7 @@ testpaths = ["tests"]
pythonpath = ["src"]
addopts = [
"--import-mode=importlib",
"-p", "asyncio",
]
[tool.pytest-asyncio]

View file

@ -456,15 +456,14 @@ class Query:
async def receive_messages(self) -> AsyncIterator[dict[str, Any]]:
"""Receive SDK messages (not control messages)."""
async with self._message_receive:
async for message in self._message_receive:
# Check for special messages
if message.get("type") == "end":
break
elif message.get("type") == "error":
raise Exception(message.get("error", "Unknown error"))
async for message in self._message_receive:
# Check for special messages
if message.get("type") == "end":
break
elif message.get("type") == "error":
raise Exception(message.get("error", "Unknown error"))
yield message
yield message
async def close(self) -> None:
"""Close the query and transport."""

View file

@ -46,6 +46,7 @@ class SubprocessCLITransport(Transport):
self._stdin_stream: TextSendStream | None = None
self._stderr_file: Any = None # tempfile.NamedTemporaryFile
self._ready = False
self._exit_error: Exception | None = None # Track process exit errors
def _find_cli(self) -> str:
"""Find Claude Code CLI binary."""
@ -213,12 +214,18 @@ class SubprocessCLITransport(Transport):
except FileNotFoundError as e:
# Check if the error comes from the working directory or the CLI
if self._cwd and not Path(self._cwd).exists():
raise CLIConnectionError(
error = CLIConnectionError(
f"Working directory does not exist: {self._cwd}"
) from e
raise CLINotFoundError(f"Claude Code not found at: {self._cli_path}") from e
)
self._exit_error = error
raise error from e
error = CLINotFoundError(f"Claude Code not found at: {self._cli_path}")
self._exit_error = error
raise error from e
except Exception as e:
raise CLIConnectionError(f"Failed to start Claude Code: {e}") from e
error = CLIConnectionError(f"Failed to start Claude Code: {e}")
self._exit_error = error
raise error from e
async def close(self) -> None:
"""Close the transport and clean up resources."""
@ -259,13 +266,34 @@ class SubprocessCLITransport(Transport):
self._stdout_stream = None
self._stderr_stream = None
self._stdin_stream = None
self._exit_error = None
async def write(self, data: str) -> None:
"""Write raw data to the transport."""
if not self._stdin_stream:
raise CLIConnectionError("Cannot write: stdin not available")
# Check if ready (like TypeScript)
if not self._ready or not self._stdin_stream:
raise CLIConnectionError("ProcessTransport is not ready for writing")
await self._stdin_stream.send(data)
# Check if process is still alive (like TypeScript)
if self._process and self._process.returncode is not None:
raise CLIConnectionError(
f"Cannot write to terminated process (exit code: {self._process.returncode})"
)
# Check for exit errors (like TypeScript)
if self._exit_error:
raise CLIConnectionError(
f"Cannot write to process that exited with error: {self._exit_error}"
) from self._exit_error
try:
await self._stdin_stream.send(data)
except Exception as e:
self._ready = False # Mark as not ready (like TypeScript)
self._exit_error = CLIConnectionError(
f"Failed to write to process stdin: {e}"
)
raise self._exit_error from e
async def end_input(self) -> None:
"""End the input stream (close stdin)."""
@ -273,9 +301,6 @@ class SubprocessCLITransport(Transport):
with suppress(Exception):
await self._stdin_stream.aclose()
self._stdin_stream = None
if self._process and self._process.stdin:
with suppress(Exception):
await self._process.stdin.aclose()
def read_messages(self) -> AsyncIterator[dict[str, Any]]:
"""Read and parse messages from the transport."""
@ -295,6 +320,9 @@ class SubprocessCLITransport(Transport):
if not line_str:
continue
# Accumulate partial JSON until we can parse it
# Note: TextReceiveStream can truncate long lines, so we need to buffer
# and speculatively parse until we get a complete JSON object
json_lines = line_str.split("\n")
for json_line in json_lines:
@ -361,21 +389,16 @@ class SubprocessCLITransport(Transport):
# Use exit code for error detection, not string matching
if returncode is not None and returncode != 0:
raise ProcessError(
self._exit_error = ProcessError(
f"Command failed with exit code {returncode}",
exit_code=returncode,
stderr=stderr_output,
)
raise self._exit_error
elif stderr_output:
# Log stderr for debugging but don't fail on non-zero exit
logger.debug(f"Process stderr: {stderr_output}")
def is_ready(self) -> bool:
"""Check if transport is ready for communication."""
return (
self._ready
and self._process is not None
and self._process.returncode is None
)
# Remove interrupt and control request methods - these now belong in Query class
return self._ready