Compare commits

..

No commits in common. "main" and "v0.1.8" have entirely different histories.
main ... v0.1.8

25 changed files with 109 additions and 1371 deletions

View file

@ -1,9 +0,0 @@
---
name: test-agent
description: A simple test agent for SDK testing
tools: Read
---
# Test Agent
You are a simple test agent. When asked a question, provide a brief, helpful answer.

View file

@ -1,49 +0,0 @@
# Git
.git
.gitignore
# Python
__pycache__
*.py[cod]
*$py.class
*.so
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
*.egg-info/
.installed.cfg
*.egg
# Virtual environments
.env
.venv
env/
venv/
ENV/
# IDE
.idea/
.vscode/
*.swp
*.swo
# Testing/Coverage
.coverage
.pytest_cache/
htmlcov/
.tox/
.nox/
# Misc
*.log
.DS_Store

View file

@ -24,6 +24,12 @@ jobs:
VERSION="${BRANCH_NAME#release/v}"
echo "version=$VERSION" >> $GITHUB_OUTPUT
- name: Get previous release tag
id: previous_tag
run: |
PREVIOUS_TAG=$(git describe --tags --abbrev=0 HEAD~1 2>/dev/null || echo "")
echo "previous_tag=$PREVIOUS_TAG" >> $GITHUB_OUTPUT
- name: Create and push tag
run: |
git config --local user.email "github-actions[bot]@users.noreply.github.com"
@ -40,34 +46,14 @@ jobs:
env:
GH_TOKEN: ${{ secrets.GITHUB_TOKEN }}
run: |
VERSION="${{ steps.extract_version.outputs.version }}"
# Create release with auto-generated notes
gh release create "v${{ steps.extract_version.outputs.version }}" \
--title "Release v${{ steps.extract_version.outputs.version }}" \
--generate-notes \
--notes-start-tag "${{ steps.previous_tag.outputs.previous_tag }}" \
--notes "Published to PyPI: https://pypi.org/project/claude-agent-sdk/${{ steps.extract_version.outputs.version }}/
# Extract changelog section for this version to a temp file
awk -v ver="$VERSION" '
/^## / {
if (found) exit
if ($2 == ver) found=1
next
}
found { print }
' CHANGELOG.md > release_notes.md
# Append install instructions
{
echo ""
echo "---"
echo ""
echo "**PyPI:** https://pypi.org/project/claude-agent-sdk/VERSION/"
echo ""
echo '```bash'
echo "pip install claude-agent-sdk==VERSION"
echo '```'
} >> release_notes.md
# Replace VERSION placeholder
sed -i "s/VERSION/$VERSION/g" release_notes.md
# Create release with notes from file
gh release create "v$VERSION" \
--title "v$VERSION" \
--notes-file release_notes.md
### Installation
\`\`\`bash
pip install claude-agent-sdk==${{ steps.extract_version.outputs.version }}
\`\`\`"

View file

@ -7,6 +7,11 @@ on:
description: 'Package version to publish (e.g., 0.1.4)'
required: true
type: string
claude_code_version:
description: 'Claude Code CLI version to bundle (e.g., 2.0.0 or latest)'
required: false
type: string
default: 'latest'
jobs:
test:
runs-on: ubuntu-latest
@ -61,13 +66,15 @@ jobs:
runs-on: ${{ matrix.os }}
strategy:
matrix:
os: [ubuntu-latest, ubuntu-24.04-arm, macos-latest, windows-latest]
os: [ubuntu-latest, macos-latest, windows-latest]
permissions:
contents: write
pull-requests: write
steps:
- uses: actions/checkout@v4
with:
fetch-depth: 0 # Fetch all history including tags (necessary for changelog generation)
- name: Set up Python
uses: actions/setup-python@v5
@ -84,6 +91,7 @@ jobs:
run: |
python scripts/build_wheel.py \
--version "${{ github.event.inputs.version }}" \
--cli-version "${{ github.event.inputs.claude_code_version }}" \
--skip-sdist \
--clean
shell: bash
@ -107,7 +115,6 @@ jobs:
- uses: actions/checkout@v4
with:
token: ${{ secrets.GITHUB_TOKEN }}
fetch-depth: 0 # Fetch all history including tags for changelog generation
- name: Set up Python
uses: actions/setup-python@v5
@ -125,12 +132,9 @@ jobs:
run: |
python scripts/update_version.py "${{ env.VERSION }}"
- name: Read CLI version from code
id: cli_version
- name: Update CLI version
run: |
CLI_VERSION=$(python -c "import re; print(re.search(r'__cli_version__ = \"([^\"]+)\"', open('src/claude_agent_sdk/_cli_version.py').read()).group(1))")
echo "cli_version=$CLI_VERSION" >> $GITHUB_OUTPUT
echo "Bundled CLI version: $CLI_VERSION"
python scripts/update_cli_version.py "${{ github.event.inputs.claude_code_version }}"
- name: Download all wheel artifacts
uses: actions/download-artifact@v4
@ -177,8 +181,8 @@ jobs:
git checkout -b "$BRANCH_NAME"
# Commit version changes
git add pyproject.toml src/claude_agent_sdk/_version.py
git commit -m "chore: release v${{ env.VERSION }}"
git add pyproject.toml src/claude_agent_sdk/_version.py src/claude_agent_sdk/_cli_version.py
git commit -m "chore: bump version to ${{ env.VERSION }} with CLI ${{ github.event.inputs.claude_code_version }}"
- name: Update changelog with Claude
continue-on-error: true
@ -188,7 +192,6 @@ jobs:
anthropic_api_key: ${{ secrets.ANTHROPIC_API_KEY }}
github_token: ${{ secrets.GITHUB_TOKEN }}
claude_args: |
--model claude-opus-4-5
--allowedTools 'Bash(git add:*),Bash(git commit:*),Edit'
- name: Push branch and create PR
@ -204,11 +207,12 @@ jobs:
## Changes
- Updated version in \`pyproject.toml\` to ${{ env.VERSION }}
- Updated version in \`src/claude_agent_sdk/_version.py\` to ${{ env.VERSION }}
- Updated bundled CLI version in \`src/claude_agent_sdk/_cli_version.py\` to ${{ github.event.inputs.claude_code_version }}
- Updated \`CHANGELOG.md\` with release notes
## Release Information
- Published to PyPI: https://pypi.org/project/claude-agent-sdk/${{ env.VERSION }}/
- Bundled CLI version: ${{ steps.cli_version.outputs.cli_version }}
- Bundled CLI version: ${{ github.event.inputs.claude_code_version }}
- Install with: \`pip install claude-agent-sdk==${{ env.VERSION }}\`
🤖 Generated by GitHub Actions"

View file

@ -1,36 +0,0 @@
name: Post new issues to Slack
on:
issues:
types: [opened]
jobs:
notify:
runs-on: ubuntu-latest
steps:
- name: Post to Slack
uses: slackapi/slack-github-action@91efab103c0de0a537f72a35f6b8cda0ee76bf0a # 2.1.1
with:
method: chat.postMessage
token: ${{ secrets.SLACK_BOT_TOKEN }}
payload: |
{
"channel": "C09HY5E0K60",
"text": "New issue opened in ${{ github.repository }}",
"blocks": [
{
"type": "section",
"text": {
"type": "mrkdwn",
"text": "*New Issue:* <${{ github.event.issue.html_url }}|#${{ github.event.issue.number }} ${{ github.event.issue.title }}>"
}
},
{
"type": "section",
"text": {
"type": "mrkdwn",
"text": "*Author:* ${{ github.event.issue.user.login }}"
}
}
]
}

View file

@ -81,24 +81,6 @@ jobs:
run: |
python -m pytest e2e-tests/ -v -m e2e
test-e2e-docker:
runs-on: ubuntu-latest
needs: test # Run after unit tests pass
# Run e2e tests in Docker to catch container-specific issues like #406
steps:
- uses: actions/checkout@v4
- name: Build Docker test image
run: docker build -f Dockerfile.test -t claude-sdk-test .
- name: Run e2e tests in Docker
env:
ANTHROPIC_API_KEY: ${{ secrets.ANTHROPIC_API_KEY }}
run: |
docker run --rm -e ANTHROPIC_API_KEY \
claude-sdk-test python -m pytest e2e-tests/ -v -m e2e
test-examples:
runs-on: ubuntu-latest
needs: test-e2e # Run after e2e tests

View file

@ -1,88 +1,5 @@
# Changelog
## 0.1.18
### Internal/Other Changes
- **Docker-based test infrastructure**: Added Docker support for running e2e tests in containerized environments, helping catch Docker-specific issues (#424)
- Updated bundled Claude CLI to version 2.0.72
## 0.1.17
### New Features
- **UserMessage UUID field**: Added `uuid` field to `UserMessage` response type, making it easier to use the `rewind_files()` method by providing direct access to message identifiers needed for file checkpointing (#418)
### Internal/Other Changes
- Updated bundled Claude CLI to version 2.0.70
## 0.1.16
### Bug Fixes
- **Rate limit detection**: Fixed parsing of the `error` field in `AssistantMessage`, enabling applications to detect and handle API errors like rate limits. Previously, the `error` field was defined but never populated from CLI responses (#405)
### Internal/Other Changes
- Updated bundled Claude CLI to version 2.0.68
## 0.1.15
### New Features
- **File checkpointing and rewind**: Added `enable_file_checkpointing` option to `ClaudeAgentOptions` and `rewind_files(user_message_id)` method to `ClaudeSDKClient` and `Query`. This enables reverting file changes made during a session back to a specific checkpoint, useful for exploring different approaches or recovering from unwanted modifications (#395)
### Documentation
- Added license and terms section to README (#399)
## 0.1.14
### Internal/Other Changes
- Updated bundled Claude CLI to version 2.0.62
## 0.1.13
### Bug Fixes
- **Faster error handling**: CLI errors (e.g., invalid session ID) now propagate to pending requests immediately instead of waiting for the 60-second timeout (#388)
- **Pydantic 2.12+ compatibility**: Fixed `PydanticUserError` caused by `McpServer` type only being imported under `TYPE_CHECKING` (#385)
- **Concurrent subagent writes**: Added write lock to prevent `BusyResourceError` when multiple subagents invoke MCP tools in parallel (#391)
### Internal/Other Changes
- Updated bundled Claude CLI to version 2.0.59
## 0.1.12
### New Features
- **Tools option**: Added `tools` option to `ClaudeAgentOptions` for controlling the base set of available tools, matching the TypeScript SDK functionality. Supports three modes:
- Array of tool names to specify which tools should be available (e.g., `["Read", "Edit", "Bash"]`)
- Empty array `[]` to disable all built-in tools
- Preset object `{"type": "preset", "preset": "claude_code"}` to use the default Claude Code toolset
- **SDK beta support**: Added `betas` option to `ClaudeAgentOptions` for enabling Anthropic API beta features. Currently supports `"context-1m-2025-08-07"` for extended context window
## 0.1.11
### Internal/Other Changes
- Updated bundled Claude CLI to version 2.0.57
## 0.1.10
### Internal/Other Changes
- Updated bundled Claude CLI to version 2.0.53
## 0.1.9
### Internal/Other Changes
- Updated bundled Claude CLI to version 2.0.49
## 0.1.8
### Features

View file

@ -1,29 +0,0 @@
# Dockerfile for running SDK tests in a containerized environment
# This helps catch Docker-specific issues like #406
FROM python:3.12-slim
# Install dependencies for Claude CLI and git (needed for some tests)
RUN apt-get update && apt-get install -y \
curl \
git \
&& rm -rf /var/lib/apt/lists/*
# Install Claude Code CLI
RUN curl -fsSL https://claude.ai/install.sh | bash
ENV PATH="/root/.local/bin:$PATH"
# Set up working directory
WORKDIR /app
# Copy the SDK source
COPY . .
# Install SDK with dev dependencies
RUN pip install -e ".[dev]"
# Verify CLI installation
RUN claude -v
# Default: run unit tests
CMD ["python", "-m", "pytest", "tests/", "-v"]

View file

@ -351,6 +351,6 @@ The package is published to PyPI via the GitHub Actions workflow in `.github/wor
The workflow tracks both the package version and the bundled CLI version separately, allowing you to release a new package version with an updated CLI without code changes.
## License and terms
## License
Use of this SDK is governed by Anthropic's [Commercial Terms of Service](https://www.anthropic.com/legal/commercial-terms), including when you use it to power products and services that you make available to your own customers and end users, except to the extent a specific component or dependency is covered by a different license as indicated in that component's LICENSE file.
MIT

