From a2f24a310191ff50f53cc0a2a7371c9637303b58 Mon Sep 17 00:00:00 2001 From: Dalton Flanagan <6599399+dltn@users.noreply.github.com> Date: Mon, 1 Dec 2025 10:19:51 -0800 Subject: [PATCH] fix: wait for first result before closing stdin if SDK MCP present (#380) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Port SDK MCP fix from TypeScript to Python. Now, when SDK MCP servers or hooks are present, stream_input() waits for the first result message before closing stdin, allowing bidirectional control protocol communication to complete. Fixes repro in https://github.com/anthropics/claude-agent-sdk-python/issues/266. The `query()` design requires input streams to be held open by the user for SDK MCP bidirectional communication to work. This has confused a lot of folks, so we're moving towards a more explicit lifecycle design. In the meantime, this is the way we've addressed it with V1 APIs in https://github.com/anthropics/claude-agent-sdk-typescript. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-authored-by: Claude --- src/claude_agent_sdk/_internal/query.py | 36 +++++++++++++++++++++++-- 1 file changed, 34 insertions(+), 2 deletions(-) diff --git a/src/claude_agent_sdk/_internal/query.py b/src/claude_agent_sdk/_internal/query.py index 566e316..e48995f 100644 --- a/src/claude_agent_sdk/_internal/query.py +++ b/src/claude_agent_sdk/_internal/query.py @@ -107,6 +107,12 @@ class Query: self._closed = False self._initialization_result: dict[str, Any] | None = None + # Track first result for proper stream closure with SDK MCP servers + self._first_result_event = anyio.Event() + self._stream_close_timeout = ( + float(os.environ.get("CLAUDE_CODE_STREAM_CLOSE_TIMEOUT", "60000")) / 1000.0 + ) # Convert ms to seconds + async def initialize(self) -> dict[str, Any] | None: """Initialize control protocol if in streaming mode. @@ -195,6 +201,10 @@ class Query: # TODO: Implement cancellation support continue + # Track results for proper stream closure + if msg_type == "result": + self._first_result_event.set() + # Regular SDK messages go to the stream await self._message_send.send(message) @@ -525,13 +535,35 @@ class Query: ) async def stream_input(self, stream: AsyncIterable[dict[str, Any]]) -> None: - """Stream input messages to transport.""" + """Stream input messages to transport. + + If SDK MCP servers or hooks are present, waits for the first result + before closing stdin to allow bidirectional control protocol communication. + """ try: async for message in stream: if self._closed: break await self.transport.write(json.dumps(message) + "\n") - # After all messages sent, end input + + # If we have SDK MCP servers or hooks that need bidirectional communication, + # wait for first result before closing the channel + has_hooks = bool(self.hooks) + if self.sdk_mcp_servers or has_hooks: + logger.debug( + f"Waiting for first result before closing stdin " + f"(sdk_mcp_servers={len(self.sdk_mcp_servers)}, has_hooks={has_hooks})" + ) + try: + with anyio.move_on_after(self._stream_close_timeout): + await self._first_result_event.wait() + logger.debug("Received first result, closing input stream") + except Exception: + logger.debug( + "Timed out waiting for first result, closing input stream" + ) + + # After all messages sent (and result received if needed), end input await self.transport.end_input() except Exception as e: logger.debug(f"Error streaming input: {e}")