diff --git a/pyproject.toml b/pyproject.toml index d2835fa..d6f4259 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -61,6 +61,7 @@ testpaths = ["tests"] pythonpath = ["src"] addopts = [ "--import-mode=importlib", + "-p", "asyncio", ] [tool.pytest-asyncio] diff --git a/src/claude_code_sdk/_internal/query.py b/src/claude_code_sdk/_internal/query.py index 0bbc145..ea8045a 100644 --- a/src/claude_code_sdk/_internal/query.py +++ b/src/claude_code_sdk/_internal/query.py @@ -456,15 +456,14 @@ class Query: async def receive_messages(self) -> AsyncIterator[dict[str, Any]]: """Receive SDK messages (not control messages).""" - async with self._message_receive: - async for message in self._message_receive: - # Check for special messages - if message.get("type") == "end": - break - elif message.get("type") == "error": - raise Exception(message.get("error", "Unknown error")) + async for message in self._message_receive: + # Check for special messages + if message.get("type") == "end": + break + elif message.get("type") == "error": + raise Exception(message.get("error", "Unknown error")) - yield message + yield message async def close(self) -> None: """Close the query and transport.""" diff --git a/src/claude_code_sdk/_internal/transport/subprocess_cli.py b/src/claude_code_sdk/_internal/transport/subprocess_cli.py index dd37693..6a681f7 100644 --- a/src/claude_code_sdk/_internal/transport/subprocess_cli.py +++ b/src/claude_code_sdk/_internal/transport/subprocess_cli.py @@ -46,6 +46,7 @@ class SubprocessCLITransport(Transport): self._stdin_stream: TextSendStream | None = None self._stderr_file: Any = None # tempfile.NamedTemporaryFile self._ready = False + self._exit_error: Exception | None = None # Track process exit errors def _find_cli(self) -> str: """Find Claude Code CLI binary.""" @@ -213,12 +214,18 @@ class SubprocessCLITransport(Transport): except FileNotFoundError as e: # Check if the error comes from the working directory or the CLI if self._cwd and not Path(self._cwd).exists(): - raise CLIConnectionError( + error = CLIConnectionError( f"Working directory does not exist: {self._cwd}" - ) from e - raise CLINotFoundError(f"Claude Code not found at: {self._cli_path}") from e + ) + self._exit_error = error + raise error from e + error = CLINotFoundError(f"Claude Code not found at: {self._cli_path}") + self._exit_error = error + raise error from e except Exception as e: - raise CLIConnectionError(f"Failed to start Claude Code: {e}") from e + error = CLIConnectionError(f"Failed to start Claude Code: {e}") + self._exit_error = error + raise error from e async def close(self) -> None: """Close the transport and clean up resources.""" @@ -259,13 +266,34 @@ class SubprocessCLITransport(Transport): self._stdout_stream = None self._stderr_stream = None self._stdin_stream = None + self._exit_error = None async def write(self, data: str) -> None: """Write raw data to the transport.""" - if not self._stdin_stream: - raise CLIConnectionError("Cannot write: stdin not available") + # Check if ready (like TypeScript) + if not self._ready or not self._stdin_stream: + raise CLIConnectionError("ProcessTransport is not ready for writing") - await self._stdin_stream.send(data) + # Check if process is still alive (like TypeScript) + if self._process and self._process.returncode is not None: + raise CLIConnectionError( + f"Cannot write to terminated process (exit code: {self._process.returncode})" + ) + + # Check for exit errors (like TypeScript) + if self._exit_error: + raise CLIConnectionError( + f"Cannot write to process that exited with error: {self._exit_error}" + ) from self._exit_error + + try: + await self._stdin_stream.send(data) + except Exception as e: + self._ready = False # Mark as not ready (like TypeScript) + self._exit_error = CLIConnectionError( + f"Failed to write to process stdin: {e}" + ) + raise self._exit_error from e async def end_input(self) -> None: """End the input stream (close stdin).""" @@ -273,9 +301,6 @@ class SubprocessCLITransport(Transport): with suppress(Exception): await self._stdin_stream.aclose() self._stdin_stream = None - if self._process and self._process.stdin: - with suppress(Exception): - await self._process.stdin.aclose() def read_messages(self) -> AsyncIterator[dict[str, Any]]: """Read and parse messages from the transport.""" @@ -295,6 +320,9 @@ class SubprocessCLITransport(Transport): if not line_str: continue + # Accumulate partial JSON until we can parse it + # Note: TextReceiveStream can truncate long lines, so we need to buffer + # and speculatively parse until we get a complete JSON object json_lines = line_str.split("\n") for json_line in json_lines: @@ -361,21 +389,16 @@ class SubprocessCLITransport(Transport): # Use exit code for error detection, not string matching if returncode is not None and returncode != 0: - raise ProcessError( + self._exit_error = ProcessError( f"Command failed with exit code {returncode}", exit_code=returncode, stderr=stderr_output, ) + raise self._exit_error elif stderr_output: # Log stderr for debugging but don't fail on non-zero exit logger.debug(f"Process stderr: {stderr_output}") def is_ready(self) -> bool: """Check if transport is ready for communication.""" - return ( - self._ready - and self._process is not None - and self._process.returncode is None - ) - - # Remove interrupt and control request methods - these now belong in Query class + return self._ready