mirror of
https://github.com/anthropics/claude-code-sdk-python.git
synced 2025-12-23 09:19:52 +00:00
## Problem

ClaudeSDKClient initialization would hang indefinitely on Windows, timing out after 60 seconds. The SDK successfully spawned the Claude CLI subprocess but control requests sent via stdin never reached the subprocess due to Windows subprocess stdin buffering behavior with Python's asyncio.

## Root Cause

On Windows, when using asyncio subprocess streams, data written to stdin can remain buffered and not immediately sent to the child process. The CLI subprocess waits for the initialization request that's stuck in Python's buffer, causing the 60-second timeout.

## Solution

1. Added `flush_stdin()` method to Transport base class (non-abstract, default no-op)
2. Implemented Windows-specific flush in SubprocessCLITransport that calls `drain()` on the asyncio StreamWriter when available
3. Call `flush_stdin()` after all control protocol writes in Query class:
   - After sending control requests (_send_control_request)
   - After responding to incoming requests (_handle_control_request)

## Tests Added

- test_flush_stdin_on_windows: Verifies drain() called on Windows
- test_flush_stdin_on_non_windows: Verifies no-op on other platforms
- test_flush_stdin_without_process: Tests graceful handling of missing process
- test_flush_stdin_fallback_to_inner_stream: Tests wrapped stream fallback
- test_flush_stdin_called_after_control_requests: Integration test for outgoing requests
- test_flush_stdin_called_after_control_responses: Integration test for incoming requests

All tests pass on macOS, and the fix is platform-specific to Windows only.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
595 lines
21 KiB
Python
595 lines
21 KiB
Python
"""Tests for Claude SDK transport layer."""
|
|
|
|
import os
|
|
import uuid
|
|
from unittest.mock import AsyncMock, MagicMock, patch
|
|
|
|
import anyio
|
|
import pytest
|
|
|
|
from claude_agent_sdk._internal.transport.subprocess_cli import SubprocessCLITransport
|
|
from claude_agent_sdk.types import ClaudeAgentOptions
|
|
|
|
|
|
class TestSubprocessCLITransport:
|
|
"""Test subprocess transport implementation."""
|
|
|
|
def test_find_cli_not_found(self):
    """Constructing a transport without a locatable CLI raises CLINotFoundError."""
    from claude_agent_sdk._errors import CLINotFoundError

    # Make the PATH lookup come back empty and every Path.exists() probe fail.
    which_patch = patch("shutil.which", return_value=None)
    exists_patch = patch("pathlib.Path.exists", return_value=False)

    with which_patch, exists_patch, pytest.raises(CLINotFoundError) as raised:
        SubprocessCLITransport(prompt="test", options=ClaudeAgentOptions())

    assert "Claude Code not found" in str(raised.value)
|
|
|
|
def test_build_command_basic(self):
    """Default options yield a command with the CLI path, output flags and prompt."""
    argv = SubprocessCLITransport(
        prompt="Hello", options=ClaudeAgentOptions(), cli_path="/usr/bin/claude"
    )._build_command()

    # The executable must come first; the remaining tokens may appear anywhere.
    assert argv[0] == "/usr/bin/claude"
    for token in ("--output-format", "stream-json", "--print", "Hello"):
        assert token in argv
|
|
|
|
def test_cli_path_accepts_pathlib_path(self):
    """A pathlib.Path given as cli_path ends up stored as its string form."""
    from pathlib import Path

    cli_location = Path("/usr/bin/claude")
    transport = SubprocessCLITransport(
        prompt="Hello", options=ClaudeAgentOptions(), cli_path=cli_location
    )

    # Compare against str(Path) rather than a literal so the expectation
    # follows whatever separator the host platform uses.
    assert transport._cli_path == str(cli_location)
|
|
|
|
def test_build_command_with_system_prompt_string(self):
    """A plain-string system prompt shows up as --system-prompt plus its text."""
    opts = ClaudeAgentOptions(system_prompt="Be helpful")
    transport = SubprocessCLITransport(
        prompt="test", options=opts, cli_path="/usr/bin/claude"
    )

    argv = transport._build_command()

    for token in ("--system-prompt", "Be helpful"):
        assert token in argv
