claude-code-sdk-python/tests/test_transport.py
Ashwin Bhat 9dd6fe5f91
Fix Windows subprocess stdin buffering issue causing ClaudeSDKClient hangs (#208)
## Problem
ClaudeSDKClient initialization would hang on Windows until it timed out after
60 seconds. The SDK successfully spawned the Claude CLI subprocess, but
control requests sent via stdin never reached the subprocess because of how
Windows buffers subprocess stdin under Python's asyncio.

## Root Cause
On Windows, when using asyncio subprocess streams, data written to stdin can
remain buffered and not immediately sent to the child process. The CLI
subprocess waits for the initialization request that's stuck in Python's
buffer, causing the 60-second timeout.

## Solution
1. Added `flush_stdin()` method to Transport base class (non-abstract, default no-op)
2. Implemented Windows-specific flush in SubprocessCLITransport that calls
   `drain()` on the asyncio StreamWriter when available
3. Call `flush_stdin()` after all control protocol writes in Query class:
   - After sending control requests (_send_control_request)
   - After responding to incoming requests (_handle_control_request)

## Tests Added
- test_flush_stdin_on_windows: Verifies drain() called on Windows
- test_flush_stdin_on_non_windows: Verifies no-op on other platforms
- test_flush_stdin_without_process: Tests graceful handling of missing process
- test_flush_stdin_fallback_to_inner_stream: Tests wrapped stream fallback
- test_flush_stdin_called_after_control_requests: Integration test for outgoing requests
- test_flush_stdin_called_after_control_responses: Integration test for incoming requests

All tests pass on macOS; the flush itself only takes effect on Windows.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-07 18:28:51 -07:00

595 lines
21 KiB
Python

"""Tests for Claude SDK transport layer."""
import os
import uuid
from unittest.mock import AsyncMock, MagicMock, patch
import anyio
import pytest
from claude_agent_sdk._internal.transport.subprocess_cli import SubprocessCLITransport
from claude_agent_sdk.types import ClaudeAgentOptions
class TestSubprocessCLITransport:
"""Test subprocess transport implementation."""
def test_find_cli_not_found(self):
    """Constructing the transport raises CLINotFoundError when no CLI is found."""
    from claude_agent_sdk._errors import CLINotFoundError

    # Make the PATH lookup fail and every Path.exists() check return False.
    which_patch = patch("shutil.which", return_value=None)
    exists_patch = patch("pathlib.Path.exists", return_value=False)

    with which_patch, exists_patch:
        with pytest.raises(CLINotFoundError) as raised:
            SubprocessCLITransport(prompt="test", options=ClaudeAgentOptions())

    assert "Claude Code not found" in str(raised.value)
def test_build_command_basic(self):
    """The default command starts with the CLI path and contains the core flags."""
    cli = "/usr/bin/claude"
    command = SubprocessCLITransport(
        prompt="Hello", options=ClaudeAgentOptions(), cli_path=cli
    )._build_command()

    # The executable must be the first element of the command.
    assert command[0] == cli
    for expected in ("--output-format", "stream-json", "--print", "Hello"):
        assert expected in command
def test_cli_path_accepts_pathlib_path(self):
    """A pathlib.Path passed as cli_path is stored on the transport as a string."""
    from pathlib import Path

    cli_location = Path("/usr/bin/claude")
    transport = SubprocessCLITransport(
        prompt="Hello", options=ClaudeAgentOptions(), cli_path=cli_location
    )

    # Compare against str(path) rather than a hard-coded literal, since a
    # Path's string form is platform-dependent.
    assert transport._cli_path == str(cli_location)
def test_build_command_with_system_prompt_string(self):
    """A plain-string system prompt shows up with the --system-prompt flag."""
    opts = ClaudeAgentOptions(system_prompt="Be helpful")
    transport = SubprocessCLITransport(
        prompt="test", options=opts, cli_path="/usr/bin/claude"
    )
    command = transport._build_command()

    for expected in ("--system-prompt", "Be helpful"):
        assert expected in command
def test_build_command_with_system_prompt_preset(self):
    """A preset system prompt with no "append" text emits neither prompt flag."""
    preset_prompt = {"type": "preset", "preset": "claude_code"}
    transport = SubprocessCLITransport(
        prompt="test",
        options=ClaudeAgentOptions(system_prompt=preset_prompt),
        cli_path="/usr/bin/claude",
    )
    command = transport._build_command()

    for absent_flag in ("--system-prompt", "--append-system-prompt"):
        assert absent_flag not in command
def test_build_command_with_system_prompt_preset_and_append(self):
    """A preset with "append" text emits --append-system-prompt but not --system-prompt."""
    append_text = "Be concise."
    preset_prompt = {
        "type": "preset",
        "preset": "claude_code",
        "append": append_text,
    }
    transport = SubprocessCLITransport(
        prompt="test",
        options=ClaudeAgentOptions(system_prompt=preset_prompt),
        cli_path="/usr/bin/claude",
    )
    command = transport._build_command()

    assert "--system-prompt" not in command
    assert "--append-system-prompt" in command
    assert append_text in command
def test_build_command_with_options(self):
    """Tool lists, model, permission mode and max turns all appear in the command."""
    opts = ClaudeAgentOptions(
        allowed_tools=["Read", "Write"],
        disallowed_tools=["Bash"],
        model="claude-sonnet-4-5",
        permission_mode="acceptEdits",
        max_turns=5,
    )
    transport = SubprocessCLITransport(
        prompt="test", options=opts, cli_path="/usr/bin/claude"
    )
    command = transport._build_command()

    # Each flag is listed next to the value expected for it; the checks are
    # plain membership tests, in the same order as the pairs below.
    expected_tokens = (
        "--allowedTools",
        "Read,Write",
        "--disallowedTools",
        "Bash",
        "--model",
        "claude-sonnet-4-5",
        "--permission-mode",
        "acceptEdits",
        "--max-turns",
        "5",
    )
    for token in expected_tokens:
        assert token in command
def test_build_command_with_add_dirs(self):
    """Each add_dirs entry (str or Path) gets its own --add-dir flag."""
    from pathlib import Path

    str_dir = "/path/to/dir1"
    path_dir = Path("/path/to/dir2")

    opts = ClaudeAgentOptions(add_dirs=[str_dir, path_dir])
    transport = SubprocessCLITransport(
        prompt="test", options=opts, cli_path="/usr/bin/claude"
    )
    command = transport._build_command()

    assert "--add-dir" in command

    # One flag per directory is expected.
    flag_positions = [pos for pos, token in enumerate(command) if token == "--add-dir"]
    assert len(flag_positions) == 2

    # The token right after each flag is the directory it refers to.
    following = [command[pos + 1] for pos in flag_positions]
    assert str_dir in following
    assert str(path_dir) in following
def test_session_continuation(self):
    """continue_conversation and resume add --continue, --resume and the session id."""
    opts = ClaudeAgentOptions(continue_conversation=True, resume="session-123")
    transport = SubprocessCLITransport(
        prompt="Continue from before",
        options=opts,
        cli_path="/usr/bin/claude",
    )
    command = transport._build_command()

    for expected in ("--continue", "--resume", "session-123"):
        assert expected in command
def test_connect_close(self):
    """Exercise the connect()/close() lifecycle against mocked processes.

    ``anyio.open_process`` is replaced by an explicit ``AsyncMock`` (the same
    way the other connect tests in this class patch it) so that awaiting it
    does not depend on ``patch`` auto-detecting that the target is async.
    The mock hands back a version-check process first and the main process
    second. After ``connect()`` the transport must hold a process and report
    ready; after ``close()`` the main process must have been terminated
    exactly once.
    """

    async def _test():
        with patch("anyio.open_process", new_callable=AsyncMock) as mock_exec:
            # Mock version check process
            mock_version_process = MagicMock()
            mock_version_process.stdout = MagicMock()
            mock_version_process.stdout.receive = AsyncMock(
                return_value=b"2.0.0 (Claude Code)"
            )
            mock_version_process.terminate = MagicMock()
            mock_version_process.wait = AsyncMock()

            # Mock main process; returncode None means "still running"
            mock_process = MagicMock()
            mock_process.returncode = None
            mock_process.terminate = MagicMock()
            mock_process.wait = AsyncMock()
            mock_process.stdout = MagicMock()
            mock_process.stderr = MagicMock()

            # stdin needs an awaitable aclose()
            mock_stdin = MagicMock()
            mock_stdin.aclose = AsyncMock()
            mock_process.stdin = mock_stdin

            # Return version process first, then main process
            mock_exec.side_effect = [mock_version_process, mock_process]

            transport = SubprocessCLITransport(
                prompt="test",
                options=ClaudeAgentOptions(),
                cli_path="/usr/bin/claude",
            )

            await transport.connect()
            assert transport._process is not None
            assert transport.is_ready()

            await transport.close()
            mock_process.terminate.assert_called_once()

    anyio.run(_test)
def test_read_messages(self):
    """Smoke-check transport construction; no messages are actually read.

    The test only confirms that the prompt and CLI path handed to the
    constructor are stored on the transport.
    """
    expected_prompt = "test"
    expected_cli = "/usr/bin/claude"

    # Stream handling itself is left to the integration tests.
    transport = SubprocessCLITransport(
        prompt=expected_prompt,
        options=ClaudeAgentOptions(),
        cli_path=expected_cli,
    )

    assert transport._prompt == expected_prompt
    assert transport._cli_path == expected_cli
def test_connect_with_nonexistent_cwd(self):
    """connect() raises CLIConnectionError naming the missing cwd."""
    from claude_agent_sdk._errors import CLIConnectionError

    missing_dir = "/this/directory/does/not/exist"

    async def _test():
        transport = SubprocessCLITransport(
            prompt="test",
            options=ClaudeAgentOptions(cwd=missing_dir),
            cli_path="/usr/bin/claude",
        )

        with pytest.raises(CLIConnectionError) as raised:
            await transport.connect()

        # The error text must mention the offending directory.
        assert missing_dir in str(raised.value)

    anyio.run(_test)
def test_build_command_with_settings_file(self):
    """Settings given as a file path appear in the command with --settings."""
    settings_path = "/path/to/settings.json"
    transport = SubprocessCLITransport(
        prompt="test",
        options=ClaudeAgentOptions(settings=settings_path),
        cli_path="/usr/bin/claude",
    )
    command = transport._build_command()

    for expected in ("--settings", settings_path):
        assert expected in command
def test_build_command_with_settings_json(self):
    """Settings given as a JSON string appear unchanged in the command."""
    inline_settings = '{"permissions": {"allow": ["Bash(ls:*)"]}}'
    opts = ClaudeAgentOptions(settings=inline_settings)
    command = SubprocessCLITransport(
        prompt="test", options=opts, cli_path="/usr/bin/claude"
    )._build_command()

    assert "--settings" in command
    # The exact JSON text must be one of the command tokens.
    assert inline_settings in command
def test_build_command_with_extra_args(self):
    """extra_args become --flags; a None value yields a flag with no argument."""
    extras = {
        "new-flag": "value",
        "boolean-flag": None,
        "another-option": "test-value",
    }
    transport = SubprocessCLITransport(
        prompt="test",
        options=ClaudeAgentOptions(extra_args=extras),
        cli_path="/usr/bin/claude",
    )
    command = transport._build_command()
    joined = " ".join(command)

    # Valued flags: flag and value must sit next to each other.
    assert "--new-flag value" in joined
    assert "--another-option test-value" in joined

    # Valueless flag: present, and not followed by a value token.
    assert "--boolean-flag" in command
    flag_pos = command.index("--boolean-flag")
    is_last = flag_pos == len(command) - 1
    assert is_last or command[flag_pos + 1].startswith("--")
def test_build_command_with_mcp_servers(self):
    """A dict of MCP servers is passed as JSON under "mcpServers" after --mcp-config."""
    import json

    servers = {
        "test-server": {
            "type": "stdio",
            "command": "/path/to/server",
            "args": ["--option", "value"],
        }
    }
    opts = ClaudeAgentOptions(mcp_servers=servers)
    transport = SubprocessCLITransport(
        prompt="test", options=opts, cli_path="/usr/bin/claude"
    )
    command = transport._build_command()

    assert "--mcp-config" in command

    # The token following the flag holds the serialized configuration.
    raw_config = command[command.index("--mcp-config") + 1]
    parsed = json.loads(raw_config)

    assert "mcpServers" in parsed
    assert parsed["mcpServers"] == servers
def test_build_command_with_mcp_servers_as_file_path(self):
    """A config file path (str or Path) is passed as a string after --mcp-config."""
    from pathlib import Path

    # Same location twice: first as a plain string, then as a Path object.
    config_sources = ("/path/to/mcp-config.json", Path("/path/to/mcp-config.json"))

    for source in config_sources:
        transport = SubprocessCLITransport(
            prompt="test",
            options=ClaudeAgentOptions(mcp_servers=source),
            cli_path="/usr/bin/claude",
        )
        command = transport._build_command()

        assert "--mcp-config" in command
        flag_pos = command.index("--mcp-config")
        # str() is a no-op for the string case and converts the Path case.
        assert command[flag_pos + 1] == str(source)
def test_build_command_with_mcp_servers_as_json_string(self):
    """An MCP config given as a JSON string is passed verbatim after --mcp-config."""
    raw_json = '{"mcpServers": {"server": {"type": "stdio", "command": "test"}}}'
    opts = ClaudeAgentOptions(mcp_servers=raw_json)
    command = SubprocessCLITransport(
        prompt="test", options=opts, cli_path="/usr/bin/claude"
    )._build_command()

    assert "--mcp-config" in command
    flag_pos = command.index("--mcp-config")
    assert command[flag_pos + 1] == raw_json
def test_env_vars_passed_to_subprocess(self):
    """Custom env entries reach the main process along with SDK and inherited vars."""

    async def _test():
        # A random suffix makes sure the value can only have come from options.
        unique_value = f"test-{uuid.uuid4().hex[:8]}"
        opts = ClaudeAgentOptions(env={"MY_TEST_VAR": unique_value})

        # Stand-in for the process spawned for the version check.
        version_proc = MagicMock()
        version_proc.stdout = MagicMock()
        version_proc.stdout.receive = AsyncMock(return_value=b"2.0.0 (Claude Code)")
        version_proc.terminate = MagicMock()
        version_proc.wait = AsyncMock()

        # Stand-in for the main process; stdin needs an awaitable aclose().
        main_proc = MagicMock()
        main_proc.stdout = MagicMock()
        fake_stdin = MagicMock()
        fake_stdin.aclose = AsyncMock()
        main_proc.stdin = fake_stdin
        main_proc.returncode = None

        with patch("anyio.open_process", new_callable=AsyncMock) as open_process:
            # First spawn is the version check, second is the main process.
            open_process.side_effect = [version_proc, main_proc]

            transport = SubprocessCLITransport(
                prompt="test",
                options=opts,
                cli_path="/usr/bin/claude",
            )
            await transport.connect()

            assert open_process.call_count == 2

            # Inspect the keyword arguments of the main-process spawn.
            spawn_kwargs = open_process.call_args_list[1].kwargs
            assert "env" in spawn_kwargs
            child_env = spawn_kwargs["env"]

            assert child_env["MY_TEST_VAR"] == unique_value

            # The SDK identifies itself through this variable.
            assert "CLAUDE_CODE_ENTRYPOINT" in child_env
            assert child_env["CLAUDE_CODE_ENTRYPOINT"] == "sdk-py"

            # Variables from the parent environment are carried over unchanged.
            if "PATH" in os.environ:
                assert "PATH" in child_env
                assert child_env["PATH"] == os.environ["PATH"]

    anyio.run(_test)
def test_connect_as_different_user(self):
    """The user option is forwarded as the user= keyword of the main process spawn."""

    async def _test():
        opts = ClaudeAgentOptions(user="claude")

        # Stand-in for the process spawned for the version check.
        version_proc = MagicMock()
        version_proc.stdout = MagicMock()
        version_proc.stdout.receive = AsyncMock(return_value=b"2.0.0 (Claude Code)")
        version_proc.terminate = MagicMock()
        version_proc.wait = AsyncMock()

        # Stand-in for the main process; stdin needs an awaitable aclose().
        main_proc = MagicMock()
        main_proc.stdout = MagicMock()
        fake_stdin = MagicMock()
        fake_stdin.aclose = AsyncMock()
        main_proc.stdin = fake_stdin
        main_proc.returncode = None

        # Patch the spawn call so the user keyword argument can be inspected.
        with patch("anyio.open_process", new_callable=AsyncMock) as open_process:
            # First spawn is the version check, second is the main process.
            open_process.side_effect = [version_proc, main_proc]

            transport = SubprocessCLITransport(
                prompt="test",
                options=opts,
                cli_path="/usr/bin/claude",
            )
            await transport.connect()

            assert open_process.call_count == 2

            # Inspect the keyword arguments of the main-process spawn.
            spawn_kwargs = open_process.call_args_list[1].kwargs
            assert "user" in spawn_kwargs
            assert spawn_kwargs["user"] == "claude"

    anyio.run(_test)
def test_flush_stdin_on_windows(self):
    """Test that flush_stdin awaits stdin.drain() on Windows (issue #208).

    ``platform.system`` is patched to report "Windows" and the transport is
    given a mock process whose stdin exposes an async ``drain``. The check
    uses ``assert_awaited_once`` rather than ``assert_called_once``: calling
    an ``AsyncMock`` only creates a coroutine, so a "called" assertion would
    still pass if the implementation forgot to await it and nothing was
    actually flushed.
    """

    async def _test():
        # Mock platform.system to return Windows
        with patch("platform.system", return_value="Windows"):
            transport = SubprocessCLITransport(
                prompt="test",
                options=ClaudeAgentOptions(),
                cli_path="/usr/bin/claude",
            )

            # Create a mock process with stdin that has drain method
            mock_process = MagicMock()
            mock_stdin = AsyncMock()
            mock_stdin.drain = AsyncMock()
            mock_process.stdin = mock_stdin
            transport._process = mock_process

            await transport.flush_stdin()

            # drain() must have been awaited exactly once, not merely called
            mock_stdin.drain.assert_awaited_once()

    anyio.run(_test)
def test_flush_stdin_on_non_windows(self):
    """flush_stdin does not call stdin.drain() when platform.system() is "Linux"."""

    async def _test():
        # Build the fake process up front; stdin exposes an async drain().
        fake_stdin = AsyncMock()
        fake_stdin.drain = AsyncMock()
        fake_process = MagicMock()
        fake_process.stdin = fake_stdin

        with patch("platform.system", return_value="Linux"):
            transport = SubprocessCLITransport(
                prompt="test",
                options=ClaudeAgentOptions(),
                cli_path="/usr/bin/claude",
            )
            transport._process = fake_process

            await transport.flush_stdin()

            # Off Windows the flush is expected to leave drain() untouched.
            fake_stdin.drain.assert_not_called()

    anyio.run(_test)
def test_flush_stdin_without_process(self):
    """flush_stdin returns without raising when no process is attached."""

    async def _test():
        transport = SubprocessCLITransport(
            prompt="test",
            options=ClaudeAgentOptions(),
            cli_path="/usr/bin/claude",
        )
        # Explicitly leave the transport without a process.
        transport._process = None

        # The test passes as long as this call does not raise.
        await transport.flush_stdin()

    anyio.run(_test)
def test_flush_stdin_fallback_to_inner_stream(self):
    """Test that flush_stdin falls back to drain() on a wrapped inner stream.

    With ``platform.system`` patched to "Windows", stdin itself has no
    ``drain`` attribute but carries a ``_stream`` object that does. The check
    uses ``assert_awaited_once`` rather than ``assert_called_once`` so the
    test fails if the inner ``drain()`` coroutine is created but never
    awaited, which would mean nothing was flushed.
    """

    async def _test():
        # Mock platform.system to return Windows
        with patch("platform.system", return_value="Windows"):
            transport = SubprocessCLITransport(
                prompt="test",
                options=ClaudeAgentOptions(),
                cli_path="/usr/bin/claude",
            )

            # Create a mock process with stdin that doesn't have drain,
            # but has an inner _stream that does
            mock_process = MagicMock()
            mock_stdin = MagicMock()

            # Deleting the attribute makes hasattr(mock_stdin, "drain") False;
            # otherwise MagicMock would create it on first access
            del mock_stdin.drain

            # Add inner stream with drain
            mock_inner_stream = AsyncMock()
            mock_inner_stream.drain = AsyncMock()
            mock_stdin._stream = mock_inner_stream

            mock_process.stdin = mock_stdin
            transport._process = mock_process

            await transport.flush_stdin()

            # The inner stream's drain() must have been awaited exactly once
            mock_inner_stream.drain.assert_awaited_once()

    anyio.run(_test)