View file

@ -38,88 +38,15 @@ async def test_agent_definition():
async for message in client.receive_response():
if isinstance(message, SystemMessage) and message.subtype == "init":
agents = message.data.get("agents", [])
assert isinstance(agents, list), (
f"agents should be a list of strings, got: {type(agents)}"
)
assert "test-agent" in agents, (
f"test-agent should be available, got: {agents}"
)
assert isinstance(
agents, list
), f"agents should be a list of strings, got: {type(agents)}"
assert (
"test-agent" in agents
), f"test-agent should be available, got: {agents}"
break
@pytest.mark.e2e
@pytest.mark.asyncio
async def test_filesystem_agent_loading():
"""Test that filesystem-based agents load via setting_sources and produce full response.
This is the core test for issue #406. It verifies that when using
setting_sources=["project"] with a .claude/agents/ directory containing
agent definitions, the SDK:
1. Loads the agents (they appear in init message)
2. Produces a full response with AssistantMessage
3. Completes with a ResultMessage
The bug in #406 causes the iterator to complete after only the
init SystemMessage, never yielding AssistantMessage or ResultMessage.
"""
with tempfile.TemporaryDirectory() as tmpdir:
# Create a temporary project with a filesystem agent
project_dir = Path(tmpdir)
agents_dir = project_dir / ".claude" / "agents"
agents_dir.mkdir(parents=True)
# Create a test agent file
agent_file = agents_dir / "fs-test-agent.md"
agent_file.write_text(
"""---
name: fs-test-agent
description: A filesystem test agent for SDK testing
tools: Read
---
# Filesystem Test Agent
You are a simple test agent. When asked a question, provide a brief, helpful answer.
"""
)
options = ClaudeAgentOptions(
setting_sources=["project"],
cwd=project_dir,
max_turns=1,
)
messages = []
async with ClaudeSDKClient(options=options) as client:
await client.query("Say hello in exactly 3 words")
async for msg in client.receive_response():
messages.append(msg)
# Must have at least init, assistant, result
message_types = [type(m).__name__ for m in messages]
assert "SystemMessage" in message_types, "Missing SystemMessage (init)"
assert "AssistantMessage" in message_types, (
f"Missing AssistantMessage - got only: {message_types}. "
"This may indicate issue #406 (silent failure with filesystem agents)."
)
assert "ResultMessage" in message_types, "Missing ResultMessage"
# Find the init message and check for the filesystem agent
for msg in messages:
if isinstance(msg, SystemMessage) and msg.subtype == "init":
agents = msg.data.get("agents", [])
# Agents are returned as strings (just names)
assert "fs-test-agent" in agents, (
f"fs-test-agent not loaded from filesystem. Found: {agents}"
)
break
# On Windows, wait for file handles to be released before cleanup
if sys.platform == "win32":
await asyncio.sleep(0.5)
@pytest.mark.e2e
@pytest.mark.asyncio
async def test_setting_sources_default():
@ -147,12 +74,12 @@ async def test_setting_sources_default():
async for message in client.receive_response():
if isinstance(message, SystemMessage) and message.subtype == "init":
output_style = message.data.get("output_style")
assert output_style != "local-test-style", (
f"outputStyle should NOT be from local settings (default is no settings), got: {output_style}"
)
assert output_style == "default", (
f"outputStyle should be 'default', got: {output_style}"
)
assert (
output_style != "local-test-style"
), f"outputStyle should NOT be from local settings (default is no settings), got: {output_style}"
assert (
output_style == "default"
), f"outputStyle should be 'default', got: {output_style}"
break
# On Windows, wait for file handles to be released before cleanup
@ -194,9 +121,9 @@ This is a test command.
async for message in client.receive_response():
if isinstance(message, SystemMessage) and message.subtype == "init":
commands = message.data.get("slash_commands", [])
assert "testcmd" not in commands, (
f"testcmd should NOT be available with user-only sources, got: {commands}"
)
assert (
"testcmd" not in commands
), f"testcmd should NOT be available with user-only sources, got: {commands}"
break
# On Windows, wait for file handles to be released before cleanup
@ -232,11 +159,11 @@ async def test_setting_sources_project_included():
async for message in client.receive_response():
if isinstance(message, SystemMessage) and message.subtype == "init":
output_style = message.data.get("output_style")
assert output_style == "local-test-style", (
f"outputStyle should be from local settings, got: {output_style}"
)
assert (
output_style == "local-test-style"
), f"outputStyle should be from local settings, got: {output_style}"
break
# On Windows, wait for file handles to be released before cleanup
if sys.platform == "win32":
await asyncio.sleep(0.5)
await asyncio.sleep(0.5)