|
|
|
|
def test_build_command_with_system_prompt_preset(self):
    """A preset system prompt without "append" emits no system-prompt flags."""
    preset = {"type": "preset", "preset": "claude_code"}
    transport = SubprocessCLITransport(
        prompt="test",
        options=ClaudeAgentOptions(system_prompt=preset),
        cli_path="/usr/bin/claude",
    )

    argv = transport._build_command()

    for flag in ("--system-prompt", "--append-system-prompt"):
        assert flag not in argv
|
|
|
|
def test_build_command_with_system_prompt_preset_and_append(self):
    """A preset with "append" emits only --append-system-prompt and its text."""
    preset = {
        "type": "preset",
        "preset": "claude_code",
        "append": "Be concise.",
    }
    transport = SubprocessCLITransport(
        prompt="test",
        options=ClaudeAgentOptions(system_prompt=preset),
        cli_path="/usr/bin/claude",
    )

    argv = transport._build_command()

    # The preset must not be turned into a full replacement prompt.
    assert "--system-prompt" not in argv
    assert "--append-system-prompt" in argv
    assert "Be concise." in argv
|
|
|
|
def test_build_command_with_options(self):
    """Tool lists, model, permission mode and max turns all reach the command."""
    opts = ClaudeAgentOptions(
        allowed_tools=["Read", "Write"],
        disallowed_tools=["Bash"],
        model="claude-sonnet-4-5",
        permission_mode="acceptEdits",
        max_turns=5,
    )
    transport = SubprocessCLITransport(
        prompt="test", options=opts, cli_path="/usr/bin/claude"
    )

    argv = transport._build_command()

    # Each flag is listed next to the value expected for it; only membership
    # in the command is checked, not adjacency.
    expected_tokens = (
        "--allowedTools",
        "Read,Write",
        "--disallowedTools",
        "Bash",
        "--model",
        "claude-sonnet-4-5",
        "--permission-mode",
        "acceptEdits",
        "--max-turns",
        "5",
    )
    for token in expected_tokens:
        assert token in argv
|
|
|
|
def test_build_command_with_add_dirs(self):
    """Every add_dirs entry (str or Path) gets its own --add-dir flag."""
    from pathlib import Path

    str_dir = "/path/to/dir1"
    path_dir = Path("/path/to/dir2")
    transport = SubprocessCLITransport(
        prompt="test",
        options=ClaudeAgentOptions(add_dirs=[str_dir, path_dir]),
        cli_path="/usr/bin/claude",
    )

    argv = transport._build_command()

    # One flag per directory.
    assert "--add-dir" in argv
    flag_positions = [pos for pos, token in enumerate(argv) if token == "--add-dir"]
    assert len(flag_positions) == 2

    # The token right after each flag is the directory it refers to.
    flagged_dirs = [argv[pos + 1] for pos in flag_positions]
    assert str_dir in flagged_dirs
    assert str(path_dir) in flagged_dirs
|
|
|
|
def test_session_continuation(self):
    """continue_conversation and resume map to --continue and --resume <id>."""
    opts = ClaudeAgentOptions(continue_conversation=True, resume="session-123")
    transport = SubprocessCLITransport(
        prompt="Continue from before", options=opts, cli_path="/usr/bin/claude"
    )

    argv = transport._build_command()

    for token in ("--continue", "--resume", "session-123"):
        assert token in argv
|
|
|
|
def test_connect_close(self):
    """connect() makes the transport ready; close() terminates the process."""

    async def _test():
        with patch("anyio.open_process") as open_process:
            # Stand-in for the process spawned for the CLI version check.
            version_proc = MagicMock(
                stdout=MagicMock(
                    receive=AsyncMock(return_value=b"2.0.0 (Claude Code)")
                ),
                terminate=MagicMock(),
                wait=AsyncMock(),
            )

            # Stand-in for the main CLI process; stdin needs an awaitable
            # aclose().
            main_proc = MagicMock(
                returncode=None,
                terminate=MagicMock(),
                wait=AsyncMock(),
                stdout=MagicMock(),
                stderr=MagicMock(),
                stdin=MagicMock(aclose=AsyncMock()),
            )

            # The first spawn is the version check, the second the real process.
            open_process.side_effect = [version_proc, main_proc]

            transport = SubprocessCLITransport(
                prompt="test",
                options=ClaudeAgentOptions(),
                cli_path="/usr/bin/claude",
            )

            await transport.connect()
            assert transport._process is not None
            assert transport.is_ready()

            await transport.close()
            main_proc.terminate.assert_called_once()

    anyio.run(_test)
