fix: wait for first result before closing stdin if SDK MCP present (#380)

Port SDK MCP fix from TypeScript to Python.

Now, when SDK MCP servers or hooks are present, stream_input() waits for
the first result message before closing stdin, allowing bidirectional
control protocol communication to complete.

Fixes repro in
https://github.com/anthropics/claude-agent-sdk-python/issues/266.

The `query()` design requires input streams to be held open by the user
for SDK MCP bidirectional communication to work. This has confused a lot
of folks, so we're moving towards a more explicit lifecycle design. In
the meantime, this is the way we've addressed it with V1 APIs in
https://github.com/anthropics/claude-agent-sdk-typescript.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-authored-by: Claude <noreply@anthropic.com>
This commit is contained in:
Dalton Flanagan 2025-12-01 10:19:51 -08:00 committed by GitHub
parent 49482e1dfd
commit a2f24a3101
No known key found for this signature in database
GPG key ID: B5690EEEBB952194

View file

@ -107,6 +107,12 @@ class Query:
self._closed = False
self._initialization_result: dict[str, Any] | None = None
# Track first result for proper stream closure with SDK MCP servers
self._first_result_event = anyio.Event()
self._stream_close_timeout = (
float(os.environ.get("CLAUDE_CODE_STREAM_CLOSE_TIMEOUT", "60000")) / 1000.0
) # Convert ms to seconds
async def initialize(self) -> dict[str, Any] | None:
"""Initialize control protocol if in streaming mode.
@ -195,6 +201,10 @@ class Query:
# TODO: Implement cancellation support
continue
# Track results for proper stream closure
if msg_type == "result":
self._first_result_event.set()
# Regular SDK messages go to the stream
await self._message_send.send(message)
@ -525,13 +535,35 @@ class Query:
)
async def stream_input(self, stream: AsyncIterable[dict[str, Any]]) -> None:
"""Stream input messages to transport."""
"""Stream input messages to transport.
If SDK MCP servers or hooks are present, waits for the first result
before closing stdin to allow bidirectional control protocol communication.
"""
try:
async for message in stream:
if self._closed:
break
await self.transport.write(json.dumps(message) + "\n")
# After all messages sent, end input
# If we have SDK MCP servers or hooks that need bidirectional communication,
# wait for first result before closing the channel
has_hooks = bool(self.hooks)
if self.sdk_mcp_servers or has_hooks:
logger.debug(
f"Waiting for first result before closing stdin "
f"(sdk_mcp_servers={len(self.sdk_mcp_servers)}, has_hooks={has_hooks})"
)
try:
with anyio.move_on_after(self._stream_close_timeout):
await self._first_result_event.wait()
logger.debug("Received first result, closing input stream")
except Exception:
logger.debug(
"Timed out waiting for first result, closing input stream"
)
# After all messages sent (and result received if needed), end input
await self.transport.end_input()
except Exception as e:
logger.debug(f"Error streaming input: {e}")