View file

@ -1,107 +0,0 @@
#!/usr/bin/env python3
"""Example of loading filesystem-based agents via setting_sources.
This example demonstrates how to load agents defined in .claude/agents/ files
using the setting_sources option. This is different from inline AgentDefinition
objects - these agents are loaded from markdown files on disk.
This example tests the scenario from issue #406 where filesystem-based agents
loaded via setting_sources=["project"] may silently fail in certain environments.
Usage:
./examples/filesystem_agents.py
"""
import asyncio
from pathlib import Path
from claude_agent_sdk import (
AssistantMessage,
ClaudeAgentOptions,
ClaudeSDKClient,
ResultMessage,
SystemMessage,
TextBlock,
)
def extract_agents(msg: SystemMessage) -> list[str]:
"""Extract agent names from system message init data."""
if msg.subtype == "init":
agents = msg.data.get("agents", [])
# Agents can be either strings or dicts with a 'name' field
result = []
for a in agents:
if isinstance(a, str):
result.append(a)
elif isinstance(a, dict):
result.append(a.get("name", ""))
return result
return []
async def main():
"""Test loading filesystem-based agents."""
print("=== Filesystem Agents Example ===")
print("Testing: setting_sources=['project'] with .claude/agents/test-agent.md")
print()
# Use the SDK repo directory which has .claude/agents/test-agent.md
sdk_dir = Path(__file__).parent.parent
options = ClaudeAgentOptions(
setting_sources=["project"],
cwd=sdk_dir,
)
message_types: list[str] = []
agents_found: list[str] = []
async with ClaudeSDKClient(options=options) as client:
await client.query("Say hello in exactly 3 words")
async for msg in client.receive_response():
message_types.append(type(msg).__name__)
if isinstance(msg, SystemMessage) and msg.subtype == "init":
agents_found = extract_agents(msg)
print(f"Init message received. Agents loaded: {agents_found}")
elif isinstance(msg, AssistantMessage):
for block in msg.content:
if isinstance(block, TextBlock):
print(f"Assistant: {block.text}")
elif isinstance(msg, ResultMessage):
print(
f"Result: subtype={msg.subtype}, cost=${msg.total_cost_usd or 0:.4f}"
)
print()
print("=== Summary ===")
print(f"Message types received: {message_types}")
print(f"Total messages: {len(message_types)}")
# Validate the results
has_init = "SystemMessage" in message_types
has_assistant = "AssistantMessage" in message_types
has_result = "ResultMessage" in message_types
has_test_agent = "test-agent" in agents_found
print()
if has_init and has_assistant and has_result:
print("SUCCESS: Received full response (init, assistant, result)")
else:
print("FAILURE: Did not receive full response")
print(f" - Init: {has_init}")
print(f" - Assistant: {has_assistant}")
print(f" - Result: {has_result}")
if has_test_agent:
print("SUCCESS: test-agent was loaded from filesystem")
else:
print("WARNING: test-agent was NOT loaded (may not exist in .claude/agents/)")
if __name__ == "__main__":
asyncio.run(main())

View file

@ -1,111 +0,0 @@
#!/usr/bin/env python3
"""Example demonstrating the tools option and verifying tools in system message."""
import anyio
from claude_agent_sdk import (
AssistantMessage,
ClaudeAgentOptions,
ResultMessage,
SystemMessage,
TextBlock,
query,
)
async def tools_array_example():
"""Example with tools as array of specific tool names."""
print("=== Tools Array Example ===")
print("Setting tools=['Read', 'Glob', 'Grep']")
print()
options = ClaudeAgentOptions(
tools=["Read", "Glob", "Grep"],
max_turns=1,
)
async for message in query(
prompt="What tools do you have available? Just list them briefly.",
options=options,
):
if isinstance(message, SystemMessage) and message.subtype == "init":
tools = message.data.get("tools", [])
print(f"Tools from system message: {tools}")
print()
elif isinstance(message, AssistantMessage):
for block in message.content:
if isinstance(block, TextBlock):
print(f"Claude: {block.text}")
elif isinstance(message, ResultMessage):
if message.total_cost_usd:
print(f"\nCost: ${message.total_cost_usd:.4f}")
print()
async def tools_empty_array_example():
"""Example with tools as empty array (disables all built-in tools)."""
print("=== Tools Empty Array Example ===")
print("Setting tools=[] (disables all built-in tools)")
print()
options = ClaudeAgentOptions(
tools=[],
max_turns=1,
)
async for message in query(
prompt="What tools do you have available? Just list them briefly.",
options=options,
):
if isinstance(message, SystemMessage) and message.subtype == "init":
tools = message.data.get("tools", [])
print(f"Tools from system message: {tools}")
print()
elif isinstance(message, AssistantMessage):
for block in message.content:
if isinstance(block, TextBlock):
print(f"Claude: {block.text}")
elif isinstance(message, ResultMessage):
if message.total_cost_usd:
print(f"\nCost: ${message.total_cost_usd:.4f}")
print()
async def tools_preset_example():
"""Example with tools preset (all default Claude Code tools)."""
print("=== Tools Preset Example ===")
print("Setting tools={'type': 'preset', 'preset': 'claude_code'}")
print()
options = ClaudeAgentOptions(
tools={"type": "preset", "preset": "claude_code"},
max_turns=1,
)
async for message in query(
prompt="What tools do you have available? Just list them briefly.",
options=options,
):
if isinstance(message, SystemMessage) and message.subtype == "init":
tools = message.data.get("tools", [])
print(f"Tools from system message ({len(tools)} tools): {tools[:5]}...")
print()
elif isinstance(message, AssistantMessage):
for block in message.content:
if isinstance(block, TextBlock):
print(f"Claude: {block.text}")
elif isinstance(message, ResultMessage):
if message.total_cost_usd:
print(f"\nCost: ${message.total_cost_usd:.4f}")
print()
async def main():
"""Run all examples."""
await tools_array_example()
await tools_empty_array_example()
await tools_preset_example()
if __name__ == "__main__":
anyio.run(main)