|
|
|
|
def test_read_messages(self):
    """Smoke-check that a new transport keeps its prompt and CLI path.

    Despite the name, no messages are read here: the transport only exposes
    raw reading via read_messages(), and the async stream handling is left
    to the integration tests.
    """
    transport = SubprocessCLITransport(
        prompt="test", options=ClaudeAgentOptions(), cli_path="/usr/bin/claude"
    )

    stored = (transport._prompt, transport._cli_path)
    assert stored[0] == "test"
    assert stored[1] == "/usr/bin/claude"
|
|
|
|
def test_connect_with_nonexistent_cwd(self):
    """connect() raises CLIConnectionError naming the missing working directory."""
    from claude_agent_sdk._errors import CLIConnectionError

    async def _test():
        opts = ClaudeAgentOptions(cwd="/this/directory/does/not/exist")
        transport = SubprocessCLITransport(
            prompt="test", options=opts, cli_path="/usr/bin/claude"
        )

        with pytest.raises(CLIConnectionError) as raised:
            await transport.connect()

        # The error text must mention the offending path.
        assert "/this/directory/does/not/exist" in str(raised.value)

    anyio.run(_test)
|
|
|
|
def test_build_command_with_settings_file(self):
    """A settings file path is passed through after --settings."""
    opts = ClaudeAgentOptions(settings="/path/to/settings.json")
    argv = SubprocessCLITransport(
        prompt="test", options=opts, cli_path="/usr/bin/claude"
    )._build_command()

    for token in ("--settings", "/path/to/settings.json"):
        assert token in argv
|
|
|
|
def test_build_command_with_settings_json(self):
    """An inline JSON settings string is passed through verbatim."""
    inline_settings = '{"permissions": {"allow": ["Bash(ls:*)"]}}'
    opts = ClaudeAgentOptions(settings=inline_settings)
    argv = SubprocessCLITransport(
        prompt="test", options=opts, cli_path="/usr/bin/claude"
    )._build_command()

    assert "--settings" in argv
    assert inline_settings in argv
|
|
|
|
def test_build_command_with_extra_args(self):
    """extra_args entries become --flags; a None value yields a bare flag."""
    extras = {
        "new-flag": "value",
        "boolean-flag": None,
        "another-option": "test-value",
    }
    transport = SubprocessCLITransport(
        prompt="test",
        options=ClaudeAgentOptions(extra_args=extras),
        cli_path="/usr/bin/claude",
    )

    argv = transport._build_command()
    joined = " ".join(argv)

    # Valued flags must be immediately followed by their value.
    assert "--new-flag value" in joined
    assert "--another-option test-value" in joined

    # The None-valued flag must be present with nothing attached to it:
    # it is either the final token or is followed by another flag.
    assert "--boolean-flag" in argv
    flag_pos = argv.index("--boolean-flag")
    is_last_token = flag_pos == len(argv) - 1
    assert is_last_token or argv[flag_pos + 1].startswith("--")
|
|
|
|
def test_build_command_with_mcp_servers(self):
    """A dict of MCP servers is serialized to JSON under "mcpServers"."""
    import json

    servers = {
        "test-server": {
            "type": "stdio",
            "command": "/path/to/server",
            "args": ["--option", "value"],
        }
    }
    transport = SubprocessCLITransport(
        prompt="test",
        options=ClaudeAgentOptions(mcp_servers=servers),
        cli_path="/usr/bin/claude",
    )

    argv = transport._build_command()

    # The value that follows --mcp-config is the serialized configuration.
    assert "--mcp-config" in argv
    raw_config = argv[argv.index("--mcp-config") + 1]

    # Decoding it must give back the server dict, wrapped under "mcpServers".
    decoded = json.loads(raw_config)
    assert "mcpServers" in decoded
    assert decoded["mcpServers"] == servers
|
|
|
|
def test_build_command_with_mcp_servers_as_file_path(self):
    """A str or Path mcp_servers value is passed to --mcp-config as a string."""
    from pathlib import Path

    as_string = "/path/to/mcp-config.json"
    as_path = Path("/path/to/mcp-config.json")

    # (value handed to the options, token expected after --mcp-config);
    # the Path case expects its str() form.
    cases = [(as_string, as_string), (as_path, str(as_path))]

    for given, expected in cases:
        transport = SubprocessCLITransport(
            prompt="test",
            options=ClaudeAgentOptions(mcp_servers=given),
            cli_path="/usr/bin/claude",
        )

        argv = transport._build_command()

        assert "--mcp-config" in argv
        flag_pos = argv.index("--mcp-config")
        assert argv[flag_pos + 1] == expected
|
|
|
|
def test_build_command_with_mcp_servers_as_json_string(self):
    """A JSON-string mcp_servers value is passed to --mcp-config unchanged."""
    raw_json = '{"mcpServers": {"server": {"type": "stdio", "command": "test"}}}'
    opts = ClaudeAgentOptions(mcp_servers=raw_json)
    argv = SubprocessCLITransport(
        prompt="test", options=opts, cli_path="/usr/bin/claude"
    )._build_command()

    assert "--mcp-config" in argv
    flag_pos = argv.index("--mcp-config")
    assert argv[flag_pos + 1] == raw_json
|
|
|
|
def test_env_vars_passed_to_subprocess(self):
    """Custom env vars, the SDK entrypoint marker and PATH reach the subprocess."""

    async def _test():
        # A random value so the assertion cannot pass on a stale variable.
        marker = f"test-{uuid.uuid4().hex[:8]}"
        opts = ClaudeAgentOptions(env={"MY_TEST_VAR": marker})

        # Replace process spawning so the env keyword argument can be inspected.
        with patch("anyio.open_process", new_callable=AsyncMock) as open_process:
            # Stand-in for the process spawned for the CLI version check.
            version_proc = MagicMock(
                stdout=MagicMock(
                    receive=AsyncMock(return_value=b"2.0.0 (Claude Code)")
                ),
                terminate=MagicMock(),
                wait=AsyncMock(),
            )

            # Stand-in for the main CLI process; stdin needs an awaitable
            # aclose().
            main_proc = MagicMock(
                stdout=MagicMock(),
                stdin=MagicMock(aclose=AsyncMock()),
                returncode=None,
            )

            # The first spawn is the version check, the second the real process.
            open_process.side_effect = [version_proc, main_proc]

            transport = SubprocessCLITransport(
                prompt="test",
                options=opts,
                cli_path="/usr/bin/claude",
            )

            await transport.connect()

            # Two spawns are expected: version check + main process.
            assert open_process.call_count == 2

            # The env of interest belongs to the second (main process) call.
            launch_kwargs = open_process.call_args_list[1].kwargs
            assert "env" in launch_kwargs
            child_env = launch_kwargs["env"]

            # The caller-supplied variable is forwarded.
            assert child_env["MY_TEST_VAR"] == marker

            # The SDK identifies itself to the CLI.
            assert "CLAUDE_CODE_ENTRYPOINT" in child_env
            assert child_env["CLAUDE_CODE_ENTRYPOINT"] == "sdk-py"

            # The inherited environment is kept, with unchanged values.
            if "PATH" in os.environ:
                assert "PATH" in child_env
                assert child_env["PATH"] == os.environ["PATH"]

    anyio.run(_test)