View file

@ -4,7 +4,7 @@ build-backend = "hatchling.build"
[project]
name = "claude-agent-sdk"
version = "0.1.18"
version = "0.1.8"
description = "Python SDK for Claude Code"
readme = "README.md"
requires-python = ">=3.10"

View file

@ -1,77 +0,0 @@
#!/bin/bash
# Run SDK tests in a Docker container
# This helps catch Docker-specific issues like #406
#
# Usage:
# ./scripts/test-docker.sh [unit|e2e|all]
#
# Examples:
# ./scripts/test-docker.sh unit # Run unit tests only
# ANTHROPIC_API_KEY=sk-... ./scripts/test-docker.sh e2e # Run e2e tests
# ANTHROPIC_API_KEY=sk-... ./scripts/test-docker.sh all # Run all tests
set -e
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
PROJECT_DIR="$(dirname "$SCRIPT_DIR")"
cd "$PROJECT_DIR"
usage() {
echo "Usage: $0 [unit|e2e|all]"
echo ""
echo "Commands:"
echo " unit - Run unit tests only (no API key needed)"
echo " e2e - Run e2e tests (requires ANTHROPIC_API_KEY)"
echo " all - Run both unit and e2e tests"
echo ""
echo "Examples:"
echo " $0 unit"
echo " ANTHROPIC_API_KEY=sk-... $0 e2e"
exit 1
}
echo "Building Docker test image..."
docker build -f Dockerfile.test -t claude-sdk-test .
case "${1:-unit}" in
unit)
echo ""
echo "Running unit tests in Docker..."
docker run --rm claude-sdk-test \
python -m pytest tests/ -v
;;
e2e)
if [ -z "$ANTHROPIC_API_KEY" ]; then
echo "Error: ANTHROPIC_API_KEY environment variable is required for e2e tests"
echo ""
echo "Usage: ANTHROPIC_API_KEY=sk-... $0 e2e"
exit 1
fi
echo ""
echo "Running e2e tests in Docker..."
docker run --rm -e ANTHROPIC_API_KEY \
claude-sdk-test python -m pytest e2e-tests/ -v -m e2e
;;
all)
echo ""
echo "Running unit tests in Docker..."
docker run --rm claude-sdk-test \
python -m pytest tests/ -v
echo ""
if [ -n "$ANTHROPIC_API_KEY" ]; then
echo "Running e2e tests in Docker..."
docker run --rm -e ANTHROPIC_API_KEY \
claude-sdk-test python -m pytest e2e-tests/ -v -m e2e
else
echo "Skipping e2e tests (ANTHROPIC_API_KEY not set)"
fi
;;
*)
usage
;;
esac
echo ""
echo "Done!"

View file