|
|
|
|
def test_connect_as_different_user(self):
    """The configured user is forwarded to the main process spawn call."""

    async def _test():
        custom_user = "claude"
        opts = ClaudeAgentOptions(user=custom_user)

        # Replace process spawning so the user keyword argument can be
        # inspected.
        with patch("anyio.open_process", new_callable=AsyncMock) as open_process:
            # Stand-in for the process spawned for the CLI version check.
            version_proc = MagicMock(
                stdout=MagicMock(
                    receive=AsyncMock(return_value=b"2.0.0 (Claude Code)")
                ),
                terminate=MagicMock(),
                wait=AsyncMock(),
            )

            # Stand-in for the main CLI process; stdin needs an awaitable
            # aclose().
            main_proc = MagicMock(
                stdout=MagicMock(),
                stdin=MagicMock(aclose=AsyncMock()),
                returncode=None,
            )

            # The first spawn is the version check, the second the real process.
            open_process.side_effect = [version_proc, main_proc]

            transport = SubprocessCLITransport(
                prompt="test",
                options=opts,
                cli_path="/usr/bin/claude",
            )

            await transport.connect()

            # Two spawns are expected: version check + main process.
            assert open_process.call_count == 2

            # The user of interest belongs to the second (main process) call.
            launch_kwargs = open_process.call_args_list[1].kwargs
            assert "user" in launch_kwargs
            spawned_as = launch_kwargs["user"]

            assert spawned_as == "claude"

    anyio.run(_test)
|
|
|
|
def test_flush_stdin_on_windows(self):
    """On a (patched) Windows platform flush_stdin awaits stdin.drain() (issue #208)."""

    async def _test():
        # Pretend to be on Windows for both construction and the flush call.
        with patch("platform.system", return_value="Windows"):
            transport = SubprocessCLITransport(
                prompt="test",
                options=ClaudeAgentOptions(),
                cli_path="/usr/bin/claude",
            )

            # Attach a fake process whose stdin exposes an awaitable drain().
            fake_stdin = AsyncMock(drain=AsyncMock())
            transport._process = MagicMock(stdin=fake_stdin)

            await transport.flush_stdin()

            fake_stdin.drain.assert_called_once()

    anyio.run(_test)
|
|
|
|
def test_flush_stdin_on_non_windows(self):
    """On a (patched) Linux platform flush_stdin leaves stdin.drain() uncalled."""

    async def _test():
        # Pretend to be on Linux for both construction and the flush call.
        with patch("platform.system", return_value="Linux"):
            transport = SubprocessCLITransport(
                prompt="test",
                options=ClaudeAgentOptions(),
                cli_path="/usr/bin/claude",
            )

            # drain() is available on the fake stdin, so any call to it
            # would be recorded.
            fake_stdin = AsyncMock(drain=AsyncMock())
            transport._process = MagicMock(stdin=fake_stdin)

            await transport.flush_stdin()

            fake_stdin.drain.assert_not_called()

    anyio.run(_test)
|
|
|
|
def test_flush_stdin_without_process(self):
    """flush_stdin must not raise when no subprocess has been started.

    The check runs under a patched "Windows" platform as well as "Linux".
    The non-Windows flush test in this class expects drain() to stay
    uncalled there, so without patching the platform a non-Windows test
    host would never reach the code that has to cope with ``_process``
    being ``None``.
    """

    async def _test():
        for system_name in ("Windows", "Linux"):
            # Patch around construction and the call, like the other
            # flush_stdin tests do.
            with patch("platform.system", return_value=system_name):
                transport = SubprocessCLITransport(
                    prompt="test",
                    options=ClaudeAgentOptions(),
                    cli_path="/usr/bin/claude",
                )

                # Simulate a transport that was never connected.
                transport._process = None

                # Completing without an exception is the whole assertion.
                await transport.flush_stdin()

    anyio.run(_test)
|
|
|
|
def test_flush_stdin_fallback_to_inner_stream(self):
    """When stdin has no drain(), flush_stdin drains the wrapped ``_stream``."""

    async def _test():
        # Pretend to be on Windows for both construction and the flush call.
        with patch("platform.system", return_value="Windows"):
            transport = SubprocessCLITransport(
                prompt="test",
                options=ClaudeAgentOptions(),
                cli_path="/usr/bin/claude",
            )

            # Outer stdin wrapper: deleting the attribute makes the mock
            # raise AttributeError for drain instead of auto-creating it.
            outer_stdin = MagicMock()
            del outer_stdin.drain

            # Inner stream that does offer an awaitable drain().
            inner_stream = AsyncMock(drain=AsyncMock())
            outer_stdin._stream = inner_stream

            transport._process = MagicMock(stdin=outer_stdin)

            await transport.flush_stdin()

            # The flush must have been routed to the inner stream.
            inner_stream.drain.assert_called_once()

    anyio.run(_test)
|