@ -39,10 +39,6 @@ from .types import (
PreCompactHookInput,
PreToolUseHookInput,
ResultMessage,
SandboxIgnoreViolations,
SandboxNetworkConfig,
SandboxSettings,
SdkBeta,
SdkPluginConfig,
SettingSource,
StopHookInput,
@ -219,7 +215,7 @@ def create_sdk_mcp_server(
tool_map = {tool_def.name: tool_def for tool_def in tools}
# Register list_tools handler to expose available tools
@server.list_tools() # type: ignore[no-untyped-call,untyped-decorator]
@server.list_tools() # type: ignore[no-untyped-call,misc]
async def list_tools() -> list[Tool]:
"""Return the list of available tools."""
tool_list = []
@ -265,7 +261,7 @@ def create_sdk_mcp_server(
return tool_list
# Register call_tool handler to execute tools
@server.call_tool() # type: ignore[untyped-decorator]
@server.call_tool() # type: ignore[misc]
async def call_tool(name: str, arguments: dict[str, Any]) -> Any:
"""Execute a tool by name with given arguments."""
if name not in tool_map:
@ -346,12 +342,6 @@ __all__ = [
"SettingSource",
# Plugin support
"SdkPluginConfig",
# Beta support
"SdkBeta",
# Sandbox support
"SandboxSettings",
"SandboxNetworkConfig",
"SandboxIgnoreViolations",
# MCP Server Support
"create_sdk_mcp_server",
"tool",

View file

@ -1,3 +1,3 @@
"""Bundled Claude Code CLI version."""
__cli_version__ = "2.0.74"
__cli_version__ = "2.0.45"

View file

@ -31,12 +31,10 @@ class InternalClient:
internal_hooks[event] = []
for matcher in matchers:
# Convert HookMatcher to internal dict format
internal_matcher: dict[str, Any] = {
internal_matcher = {
"matcher": matcher.matcher if hasattr(matcher, "matcher") else None,
"hooks": matcher.hooks if hasattr(matcher, "hooks") else [],
}
if hasattr(matcher, "timeout") and matcher.timeout is not None:
internal_matcher["timeout"] = matcher.timeout
internal_hooks[event].append(internal_matcher)
return internal_hooks

View file

@ -48,7 +48,6 @@ def parse_message(data: dict[str, Any]) -> Message:
case "user":
try:
parent_tool_use_id = data.get("parent_tool_use_id")
uuid = data.get("uuid")
if isinstance(data["message"]["content"], list):
user_content_blocks: list[ContentBlock] = []
for block in data["message"]["content"]:
@ -75,12 +74,10 @@ def parse_message(data: dict[str, Any]) -> Message:
)
return UserMessage(
content=user_content_blocks,
uuid=uuid,
parent_tool_use_id=parent_tool_use_id,
)
return UserMessage(
content=data["message"]["content"],
uuid=uuid,
parent_tool_use_id=parent_tool_use_id,
)
except KeyError as e:
@ -123,7 +120,6 @@ def parse_message(data: dict[str, Any]) -> Message:
content=content_blocks,
model=data["message"]["model"],
parent_tool_use_id=data.get("parent_tool_use_id"),
error=data["message"].get("error"),
)
except KeyError as e:
raise MessageParseError(

View file

@ -72,7 +72,6 @@ class Query:
| None = None,
hooks: dict[str, list[dict[str, Any]]] | None = None,
sdk_mcp_servers: dict[str, "McpServer"] | None = None,
initialize_timeout: float = 60.0,
):
"""Initialize Query with transport and callbacks.
@ -82,9 +81,7 @@ class Query:
can_use_tool: Optional callback for tool permission requests
hooks: Optional hook configurations
sdk_mcp_servers: Optional SDK MCP server instances
initialize_timeout: Timeout in seconds for the initialize request
"""
self._initialize_timeout = initialize_timeout
self.transport = transport
self.is_streaming_mode = is_streaming_mode
self.can_use_tool = can_use_tool
@ -107,12 +104,6 @@ class Query:
self._closed = False
self._initialization_result: dict[str, Any] | None = None
# Track first result for proper stream closure with SDK MCP servers
self._first_result_event = anyio.Event()
self._stream_close_timeout = (
float(os.environ.get("CLAUDE_CODE_STREAM_CLOSE_TIMEOUT", "60000")) / 1000.0
) # Convert ms to seconds
async def initialize(self) -> dict[str, Any] | None:
"""Initialize control protocol if in streaming mode.
@ -135,13 +126,12 @@ class Query:
self.next_callback_id += 1
self.hook_callbacks[callback_id] = callback
callback_ids.append(callback_id)
hook_matcher_config: dict[str, Any] = {
"matcher": matcher.get("matcher"),
"hookCallbackIds": callback_ids,
}
if matcher.get("timeout") is not None:
hook_matcher_config["timeout"] = matcher.get("timeout")
hooks_config[event].append(hook_matcher_config)
hooks_config[event].append(
{
"matcher": matcher.get("matcher"),
"hookCallbackIds": callback_ids,
}
)
# Send initialize request
request = {
@ -149,10 +139,7 @@ class Query:
"hooks": hooks_config if hooks_config else None,
}
# Use longer timeout for initialize since MCP servers may take time to start
response = await self._send_control_request(
request, timeout=self._initialize_timeout
)
response = await self._send_control_request(request)
self._initialized = True
self._initialization_result = response # Store for later access
return response
@ -201,10 +188,6 @@ class Query:
# TODO: Implement cancellation support
continue
# Track results for proper stream closure
if msg_type == "result":
self._first_result_event.set()
# Regular SDK messages go to the stream
await self._message_send.send(message)
@ -214,11 +197,6 @@ class Query:
raise # Re-raise to properly handle cancellation
except Exception as e:
logger.error(f"Fatal error in message reader: {e}")
# Signal all pending control requests so they fail fast instead of timing out
for request_id, event in list(self.pending_control_responses.items()):
if request_id not in self.pending_control_results:
self.pending_control_results[request_id] = e
event.set()
# Put error in stream so iterators can handle it
await self._message_send.send({"type": "error", "error": str(e)})
finally:
@ -336,15 +314,8 @@ class Query:
}
await self.transport.write(json.dumps(error_response) + "\n")
async def _send_control_request(
self, request: dict[str, Any], timeout: float = 60.0
) -> dict[str, Any]:
"""Send control request to CLI and wait for response.
Args:
request: The control request to send
timeout: Timeout in seconds to wait for response (default 60s)
"""
async def _send_control_request(self, request: dict[str, Any]) -> dict[str, Any]:
"""Send control request to CLI and wait for response."""
if not self.is_streaming_mode:
raise Exception("Control requests require streaming mode")
@ -367,7 +338,7 @@ class Query:
# Wait for response
try:
with anyio.fail_after(timeout):
with anyio.fail_after(60.0):
await event.wait()
result = self.pending_control_results.pop(request_id)
@ -539,51 +510,14 @@ class Query:
}
)
async def rewind_files(self, user_message_id: str) -> None:
"""Rewind tracked files to their state at a specific user message.
Requires file checkpointing to be enabled via the `enable_file_checkpointing` option.
Args:
user_message_id: UUID of the user message to rewind to
"""
await self._send_control_request(
{
"subtype": "rewind_files",
"user_message_id": user_message_id,
}
)
async def stream_input(self, stream: AsyncIterable[dict[str, Any]]) -> None:
"""Stream input messages to transport.
If SDK MCP servers or hooks are present, waits for the first result
before closing stdin to allow bidirectional control protocol communication.
"""
"""Stream input messages to transport."""
try:
async for message in stream:
if self._closed:
break
await self.transport.write(json.dumps(message) + "\n")
# If we have SDK MCP servers or hooks that need bidirectional communication,
# wait for first result before closing the channel
has_hooks = bool(self.hooks)
if self.sdk_mcp_servers or has_hooks:
logger.debug(
f"Waiting for first result before closing stdin "
f"(sdk_mcp_servers={len(self.sdk_mcp_servers)}, has_hooks={has_hooks})"
)
try:
with anyio.move_on_after(self._stream_close_timeout):
await self._first_result_event.wait()
logger.debug("Received first result, closing input stream")
except Exception:
logger.debug(
"Timed out waiting for first result, closing input stream"
)
# After all messages sent (and result received if needed), end input
# After all messages sent, end input
await self.transport.end_input()
except Exception as e:
logger.debug(f"Error streaming input: {e}")

View file

@ -65,7 +65,6 @@ class SubprocessCLITransport(Transport):
else _DEFAULT_MAX_BUFFER_SIZE
)
self._temp_files: list[str] = [] # Track temporary files for cleanup
self._write_lock: anyio.Lock = anyio.Lock()
def _find_cli(self) -> str:
"""Find Claude Code CLI binary."""
@ -115,60 +114,6 @@ class SubprocessCLITransport(Transport):
return None
def _build_settings_value(self) -> str | None:
"""Build settings value, merging sandbox settings if provided.
Returns the settings value as either:
- A JSON string (if sandbox is provided or settings is JSON)
- A file path (if only settings path is provided without sandbox)
- None if neither settings nor sandbox is provided
"""
has_settings = self._options.settings is not None
has_sandbox = self._options.sandbox is not None
if not has_settings and not has_sandbox:
return None
# If only settings path and no sandbox, pass through as-is
if has_settings and not has_sandbox:
return self._options.settings
# If we have sandbox settings, we need to merge into a JSON object
settings_obj: dict[str, Any] = {}
if has_settings:
assert self._options.settings is not None
settings_str = self._options.settings.strip()
# Check if settings is a JSON string or a file path
if settings_str.startswith("{") and settings_str.endswith("}"):
# Parse JSON string
try:
settings_obj = json.loads(settings_str)
except json.JSONDecodeError:
# If parsing fails, treat as file path
logger.warning(
f"Failed to parse settings as JSON, treating as file path: {settings_str}"
)
# Read the file
settings_path = Path(settings_str)
if settings_path.exists():
with settings_path.open(encoding="utf-8") as f:
settings_obj = json.load(f)
else:
# It's a file path - read and parse
settings_path = Path(settings_str)
if settings_path.exists():
with settings_path.open(encoding="utf-8") as f:
settings_obj = json.load(f)
else:
logger.warning(f"Settings file not found: {settings_path}")
# Merge sandbox settings
if has_sandbox:
settings_obj["sandbox"] = self._options.sandbox
return json.dumps(settings_obj)
def _build_command(self) -> list[str]:
"""Build CLI command with arguments."""
cmd = [self._cli_path, "--output-format", "stream-json", "--verbose"]
@ -186,18 +131,6 @@ class SubprocessCLITransport(Transport):
["--append-system-prompt", self._options.system_prompt["append"]]
)
# Handle tools option (base set of tools)
if self._options.tools is not None:
tools = self._options.tools
if isinstance(tools, list):
if len(tools) == 0:
cmd.extend(["--tools", ""])
else:
cmd.extend(["--tools", ",".join(tools)])
else:
# Preset object - 'claude_code' preset maps to 'default'
cmd.extend(["--tools", "default"])
if self._options.allowed_tools:
cmd.extend(["--allowedTools", ",".join(self._options.allowed_tools)])
@ -216,9 +149,6 @@ class SubprocessCLITransport(Transport):
if self._options.fallback_model:
cmd.extend(["--fallback-model", self._options.fallback_model])
if self._options.betas:
cmd.extend(["--betas", ",".join(self._options.betas)])
if self._options.permission_prompt_tool_name:
cmd.extend(
["--permission-prompt-tool", self._options.permission_prompt_tool_name]
@ -233,10 +163,8 @@ class SubprocessCLITransport(Transport):
if self._options.resume:
cmd.extend(["--resume", self._options.resume])
# Handle settings and sandbox: merge sandbox into settings if both are provided
settings_value = self._build_settings_value()
if settings_value:
cmd.extend(["--settings", settings_value])
if self._options.settings:
cmd.extend(["--settings", self._options.settings])
if self._options.add_dirs:
# Convert all paths to strings and add each directory
@ -384,10 +312,6 @@ class SubprocessCLITransport(Transport):
"CLAUDE_AGENT_SDK_VERSION": __version__,
}
# Enable file checkpointing if requested
if self._options.enable_file_checkpointing:
process_env["CLAUDE_CODE_ENABLE_SDK_FILE_CHECKPOINTING"] = "true"
if self._cwd:
process_env["PWD"] = self._cwd
@ -476,6 +400,8 @@ class SubprocessCLITransport(Transport):
async def close(self) -> None:
"""Close the transport and clean up resources."""
self._ready = False
# Clean up temporary files first (before early return)
for temp_file in self._temp_files:
with suppress(Exception):
@ -483,7 +409,6 @@ class SubprocessCLITransport(Transport):
self._temp_files.clear()
if not self._process:
self._ready = False
return
# Close stderr task group if active
@ -493,19 +418,21 @@ class SubprocessCLITransport(Transport):
await self._stderr_task_group.__aexit__(None, None, None)
self._stderr_task_group = None
# Close stdin stream (acquire lock to prevent race with concurrent writes)
async with self._write_lock:
self._ready = False # Set inside lock to prevent TOCTOU with write()
if self._stdin_stream:
with suppress(Exception):
await self._stdin_stream.aclose()
self._stdin_stream = None
# Close streams
if self._stdin_stream:
with suppress(Exception):
await self._stdin_stream.aclose()
self._stdin_stream = None
if self._stderr_stream:
with suppress(Exception):
await self._stderr_stream.aclose()
self._stderr_stream = None
if self._process.stdin:
with suppress(Exception):
await self._process.stdin.aclose()
# Terminate and wait for process
if self._process.returncode is None:
with suppress(ProcessLookupError):
@ -523,37 +450,37 @@ class SubprocessCLITransport(Transport):
async def write(self, data: str) -> None:
"""Write raw data to the transport."""
async with self._write_lock:
# All checks inside lock to prevent TOCTOU races with close()/end_input()
if not self._ready or not self._stdin_stream:
raise CLIConnectionError("ProcessTransport is not ready for writing")
# Check if ready (like TypeScript)
if not self._ready or not self._stdin_stream:
raise CLIConnectionError("ProcessTransport is not ready for writing")
if self._process and self._process.returncode is not None:
raise CLIConnectionError(
f"Cannot write to terminated process (exit code: {self._process.returncode})"
)
# Check if process is still alive (like TypeScript)
if self._process and self._process.returncode is not None:
raise CLIConnectionError(
f"Cannot write to terminated process (exit code: {self._process.returncode})"
)
if self._exit_error:
raise CLIConnectionError(
f"Cannot write to process that exited with error: {self._exit_error}"
) from self._exit_error
# Check for exit errors (like TypeScript)
if self._exit_error:
raise CLIConnectionError(
f"Cannot write to process that exited with error: {self._exit_error}"
) from self._exit_error
try:
await self._stdin_stream.send(data)
except Exception as e:
self._ready = False
self._exit_error = CLIConnectionError(
f"Failed to write to process stdin: {e}"
)
raise self._exit_error from e
try:
await self._stdin_stream.send(data)
except Exception as e:
self._ready = False # Mark as not ready (like TypeScript)
self._exit_error = CLIConnectionError(
f"Failed to write to process stdin: {e}"
)
raise self._exit_error from e
async def end_input(self) -> None:
"""End the input stream (close stdin)."""
async with self._write_lock:
if self._stdin_stream:
with suppress(Exception):
await self._stdin_stream.aclose()
self._stdin_stream = None
if self._stdin_stream:
with suppress(Exception):
await self._stdin_stream.aclose()
self._stdin_stream = None
def read_messages(self) -> AsyncIterator[dict[str, Any]]:
"""Read and parse messages from the transport."""

View file

@ -1,3 +1,3 @@
"""Version information for claude-agent-sdk."""
__version__ = "0.1.18"
__version__ = "0.1.8"

View file

@ -75,12 +75,10 @@ class ClaudeSDKClient:
internal_hooks[event] = []
for matcher in matchers:
# Convert HookMatcher to internal dict format
internal_matcher: dict[str, Any] = {
internal_matcher = {
"matcher": matcher.matcher if hasattr(matcher, "matcher") else None,
"hooks": matcher.hooks if hasattr(matcher, "hooks") else [],
}
if hasattr(matcher, "timeout") and matcher.timeout is not None:
internal_matcher["timeout"] = matcher.timeout
internal_hooks[event].append(internal_matcher)
return internal_hooks
@ -140,13 +138,6 @@ class ClaudeSDKClient:
if isinstance(config, dict) and config.get("type") == "sdk":
sdk_mcp_servers[name] = config["instance"] # type: ignore[typeddict-item]
# Calculate initialize timeout from CLAUDE_CODE_STREAM_CLOSE_TIMEOUT env var if set
# CLAUDE_CODE_STREAM_CLOSE_TIMEOUT is in milliseconds, convert to seconds
initialize_timeout_ms = int(
os.environ.get("CLAUDE_CODE_STREAM_CLOSE_TIMEOUT", "60000")
)
initialize_timeout = max(initialize_timeout_ms / 1000.0, 60.0)
# Create Query to handle control protocol
self._query = Query(
transport=self._transport,
@ -156,7 +147,6 @@ class ClaudeSDKClient:
if self.options.hooks
else None,
sdk_mcp_servers=sdk_mcp_servers,
initialize_timeout=initialize_timeout,
)
# Start reading messages and initialize
@ -261,38 +251,6 @@ class ClaudeSDKClient:
raise CLIConnectionError("Not connected. Call connect() first.")
await self._query.set_model(model)
async def rewind_files(self, user_message_id: str) -> None:
"""Rewind tracked files to their state at a specific user message.
Requires:
- `enable_file_checkpointing=True` to track file changes
- `extra_args={"replay-user-messages": None}` to receive UserMessage
objects with `uuid` in the response stream
Args:
user_message_id: UUID of the user message to rewind to. This should be
the `uuid` field from a `UserMessage` received during the conversation.
Example:
```python
options = ClaudeAgentOptions(
enable_file_checkpointing=True,
extra_args={"replay-user-messages": None},
)
async with ClaudeSDKClient(options) as client:
await client.query("Make some changes to my files")
async for msg in client.receive_response():
if isinstance(msg, UserMessage) and msg.uuid:
checkpoint_id = msg.uuid # Save this for later
# Later, rewind to that point
await client.rewind_files(checkpoint_id)
```
"""
if not self._query:
raise CLIConnectionError("Not connected. Call connect() first.")
await self._query.rewind_files(user_message_id)
async def get_server_info(self) -> dict[str, Any] | None:
"""Get server initialization info including available commands and output styles.

View file

@ -10,16 +10,10 @@ from typing_extensions import NotRequired
if TYPE_CHECKING:
from mcp.server import Server as McpServer
else:
# Runtime placeholder for forward reference resolution in Pydantic 2.12+
McpServer = Any
# Permission modes
PermissionMode = Literal["default", "acceptEdits", "plan", "bypassPermissions"]
# SDK Beta features - see https://docs.anthropic.com/en/api/beta-headers
SdkBeta = Literal["context-1m-2025-08-07"]
# Agent definitions
SettingSource = Literal["user", "project", "local"]
@ -32,13 +26,6 @@ class SystemPromptPreset(TypedDict):
append: NotRequired[str]
class ToolsPreset(TypedDict):
"""Tools preset configuration."""
type: Literal["preset"]
preset: Literal["claude_code"]
@dataclass
class AgentDefinition:
"""Agent definition configuration."""
@ -379,9 +366,6 @@ class HookMatcher:
# A list of Python functions with function signature HookCallback
hooks: list[HookCallback] = field(default_factory=list)
# Timeout in seconds for all hooks in this matcher (default: 60)
timeout: float | None = None
# MCP Server config
class McpStdioServerConfig(TypedDict):
@ -432,83 +416,6 @@ class SdkPluginConfig(TypedDict):
path: str
# Sandbox configuration types
class SandboxNetworkConfig(TypedDict, total=False):
"""Network configuration for sandbox.
Attributes:
allowUnixSockets: Unix socket paths accessible in sandbox (e.g., SSH agents).
allowAllUnixSockets: Allow all Unix sockets (less secure).
allowLocalBinding: Allow binding to localhost ports (macOS only).
httpProxyPort: HTTP proxy port if bringing your own proxy.
socksProxyPort: SOCKS5 proxy port if bringing your own proxy.
"""
allowUnixSockets: list[str]
allowAllUnixSockets: bool
allowLocalBinding: bool
httpProxyPort: int
socksProxyPort: int
class SandboxIgnoreViolations(TypedDict, total=False):
"""Violations to ignore in sandbox.
Attributes:
file: File paths for which violations should be ignored.
network: Network hosts for which violations should be ignored.
"""
file: list[str]
network: list[str]
class SandboxSettings(TypedDict, total=False):
"""Sandbox settings configuration.
This controls how Claude Code sandboxes bash commands for filesystem
and network isolation.
**Important:** Filesystem and network restrictions are configured via permission
rules, not via these sandbox settings:
- Filesystem read restrictions: Use Read deny rules
- Filesystem write restrictions: Use Edit allow/deny rules
- Network restrictions: Use WebFetch allow/deny rules
Attributes:
enabled: Enable bash sandboxing (macOS/Linux only). Default: False
autoAllowBashIfSandboxed: Auto-approve bash commands when sandboxed. Default: True
excludedCommands: Commands that should run outside the sandbox (e.g., ["git", "docker"])
allowUnsandboxedCommands: Allow commands to bypass sandbox via dangerouslyDisableSandbox.
When False, all commands must run sandboxed (or be in excludedCommands). Default: True
network: Network configuration for sandbox.
ignoreViolations: Violations to ignore.
enableWeakerNestedSandbox: Enable weaker sandbox for unprivileged Docker environments
(Linux only). Reduces security. Default: False
Example:
```python
sandbox_settings: SandboxSettings = {
"enabled": True,
"autoAllowBashIfSandboxed": True,
"excludedCommands": ["docker"],
"network": {
"allowUnixSockets": ["/var/run/docker.sock"],
"allowLocalBinding": True
}
}
```
"""
enabled: bool
autoAllowBashIfSandboxed: bool
excludedCommands: list[str]
allowUnsandboxedCommands: bool
network: SandboxNetworkConfig
ignoreViolations: SandboxIgnoreViolations
enableWeakerNestedSandbox: bool
# Content block types
@dataclass
class TextBlock:
@ -547,22 +454,11 @@ ContentBlock = TextBlock | ThinkingBlock | ToolUseBlock | ToolResultBlock
# Message types
AssistantMessageError = Literal[
"authentication_failed",
"billing_error",
"rate_limit",
"invalid_request",
"server_error",
"unknown",
]
@dataclass
class UserMessage:
"""User message."""
content: str | list[ContentBlock]
uuid: str | None = None
parent_tool_use_id: str | None = None
@ -573,7 +469,6 @@ class AssistantMessage:
content: list[ContentBlock]
model: str
parent_tool_use_id: str | None = None
error: AssistantMessageError | None = None
@dataclass
@ -617,7 +512,6 @@ Message = UserMessage | AssistantMessage | SystemMessage | ResultMessage | Strea
class ClaudeAgentOptions:
"""Query options for Claude SDK."""
tools: list[str] | ToolsPreset | None = None
allowed_tools: list[str] = field(default_factory=list)
system_prompt: str | SystemPromptPreset | None = None
mcp_servers: dict[str, McpServerConfig] | str | Path = field(default_factory=dict)
@ -629,8 +523,6 @@ class ClaudeAgentOptions:
disallowed_tools: list[str] = field(default_factory=list)
model: str | None = None
fallback_model: str | None = None
# Beta features - see https://docs.anthropic.com/en/api/beta-headers
betas: list[SdkBeta] = field(default_factory=list)
permission_prompt_tool_name: str | None = None
cwd: str | Path | None = None
cli_path: str | Path | None = None
@ -663,10 +555,6 @@ class ClaudeAgentOptions:
agents: dict[str, AgentDefinition] | None = None
# Setting sources to load (user, project, local)
setting_sources: list[SettingSource] | None = None
# Sandbox configuration for bash command isolation.
# Filesystem and network restrictions are derived from permission rules (Read/Edit/WebFetch),
# not from these sandbox settings.
sandbox: SandboxSettings | None = None
# Plugin configurations for custom plugins
plugins: list[SdkPluginConfig] = field(default_factory=list)
# Max tokens for thinking blocks
@ -674,10 +562,6 @@ class ClaudeAgentOptions:
# Output format for structured outputs (matches Messages API structure)
# Example: {"type": "json_schema", "schema": {"type": "object", "properties": {...}}}
output_format: dict[str, Any] | None = None
# Enable file checkpointing to track file changes during the session.
# When enabled, files can be rewound to their state at any user message
# using `ClaudeSDKClient.rewind_files()`.
enable_file_checkpointing: bool = False
# SDK Control Protocol
@ -718,11 +602,6 @@ class SDKControlMcpMessageRequest(TypedDict):
message: Any
class SDKControlRewindFilesRequest(TypedDict):
subtype: Literal["rewind_files"]
user_message_id: str
class SDKControlRequest(TypedDict):
type: Literal["control_request"]
request_id: str
@ -733,7 +612,6 @@ class SDKControlRequest(TypedDict):
| SDKControlSetPermissionModeRequest
| SDKHookCallbackRequest
| SDKControlMcpMessageRequest
| SDKControlRewindFilesRequest
)

View file

@ -31,21 +31,6 @@ class TestMessageParser:
assert isinstance(message.content[0], TextBlock)
assert message.content[0].text == "Hello"
def test_parse_user_message_with_uuid(self):
"""Test parsing a user message with uuid field (issue #414).
The uuid field is needed for file checkpointing with rewind_files().
"""
data = {
"type": "user",
"uuid": "msg-abc123-def456",
"message": {"content": [{"type": "text", "text": "Hello"}]},
}
message = parse_message(data)
assert isinstance(message, UserMessage)
assert message.uuid == "msg-abc123-def456"
assert len(message.content) == 1
def test_parse_user_message_with_tool_use(self):
"""Test parsing a user message with tool_use block."""
data = {

View file

@ -500,329 +500,3 @@ class TestSubprocessCLITransport:
assert user_passed == "claude"
anyio.run(_test)
def test_build_command_with_sandbox_only(self):
"""Test building CLI command with sandbox settings (no existing settings)."""
import json
from claude_agent_sdk import SandboxSettings
sandbox: SandboxSettings = {
"enabled": True,
"autoAllowBashIfSandboxed": True,
"network": {
"allowLocalBinding": True,
"allowUnixSockets": ["/var/run/docker.sock"],
},
}
transport = SubprocessCLITransport(
prompt="test",
options=make_options(sandbox=sandbox),
)
cmd = transport._build_command()
# Should have --settings with sandbox merged in
assert "--settings" in cmd
settings_idx = cmd.index("--settings")
settings_value = cmd[settings_idx + 1]
# Parse and verify
parsed = json.loads(settings_value)
assert "sandbox" in parsed
assert parsed["sandbox"]["enabled"] is True
assert parsed["sandbox"]["autoAllowBashIfSandboxed"] is True
assert parsed["sandbox"]["network"]["allowLocalBinding"] is True
assert parsed["sandbox"]["network"]["allowUnixSockets"] == [
"/var/run/docker.sock"
]
def test_build_command_with_sandbox_and_settings_json(self):
"""Test building CLI command with sandbox merged into existing settings JSON."""
import json
from claude_agent_sdk import SandboxSettings
# Existing settings as JSON string
existing_settings = (
'{"permissions": {"allow": ["Bash(ls:*)"]}, "verbose": true}'
)
sandbox: SandboxSettings = {
"enabled": True,
"excludedCommands": ["git", "docker"],
}
transport = SubprocessCLITransport(
prompt="test",
options=make_options(settings=existing_settings, sandbox=sandbox),
)
cmd = transport._build_command()
# Should have merged settings
assert "--settings" in cmd
settings_idx = cmd.index("--settings")
settings_value = cmd[settings_idx + 1]
parsed = json.loads(settings_value)
# Original settings should be preserved
assert parsed["permissions"] == {"allow": ["Bash(ls:*)"]}
assert parsed["verbose"] is True
# Sandbox should be merged in
assert "sandbox" in parsed
assert parsed["sandbox"]["enabled"] is True
assert parsed["sandbox"]["excludedCommands"] == ["git", "docker"]
def test_build_command_with_settings_file_and_no_sandbox(self):
"""Test that settings file path is passed through when no sandbox."""
transport = SubprocessCLITransport(
prompt="test",
options=make_options(settings="/path/to/settings.json"),
)
cmd = transport._build_command()
# Should pass path directly, not parse it
assert "--settings" in cmd
settings_idx = cmd.index("--settings")
assert cmd[settings_idx + 1] == "/path/to/settings.json"
def test_build_command_sandbox_minimal(self):
"""Test sandbox with minimal configuration."""
import json
from claude_agent_sdk import SandboxSettings
sandbox: SandboxSettings = {"enabled": True}
transport = SubprocessCLITransport(
prompt="test",
options=make_options(sandbox=sandbox),
)
cmd = transport._build_command()
assert "--settings" in cmd
settings_idx = cmd.index("--settings")
settings_value = cmd[settings_idx + 1]
parsed = json.loads(settings_value)
assert parsed == {"sandbox": {"enabled": True}}
def test_sandbox_network_config(self):
"""Test sandbox with full network configuration."""
import json
from claude_agent_sdk import SandboxSettings
sandbox: SandboxSettings = {
"enabled": True,
"network": {
"allowUnixSockets": ["/tmp/ssh-agent.sock"],
"allowAllUnixSockets": False,
"allowLocalBinding": True,
"httpProxyPort": 8080,
"socksProxyPort": 8081,
},
}
transport = SubprocessCLITransport(
prompt="test",
options=make_options(sandbox=sandbox),
)
cmd = transport._build_command()
settings_idx = cmd.index("--settings")
settings_value = cmd[settings_idx + 1]
parsed = json.loads(settings_value)
network = parsed["sandbox"]["network"]
assert network["allowUnixSockets"] == ["/tmp/ssh-agent.sock"]
assert network["allowAllUnixSockets"] is False
assert network["allowLocalBinding"] is True
assert network["httpProxyPort"] == 8080
assert network["socksProxyPort"] == 8081
def test_build_command_with_tools_array(self):
"""Test building CLI command with tools as array of tool names."""
transport = SubprocessCLITransport(
prompt="test",
options=make_options(tools=["Read", "Edit", "Bash"]),
)
cmd = transport._build_command()
assert "--tools" in cmd
tools_idx = cmd.index("--tools")
assert cmd[tools_idx + 1] == "Read,Edit,Bash"
def test_build_command_with_tools_empty_array(self):
"""Test building CLI command with tools as empty array (disables all tools)."""
transport = SubprocessCLITransport(
prompt="test",
options=make_options(tools=[]),
)
cmd = transport._build_command()
assert "--tools" in cmd
tools_idx = cmd.index("--tools")
assert cmd[tools_idx + 1] == ""
def test_build_command_with_tools_preset(self):
"""Test building CLI command with tools preset."""
transport = SubprocessCLITransport(
prompt="test",
options=make_options(tools={"type": "preset", "preset": "claude_code"}),
)
cmd = transport._build_command()
assert "--tools" in cmd
tools_idx = cmd.index("--tools")
assert cmd[tools_idx + 1] == "default"
def test_build_command_without_tools(self):
"""Test building CLI command without tools option (default None)."""
transport = SubprocessCLITransport(
prompt="test",
options=make_options(),
)
cmd = transport._build_command()
assert "--tools" not in cmd
def test_concurrent_writes_are_serialized(self):
"""Test that concurrent write() calls are serialized by the lock.
When parallel subagents invoke MCP tools, they trigger concurrent write()
calls. Without the _write_lock, trio raises BusyResourceError.
Uses a real subprocess with the same stream setup as production:
process.stdin -> TextSendStream
"""
async def _test():
import sys
from subprocess import PIPE
from anyio.streams.text import TextSendStream
# Create a real subprocess that consumes stdin (cross-platform)
process = await anyio.open_process(
[sys.executable, "-c", "import sys; sys.stdin.read()"],
stdin=PIPE,
stdout=PIPE,
stderr=PIPE,
)
try:
transport = SubprocessCLITransport(
prompt="test",
options=ClaudeAgentOptions(cli_path="/usr/bin/claude"),
)
# Same setup as production: TextSendStream wrapping process.stdin
transport._ready = True
transport._process = MagicMock(returncode=None)
transport._stdin_stream = TextSendStream(process.stdin)
# Spawn concurrent writes - the lock should serialize them
num_writes = 10
errors: list[Exception] = []
async def do_write(i: int):
try:
await transport.write(f'{{"msg": {i}}}\n')
except Exception as e:
errors.append(e)
async with anyio.create_task_group() as tg:
for i in range(num_writes):
tg.start_soon(do_write, i)
# All writes should succeed - the lock serializes them
assert len(errors) == 0, f"Got errors: {errors}"
finally:
process.terminate()
await process.wait()
anyio.run(_test, backend="trio")
def test_concurrent_writes_fail_without_lock(self):
"""Verify that without the lock, concurrent writes cause BusyResourceError.
Uses a real subprocess with the same stream setup as production.
"""
async def _test():
import sys
from contextlib import asynccontextmanager
from subprocess import PIPE
from anyio.streams.text import TextSendStream
# Create a real subprocess that consumes stdin (cross-platform)
process = await anyio.open_process(
[sys.executable, "-c", "import sys; sys.stdin.read()"],
stdin=PIPE,
stdout=PIPE,
stderr=PIPE,
)
try:
transport = SubprocessCLITransport(
prompt="test",
options=ClaudeAgentOptions(cli_path="/usr/bin/claude"),
)
# Same setup as production
transport._ready = True
transport._process = MagicMock(returncode=None)
transport._stdin_stream = TextSendStream(process.stdin)
# Replace lock with no-op to trigger the race condition
class NoOpLock:
@asynccontextmanager
async def __call__(self):
yield
async def __aenter__(self):
return self
async def __aexit__(self, *args):
pass
transport._write_lock = NoOpLock()
# Spawn concurrent writes - should fail without lock
num_writes = 10
errors: list[Exception] = []
async def do_write(i: int):
try:
await transport.write(f'{{"msg": {i}}}\n')
except Exception as e:
errors.append(e)
async with anyio.create_task_group() as tg:
for i in range(num_writes):
tg.start_soon(do_write, i)
# Should have gotten errors due to concurrent access
assert len(errors) > 0, (
"Expected errors from concurrent access, but got none"
)
# Check that at least one error mentions the concurrent access
error_strs = [str(e) for e in errors]
assert any("another task" in s for s in error_strs), (
f"Expected 'another task' error, got: {error_strs}"
)
finally:
process.terminate()
await process.wait()
anyio.run(_test, backend="trio")