Compare commits

..

No commits in common. "main" and "v0.1.1" have entirely different histories.
main ... v0.1.1

46 changed files with 318 additions and 3908 deletions

View file

@ -1,9 +0,0 @@
---
name: test-agent
description: A simple test agent for SDK testing
tools: Read
---
# Test Agent
You are a simple test agent. When asked a question, provide a brief, helpful answer.

View file

@ -1,19 +0,0 @@
---
allowed-tools: Edit, Bash(git add:*), Bash(git commit:*)
description: Generate changelog for a new release version
---
You are updating the changelog for the new release.
Update CHANGELOG.md to add a new section for the new version at the top of the file, right after the '# Changelog' heading.
Review the recent commits and merged pull requests since the last release to generate meaningful changelog content for the new version. Follow the existing format in CHANGELOG.md with sections like:
- Breaking Changes (if any)
- New Features
- Bug Fixes
- Documentation
- Internal/Other changes
Include only the sections that are relevant based on the actual changes. Write clear, user-focused descriptions.
After updating CHANGELOG.md, commit the changes with the message "docs: update changelog for v{new_version}".

View file

@ -1,49 +0,0 @@
# Git
.git
.gitignore
# Python
__pycache__
*.py[cod]
*$py.class
*.so
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
*.egg-info/
.installed.cfg
*.egg
# Virtual environments
.env
.venv
env/
venv/
ENV/
# IDE
.idea/
.vscode/
*.swp
*.swo
# Testing/Coverage
.coverage
.pytest_cache/
htmlcov/
.tox/
.nox/
# Misc
*.log
.DS_Store

View file

@ -24,6 +24,12 @@ jobs:
VERSION="${BRANCH_NAME#release/v}"
echo "version=$VERSION" >> $GITHUB_OUTPUT
- name: Get previous release tag
id: previous_tag
run: |
PREVIOUS_TAG=$(git describe --tags --abbrev=0 HEAD~1 2>/dev/null || echo "")
echo "previous_tag=$PREVIOUS_TAG" >> $GITHUB_OUTPUT
- name: Create and push tag
run: |
git config --local user.email "github-actions[bot]@users.noreply.github.com"
@ -40,34 +46,14 @@ jobs:
env:
GH_TOKEN: ${{ secrets.GITHUB_TOKEN }}
run: |
VERSION="${{ steps.extract_version.outputs.version }}"
# Create release with auto-generated notes
gh release create "v${{ steps.extract_version.outputs.version }}" \
--title "Release v${{ steps.extract_version.outputs.version }}" \
--generate-notes \
--notes-start-tag "${{ steps.previous_tag.outputs.previous_tag }}" \
--notes "Published to PyPI: https://pypi.org/project/claude-agent-sdk/${{ steps.extract_version.outputs.version }}/
# Extract changelog section for this version to a temp file
awk -v ver="$VERSION" '
/^## / {
if (found) exit
if ($2 == ver) found=1
next
}
found { print }
' CHANGELOG.md > release_notes.md
# Append install instructions
{
echo ""
echo "---"
echo ""
echo "**PyPI:** https://pypi.org/project/claude-agent-sdk/VERSION/"
echo ""
echo '```bash'
echo "pip install claude-agent-sdk==VERSION"
echo '```'
} >> release_notes.md
# Replace VERSION placeholder
sed -i "s/VERSION/$VERSION/g" release_notes.md
# Create release with notes from file
gh release create "v$VERSION" \
--title "v$VERSION" \
--notes-file release_notes.md
### Installation
\`\`\`bash
pip install claude-agent-sdk==${{ steps.extract_version.outputs.version }}
\`\`\`"

View file

@ -4,7 +4,7 @@ on:
workflow_dispatch:
inputs:
version:
description: 'Package version to publish (e.g., 0.1.4)'
description: 'Version to publish (e.g., 0.1.0)'
required: true
type: string
jobs:
@ -12,141 +12,89 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
python-version: ["3.10", "3.11", "3.12", "3.13"]
python-version: ['3.10', '3.11', '3.12', '3.13']
steps:
- uses: actions/checkout@v4
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v5
with:
python-version: ${{ matrix.python-version }}
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install -e ".[dev]"
- name: Run tests
run: |
python -m pytest tests/ -v
- uses: actions/checkout@v4
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v5
with:
python-version: ${{ matrix.python-version }}
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install -e ".[dev]"
- name: Run tests
run: |
python -m pytest tests/ -v
lint:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: "3.12"
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install -e ".[dev]"
- name: Run ruff
run: |
ruff check src/ tests/
ruff format --check src/ tests/
- name: Run mypy
run: |
mypy src/
build-wheels:
needs: [test, lint]
runs-on: ${{ matrix.os }}
strategy:
matrix:
os: [ubuntu-latest, ubuntu-24.04-arm, macos-latest, windows-latest]
permissions:
contents: write
pull-requests: write
steps:
- uses: actions/checkout@v4
- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: '3.12'
- name: Install build dependencies
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install build twine wheel
shell: bash
- name: Build wheel with bundled CLI
pip install -e ".[dev]"
- name: Run ruff
run: |
python scripts/build_wheel.py \
--version "${{ github.event.inputs.version }}" \
--skip-sdist \
--clean
shell: bash
- name: Upload wheel artifact
uses: actions/upload-artifact@v4
with:
name: wheel-${{ matrix.os }}
path: dist/*.whl
if-no-files-found: error
compression-level: 0
ruff check src/ tests/
ruff format --check src/ tests/
- name: Run mypy
run: |
mypy src/
publish:
needs: [build-wheels]
needs: [test, lint]
runs-on: ubuntu-latest
permissions:
contents: write
pull-requests: write
steps:
- uses: actions/checkout@v4
with:
token: ${{ secrets.GITHUB_TOKEN }}
fetch-depth: 0 # Fetch all history including tags for changelog generation
- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: '3.12'
- name: Set version
id: version
run: |
VERSION="${{ github.event.inputs.version }}"
echo "VERSION=$VERSION" >> $GITHUB_ENV
echo "version=$VERSION" >> $GITHUB_OUTPUT
- name: Update version
run: |
python scripts/update_version.py "${{ env.VERSION }}"
- name: Read CLI version from code
id: cli_version
run: |
CLI_VERSION=$(python -c "import re; print(re.search(r'__cli_version__ = \"([^\"]+)\"', open('src/claude_agent_sdk/_cli_version.py').read()).group(1))")
echo "cli_version=$CLI_VERSION" >> $GITHUB_OUTPUT
echo "Bundled CLI version: $CLI_VERSION"
- name: Download all wheel artifacts
uses: actions/download-artifact@v4
with:
path: dist
pattern: wheel-*
merge-multiple: true
- name: Install build dependencies
run: |
python -m pip install --upgrade pip
pip install build twine
- name: Build source distribution
run: python -m build --sdist
- name: Build package
run: python -m build
- name: Check package
run: twine check dist/*
- name: Publish to PyPI
env:
TWINE_USERNAME: __token__
@ -156,67 +104,66 @@ jobs:
echo "Package published to PyPI"
echo "Install with: pip install claude-agent-sdk==${{ env.VERSION }}"
- name: Get previous release tag
id: previous_tag
run: |
PREVIOUS_TAG=$(git describe --tags --abbrev=0 2>/dev/null || echo "")
echo "previous_tag=$PREVIOUS_TAG" >> $GITHUB_OUTPUT
echo "Previous release: $PREVIOUS_TAG"
- name: Create release branch and commit version changes
- name: Create version update PR
env:
GH_TOKEN: ${{ secrets.GITHUB_TOKEN }}
run: |
# Create a new branch for the version update
BRANCH_NAME="release/v${{ env.VERSION }}"
echo "BRANCH_NAME=$BRANCH_NAME" >> $GITHUB_ENV
# Create branch via API
BASE_SHA=$(git rev-parse HEAD)
gh api \
--method POST \
/repos/$GITHUB_REPOSITORY/git/refs \
-f ref="refs/heads/$BRANCH_NAME" \
-f sha="$BASE_SHA"
# Get current SHA values of files
echo "Getting SHA for pyproject.toml"
PYPROJECT_SHA=$(gh api /repos/$GITHUB_REPOSITORY/contents/pyproject.toml --jq '.sha')
echo "Getting SHA for _version.py"
VERSION_SHA=$(gh api /repos/$GITHUB_REPOSITORY/contents/src/claude_agent_sdk/_version.py --jq '.sha')
# Configure git
git config --local user.email "github-actions[bot]@users.noreply.github.com"
git config --local user.name "github-actions[bot]"
# Create and switch to new branch
git checkout -b "$BRANCH_NAME"
# Commit version changes
git add pyproject.toml src/claude_agent_sdk/_version.py
git commit -m "chore: release v${{ env.VERSION }}"
- name: Update changelog with Claude
continue-on-error: true
uses: anthropics/claude-code-action@v1
with:
prompt: "/generate-changelog new version: ${{ env.VERSION }}, old version: ${{ steps.previous_tag.outputs.previous_tag }}"
anthropic_api_key: ${{ secrets.ANTHROPIC_API_KEY }}
github_token: ${{ secrets.GITHUB_TOKEN }}
claude_args: |
--model claude-opus-4-5
--allowedTools 'Bash(git add:*),Bash(git commit:*),Edit'
- name: Push branch and create PR
env:
GH_TOKEN: ${{ secrets.GITHUB_TOKEN }}
run: |
# Push the branch with all commits
git push origin "${{ env.BRANCH_NAME }}"
# Commit pyproject.toml via GitHub API (this creates signed commits)
message="chore: bump version to ${{ env.VERSION }}"
base64 -i pyproject.toml > pyproject.toml.b64
gh api \
--method PUT \
/repos/$GITHUB_REPOSITORY/contents/pyproject.toml \
-f message="$message" \
-F content=@pyproject.toml.b64 \
-f sha="$PYPROJECT_SHA" \
-f branch="$BRANCH_NAME"
# Commit _version.py via GitHub API
base64 -i src/claude_agent_sdk/_version.py > version.py.b64
gh api \
--method PUT \
/repos/$GITHUB_REPOSITORY/contents/src/claude_agent_sdk/_version.py \
-f message="$message" \
-F content=@version.py.b64 \
-f sha="$VERSION_SHA" \
-f branch="$BRANCH_NAME"
# Create PR using GitHub CLI
PR_BODY="This PR updates the version to ${{ env.VERSION }} after publishing to PyPI.
## Changes
- Updated version in \`pyproject.toml\` to ${{ env.VERSION }}
- Updated version in \`src/claude_agent_sdk/_version.py\` to ${{ env.VERSION }}
- Updated \`CHANGELOG.md\` with release notes
- Updated version in \`pyproject.toml\`
- Updated version in \`src/claude_agent_sdk/_version.py\`
## Release Information
- Published to PyPI: https://pypi.org/project/claude-agent-sdk/${{ env.VERSION }}/
- Bundled CLI version: ${{ steps.cli_version.outputs.cli_version }}
- Install with: \`pip install claude-agent-sdk==${{ env.VERSION }}\`
🤖 Generated by GitHub Actions"
PR_URL=$(gh pr create \
--title "chore: release v${{ env.VERSION }}" \
--title "chore: bump version to ${{ env.VERSION }}" \
--body "$PR_BODY" \
--base main \
--head "${{ env.BRANCH_NAME }}")
echo "PR created: $PR_URL"
--head "$BRANCH_NAME")
echo "PR created: $PR_URL"

View file

@ -1,36 +0,0 @@
name: Post new issues to Slack
on:
issues:
types: [opened]
jobs:
notify:
runs-on: ubuntu-latest
steps:
- name: Post to Slack
uses: slackapi/slack-github-action@91efab103c0de0a537f72a35f6b8cda0ee76bf0a # 2.1.1
with:
method: chat.postMessage
token: ${{ secrets.SLACK_BOT_TOKEN }}
payload: |
{
"channel": "C09HY5E0K60",
"text": "New issue opened in ${{ github.repository }}",
"blocks": [
{
"type": "section",
"text": {
"type": "mrkdwn",
"text": "*New Issue:* <${{ github.event.issue.html_url }}|#${{ github.event.issue.number }} ${{ github.event.issue.title }}>"
}
},
{
"type": "section",
"text": {
"type": "mrkdwn",
"text": "*Author:* ${{ github.event.issue.user.login }}"
}
}
]
}

View file

@ -8,10 +8,9 @@ on:
jobs:
test:
runs-on: ${{ matrix.os }}
runs-on: ubuntu-latest
strategy:
matrix:
os: [ubuntu-latest, macos-latest, windows-latest]
python-version: ["3.10", "3.11", "3.12", "3.13"]
steps:
@ -38,11 +37,10 @@ jobs:
fail_ci_if_error: false
test-e2e:
runs-on: ${{ matrix.os }}
runs-on: ubuntu-latest
needs: test # Run after unit tests pass
strategy:
matrix:
os: [ubuntu-latest, macos-latest, windows-latest]
python-version: ["3.10", "3.11", "3.12", "3.13"]
steps:
@ -53,20 +51,11 @@ jobs:
with:
python-version: ${{ matrix.python-version }}
- name: Install Claude Code (Linux/macOS)
if: runner.os == 'Linux' || runner.os == 'macOS'
- name: Install Claude Code
run: |
curl -fsSL https://claude.ai/install.sh | bash
echo "$HOME/.local/bin" >> $GITHUB_PATH
- name: Install Claude Code (Windows)
if: runner.os == 'Windows'
run: |
irm https://claude.ai/install.ps1 | iex
$claudePath = "$env:USERPROFILE\.local\bin"
echo "$claudePath" | Out-File -FilePath $env:GITHUB_PATH -Encoding utf8 -Append
shell: pwsh
- name: Verify Claude Code installation
run: claude -v
@ -81,30 +70,12 @@ jobs:
run: |
python -m pytest e2e-tests/ -v -m e2e
test-e2e-docker:
runs-on: ubuntu-latest
needs: test # Run after unit tests pass
# Run e2e tests in Docker to catch container-specific issues like #406
steps:
- uses: actions/checkout@v4
- name: Build Docker test image
run: docker build -f Dockerfile.test -t claude-sdk-test .
- name: Run e2e tests in Docker
env:
ANTHROPIC_API_KEY: ${{ secrets.ANTHROPIC_API_KEY }}
run: |
docker run --rm -e ANTHROPIC_API_KEY \
claude-sdk-test python -m pytest e2e-tests/ -v -m e2e
test-examples:
runs-on: ubuntu-latest
needs: test-e2e # Run after e2e tests
strategy:
matrix:
python-version: ["3.13"]
python-version: ["3.10", "3.11", "3.12", "3.13"]
steps:
- uses: actions/checkout@v4
@ -114,20 +85,11 @@ jobs:
with:
python-version: ${{ matrix.python-version }}
- name: Install Claude Code (Linux)
if: runner.os == 'Linux'
- name: Install Claude Code
run: |
curl -fsSL https://claude.ai/install.sh | bash
echo "$HOME/.local/bin" >> $GITHUB_PATH
- name: Install Claude Code (Windows)
if: runner.os == 'Windows'
run: |
irm https://claude.ai/install.ps1 | iex
$claudePath = "$env:USERPROFILE\.local\bin"
echo "$claudePath" | Out-File -FilePath $env:GITHUB_PATH -Encoding utf8 -Append
shell: pwsh
- name: Verify Claude Code installation
run: claude -v
@ -136,34 +98,9 @@ jobs:
python -m pip install --upgrade pip
pip install -e .
- name: Run example scripts (Linux)
if: runner.os == 'Linux'
- name: Run example scripts
env:
ANTHROPIC_API_KEY: ${{ secrets.ANTHROPIC_API_KEY }}
run: |
python examples/quick_start.py
timeout 120 python examples/streaming_mode.py all
timeout 120 python examples/hooks.py PreToolUse
timeout 120 python examples/hooks.py DecisionFields
- name: Run example scripts (Windows)
if: runner.os == 'Windows'
env:
ANTHROPIC_API_KEY: ${{ secrets.ANTHROPIC_API_KEY }}
run: |
python examples/quick_start.py
$job = Start-Job { python examples/streaming_mode.py all }
Wait-Job $job -Timeout 120 | Out-Null
Stop-Job $job
Receive-Job $job
$job = Start-Job { python examples/hooks.py PreToolUse }
Wait-Job $job -Timeout 120 | Out-Null
Stop-Job $job
Receive-Job $job
$job = Start-Job { python examples/hooks.py DecisionFields }
Wait-Job $job -Timeout 120 | Out-Null
Stop-Job $job
Receive-Job $job
shell: pwsh

1
.gitignore vendored
View file

@ -26,7 +26,6 @@ venv/
ENV/
env/
.venv
uv.lock
# IDEs
.vscode/

View file

@ -1,157 +1,5 @@
# Changelog
## 0.1.18
### Internal/Other Changes
- **Docker-based test infrastructure**: Added Docker support for running e2e tests in containerized environments, helping catch Docker-specific issues (#424)
- Updated bundled Claude CLI to version 2.0.72
## 0.1.17
### New Features
- **UserMessage UUID field**: Added `uuid` field to `UserMessage` response type, making it easier to use the `rewind_files()` method by providing direct access to message identifiers needed for file checkpointing (#418)
### Internal/Other Changes
- Updated bundled Claude CLI to version 2.0.70
## 0.1.16
### Bug Fixes
- **Rate limit detection**: Fixed parsing of the `error` field in `AssistantMessage`, enabling applications to detect and handle API errors like rate limits. Previously, the `error` field was defined but never populated from CLI responses (#405)
### Internal/Other Changes
- Updated bundled Claude CLI to version 2.0.68
## 0.1.15
### New Features
- **File checkpointing and rewind**: Added `enable_file_checkpointing` option to `ClaudeAgentOptions` and `rewind_files(user_message_id)` method to `ClaudeSDKClient` and `Query`. This enables reverting file changes made during a session back to a specific checkpoint, useful for exploring different approaches or recovering from unwanted modifications (#395)
### Documentation
- Added license and terms section to README (#399)
## 0.1.14
### Internal/Other Changes
- Updated bundled Claude CLI to version 2.0.62
## 0.1.13
### Bug Fixes
- **Faster error handling**: CLI errors (e.g., invalid session ID) now propagate to pending requests immediately instead of waiting for the 60-second timeout (#388)
- **Pydantic 2.12+ compatibility**: Fixed `PydanticUserError` caused by `McpServer` type only being imported under `TYPE_CHECKING` (#385)
- **Concurrent subagent writes**: Added write lock to prevent `BusyResourceError` when multiple subagents invoke MCP tools in parallel (#391)
### Internal/Other Changes
- Updated bundled Claude CLI to version 2.0.59
## 0.1.12
### New Features
- **Tools option**: Added `tools` option to `ClaudeAgentOptions` for controlling the base set of available tools, matching the TypeScript SDK functionality. Supports three modes:
- Array of tool names to specify which tools should be available (e.g., `["Read", "Edit", "Bash"]`)
- Empty array `[]` to disable all built-in tools
- Preset object `{"type": "preset", "preset": "claude_code"}` to use the default Claude Code toolset
- **SDK beta support**: Added `betas` option to `ClaudeAgentOptions` for enabling Anthropic API beta features. Currently supports `"context-1m-2025-08-07"` for extended context window
## 0.1.11
### Internal/Other Changes
- Updated bundled Claude CLI to version 2.0.57
## 0.1.10
### Internal/Other Changes
- Updated bundled Claude CLI to version 2.0.53
## 0.1.9
### Internal/Other Changes
- Updated bundled Claude CLI to version 2.0.49
## 0.1.8
### Features
- Claude Code is now included by default in the package, removing the requirement to install it separately. If you do wish to use a separately installed build, use the `cli_path` field in `Options`.
## 0.1.7
### Features
- **Structured outputs support**: Agents can now return validated JSON matching your schema. See https://docs.claude.com/en/docs/agent-sdk/structured-outputs. (#340)
- **Fallback model handling**: Added automatic fallback model handling for improved reliability and parity with the TypeScript SDK. When the primary model is unavailable, the SDK will automatically use a fallback model (#317)
- **Local Claude CLI support**: Added support for using a locally installed Claude CLI from `~/.claude/local/claude`, enabling development and testing with custom Claude CLI builds (#302)
## 0.1.6
### Features
- **Max budget control**: Added `max_budget_usd` option to set a maximum spending limit in USD for SDK sessions. When the budget is exceeded, the session will automatically terminate, helping prevent unexpected costs (#293)
- **Extended thinking configuration**: Added `max_thinking_tokens` option to control the maximum number of tokens allocated for Claude's internal reasoning process. This allows fine-tuning of the balance between response quality and token usage (#298)
### Bug Fixes
- **System prompt defaults**: Fixed issue where a default system prompt was being used when none was specified. The SDK now correctly uses an empty system prompt by default, giving users full control over agent behavior (#290)
## 0.1.5
### Features
- **Plugin support**: Added the ability to load Claude Code plugins programmatically through the SDK. Plugins can be specified using the new `plugins` field in `ClaudeAgentOptions` with a `SdkPluginConfig` type that supports loading local plugins by path. This enables SDK applications to extend functionality with custom commands and capabilities defined in plugin directories
## 0.1.4
### Features
- **Skip version check**: Added `CLAUDE_AGENT_SDK_SKIP_VERSION_CHECK` environment variable to allow users to disable the Claude Code version check. Set this environment variable to skip the minimum version validation when the SDK connects to Claude Code. (Only recommended if you already have Claude Code 2.0.0 or higher installed, otherwise some functionality may break)
- SDK MCP server tool calls can now return image content blocks
## 0.1.3
### Features
- **Strongly-typed hook inputs**: Added typed hook input structures (`PreToolUseHookInput`, `PostToolUseHookInput`, `UserPromptSubmitHookInput`, etc.) using TypedDict for better IDE autocomplete and type safety. Hook callbacks now receive fully typed input parameters
### Bug Fixes
- **Hook output field conversion**: Fixed bug where Python-safe field names (`async_`, `continue_`) in hook outputs were not being converted to CLI format (`async`, `continue`). This caused hook control fields to be silently ignored, preventing proper hook behavior. The SDK now automatically converts field names when communicating with the CLI
### Internal/Other Changes
- **CI/CD**: Re-enabled Windows testing in the end-to-end test workflow. Windows CI had been temporarily disabled but is now fully operational across all test suites
## 0.1.2
### Bug Fixes
- **Hook output fields**: Added missing hook output fields to match the TypeScript SDK, including `reason`, `continue_`, `suppressOutput`, and `stopReason`. The `decision` field now properly supports both "approve" and "block" values. Added `AsyncHookJSONOutput` type for deferred hook execution and proper typing for `hookSpecificOutput` with discriminated unions
## 0.1.1
### Features
- **Minimum Claude Code version check**: Added version validation to ensure Claude Code 2.0.0+ is installed. The SDK will display a warning if an older version is detected, helping prevent compatibility issues
- **Updated PermissionResult types**: Aligned permission result types with the latest control protocol for better type safety and compatibility
### Improvements
- **Model references**: Updated all examples and tests to use the simplified `claude-sonnet-4-5` model identifier instead of dated version strings
## 0.1.0
Introducing the Claude Agent SDK! The Claude Code SDK has been renamed to better reflect its capabilities for building AI agents across all domains, not just coding.
@ -159,9 +7,7 @@ Introducing the Claude Agent SDK! The Claude Code SDK has been renamed to better
### Breaking Changes
#### Type Name Changes
- **ClaudeCodeOptions renamed to ClaudeAgentOptions**: The options type has been renamed to match the new SDK branding. Update all imports and type references:
```python
# Before
from claude_agent_sdk import query, ClaudeCodeOptions
@ -173,7 +19,6 @@ Introducing the Claude Agent SDK! The Claude Code SDK has been renamed to better
```
#### System Prompt Changes
- **Merged prompt options**: The `custom_system_prompt` and `append_system_prompt` fields have been merged into a single `system_prompt` field for simpler configuration
- **No default system prompt**: The Claude Code system prompt is no longer included by default, giving you full control over agent behavior. To use the Claude Code system prompt, explicitly set:
```python
@ -181,7 +26,6 @@ Introducing the Claude Agent SDK! The Claude Code SDK has been renamed to better
```
#### Settings Isolation
- **No filesystem settings by default**: Settings files (`settings.json`, `CLAUDE.md`), slash commands, and subagents are no longer loaded automatically. This ensures SDK applications have predictable behavior independent of local filesystem configurations
- **Explicit settings control**: Use the new `setting_sources` field to specify which settings locations to load: `["user", "project", "local"]`
@ -236,3 +80,4 @@ For full migration instructions, see our [migration guide](https://docs.claude.c
- Fix multi-line buffering issue
- Rename cost_usd to total_cost_usd in API responses
- Fix optional cost fields handling

View file

@ -1,29 +0,0 @@
# Dockerfile for running SDK tests in a containerized environment
# This helps catch Docker-specific issues like #406
FROM python:3.12-slim
# Install dependencies for Claude CLI and git (needed for some tests)
RUN apt-get update && apt-get install -y \
curl \
git \
&& rm -rf /var/lib/apt/lists/*
# Install Claude Code CLI
RUN curl -fsSL https://claude.ai/install.sh | bash
ENV PATH="/root/.local/bin:$PATH"
# Set up working directory
WORKDIR /app
# Copy the SDK source
COPY . .
# Install SDK with dev dependencies
RUN pip install -e ".[dev]"
# Verify CLI installation
RUN claude -v
# Default: run unit tests
CMD ["python", "-m", "pytest", "tests/", "-v"]

View file

@ -9,13 +9,9 @@ pip install claude-agent-sdk
```
**Prerequisites:**
- Python 3.10+
**Note:** The Claude Code CLI is automatically bundled with the package - no separate installation required! The SDK will use the bundled CLI by default. If you prefer to use a system-wide installation or a specific version, you can:
- Install Claude Code separately: `curl -fsSL https://claude.ai/install.sh | bash`
- Specify a custom path: `ClaudeAgentOptions(cli_path="/path/to/claude")`
- Node.js
- Claude Code 2.0.0+: `npm install -g @anthropic-ai/claude-code`
## Quick Start
@ -183,7 +179,7 @@ options = ClaudeAgentOptions(
### Hooks
A **hook** is a Python function that the Claude Code _application_ (_not_ Claude) invokes at specific points of the Claude agent loop. Hooks can provide deterministic processing and automated feedback for Claude. Read more in [Claude Code Hooks Reference](https://docs.anthropic.com/en/docs/claude-code/hooks).
A **hook** is a Python function that the Claude Code *application* (*not* Claude) invokes at specific points of the Claude agent loop. Hooks can provide deterministic processing and automated feedback for Claude. Read more in [Claude Code Hooks Reference](https://docs.anthropic.com/en/docs/claude-code/hooks).
For more examples, see examples/hooks.py.
@ -233,10 +229,10 @@ async with ClaudeSDKClient(options=options) as client:
print(msg)
```
## Types
See [src/claude_agent_sdk/types.py](src/claude_agent_sdk/types.py) for complete type definitions:
- `ClaudeAgentOptions` - Configuration options
- `AssistantMessage`, `UserMessage`, `SystemMessage`, `ResultMessage` - Message types
- `TextBlock`, `ToolUseBlock`, `ToolResultBlock` - Content blocks
@ -263,7 +259,7 @@ except CLIJSONDecodeError as e:
print(f"Failed to parse response: {e}")
```
See [src/claude_agent_sdk/\_errors.py](src/claude_agent_sdk/_errors.py) for all error types.
See [src/claude_agent_sdk/_errors.py](src/claude_agent_sdk/_errors.py) for all error types.
## Available Tools
@ -284,73 +280,6 @@ If you're upgrading from the Claude Code SDK (versions < 0.1.0), please see the
- Settings isolation and explicit control
- New programmatic subagents and session forking features
## Development
## License
If you're contributing to this project, run the initial setup script to install git hooks:
```bash
./scripts/initial-setup.sh
```
This installs a pre-push hook that runs lint checks before pushing, matching the CI workflow. To skip the hook temporarily, use `git push --no-verify`.
### Building Wheels Locally
To build wheels with the bundled Claude Code CLI:
```bash
# Install build dependencies
pip install build twine
# Build wheel with bundled CLI
python scripts/build_wheel.py
# Build with specific version
python scripts/build_wheel.py --version 0.1.4
# Build with specific CLI version
python scripts/build_wheel.py --cli-version 2.0.0
# Clean bundled CLI after building
python scripts/build_wheel.py --clean
# Skip CLI download (use existing)
python scripts/build_wheel.py --skip-download
```
The build script:
1. Downloads Claude Code CLI for your platform
2. Bundles it in the wheel
3. Builds both wheel and source distribution
4. Checks the package with twine
See `python scripts/build_wheel.py --help` for all options.
### Release Workflow
The package is published to PyPI via the GitHub Actions workflow in `.github/workflows/publish.yml`. To create a new release:
1. **Trigger the workflow** manually from the Actions tab with two inputs:
- `version`: The package version to publish (e.g., `0.1.5`)
- `claude_code_version`: The Claude Code CLI version to bundle (e.g., `2.0.0` or `latest`)
2. **The workflow will**:
- Build platform-specific wheels for macOS, Linux, and Windows
- Bundle the specified Claude Code CLI version in each wheel
- Build a source distribution
- Publish all artifacts to PyPI
- Create a release branch with version updates
- Open a PR to main with:
- Updated `pyproject.toml` version
- Updated `src/claude_agent_sdk/_version.py`
- Updated `src/claude_agent_sdk/_cli_version.py` with bundled CLI version
- Auto-generated `CHANGELOG.md` entry
3. **Review and merge** the release PR to update main with the new version information
The workflow tracks both the package version and the bundled CLI version separately, allowing you to release a new package version with an updated CLI without code changes.
## License and terms
Use of this SDK is governed by Anthropic's [Commercial Terms of Service](https://www.anthropic.com/legal/commercial-terms), including when you use it to power products and services that you make available to your own customers and end users, except to the extent a specific component or dependency is covered by a different license as indicated in that component's LICENSE file.
MIT

View file

@ -1,7 +1,5 @@
"""End-to-end tests for agents and setting sources with real Claude API calls."""
import asyncio
import sys
import tempfile
from pathlib import Path
@ -38,88 +36,15 @@ async def test_agent_definition():
async for message in client.receive_response():
if isinstance(message, SystemMessage) and message.subtype == "init":
agents = message.data.get("agents", [])
assert isinstance(agents, list), (
f"agents should be a list of strings, got: {type(agents)}"
)
assert "test-agent" in agents, (
f"test-agent should be available, got: {agents}"
)
assert isinstance(
agents, list
), f"agents should be a list of strings, got: {type(agents)}"
assert (
"test-agent" in agents
), f"test-agent should be available, got: {agents}"
break
@pytest.mark.e2e
@pytest.mark.asyncio
async def test_filesystem_agent_loading():
"""Test that filesystem-based agents load via setting_sources and produce full response.
This is the core test for issue #406. It verifies that when using
setting_sources=["project"] with a .claude/agents/ directory containing
agent definitions, the SDK:
1. Loads the agents (they appear in init message)
2. Produces a full response with AssistantMessage
3. Completes with a ResultMessage
The bug in #406 causes the iterator to complete after only the
init SystemMessage, never yielding AssistantMessage or ResultMessage.
"""
with tempfile.TemporaryDirectory() as tmpdir:
# Create a temporary project with a filesystem agent
project_dir = Path(tmpdir)
agents_dir = project_dir / ".claude" / "agents"
agents_dir.mkdir(parents=True)
# Create a test agent file
agent_file = agents_dir / "fs-test-agent.md"
agent_file.write_text(
"""---
name: fs-test-agent
description: A filesystem test agent for SDK testing
tools: Read
---
# Filesystem Test Agent
You are a simple test agent. When asked a question, provide a brief, helpful answer.
"""
)
options = ClaudeAgentOptions(
setting_sources=["project"],
cwd=project_dir,
max_turns=1,
)
messages = []
async with ClaudeSDKClient(options=options) as client:
await client.query("Say hello in exactly 3 words")
async for msg in client.receive_response():
messages.append(msg)
# Must have at least init, assistant, result
message_types = [type(m).__name__ for m in messages]
assert "SystemMessage" in message_types, "Missing SystemMessage (init)"
assert "AssistantMessage" in message_types, (
f"Missing AssistantMessage - got only: {message_types}. "
"This may indicate issue #406 (silent failure with filesystem agents)."
)
assert "ResultMessage" in message_types, "Missing ResultMessage"
# Find the init message and check for the filesystem agent
for msg in messages:
if isinstance(msg, SystemMessage) and msg.subtype == "init":
agents = msg.data.get("agents", [])
# Agents are returned as strings (just names)
assert "fs-test-agent" in agents, (
f"fs-test-agent not loaded from filesystem. Found: {agents}"
)
break
# On Windows, wait for file handles to be released before cleanup
if sys.platform == "win32":
await asyncio.sleep(0.5)
@pytest.mark.e2e
@pytest.mark.asyncio
async def test_setting_sources_default():
@ -147,18 +72,14 @@ async def test_setting_sources_default():
async for message in client.receive_response():
if isinstance(message, SystemMessage) and message.subtype == "init":
output_style = message.data.get("output_style")
assert output_style != "local-test-style", (
f"outputStyle should NOT be from local settings (default is no settings), got: {output_style}"
)
assert output_style == "default", (
f"outputStyle should be 'default', got: {output_style}"
)
assert (
output_style != "local-test-style"
), f"outputStyle should NOT be from local settings (default is no settings), got: {output_style}"
assert (
output_style == "default"
), f"outputStyle should be 'default', got: {output_style}"
break
# On Windows, wait for file handles to be released before cleanup
if sys.platform == "win32":
await asyncio.sleep(0.5)
@pytest.mark.e2e
@pytest.mark.asyncio
@ -194,15 +115,11 @@ This is a test command.
async for message in client.receive_response():
if isinstance(message, SystemMessage) and message.subtype == "init":
commands = message.data.get("slash_commands", [])
assert "testcmd" not in commands, (
f"testcmd should NOT be available with user-only sources, got: {commands}"
)
assert (
"testcmd" not in commands
), f"testcmd should NOT be available with user-only sources, got: {commands}"
break
# On Windows, wait for file handles to be released before cleanup
if sys.platform == "win32":
await asyncio.sleep(0.5)
@pytest.mark.e2e
@pytest.mark.asyncio
@ -232,11 +149,7 @@ async def test_setting_sources_project_included():
async for message in client.receive_response():
if isinstance(message, SystemMessage) and message.subtype == "init":
output_style = message.data.get("output_style")
assert output_style == "local-test-style", (
f"outputStyle should be from local settings, got: {output_style}"
)
break
# On Windows, wait for file handles to be released before cleanup
if sys.platform == "win32":
await asyncio.sleep(0.5)
assert (
output_style == "local-test-style"
), f"outputStyle should be from local settings, got: {output_style}"
break

View file

@ -1,150 +0,0 @@
"""End-to-end tests for hook callbacks with real Claude API calls."""
import pytest
from claude_agent_sdk import (
ClaudeAgentOptions,
ClaudeSDKClient,
HookContext,
HookInput,
HookJSONOutput,
HookMatcher,
)
@pytest.mark.e2e
@pytest.mark.asyncio
async def test_hook_with_permission_decision_and_reason():
"""Test that hooks with permissionDecision and reason fields work end-to-end."""
hook_invocations = []
async def test_hook(
input_data: HookInput, tool_use_id: str | None, context: HookContext
) -> HookJSONOutput:
"""Hook that uses permissionDecision and reason fields."""
tool_name = input_data.get("tool_name", "")
print(f"Hook called for tool: {tool_name}")
hook_invocations.append(tool_name)
# Block Bash commands for this test
if tool_name == "Bash":
return {
"reason": "Bash commands are blocked in this test for safety",
"systemMessage": "⚠️ Command blocked by hook",
"hookSpecificOutput": {
"hookEventName": "PreToolUse",
"permissionDecision": "deny",
"permissionDecisionReason": "Security policy: Bash blocked",
},
}
return {
"reason": "Tool approved by security review",
"hookSpecificOutput": {
"hookEventName": "PreToolUse",
"permissionDecision": "allow",
"permissionDecisionReason": "Tool passed security checks",
},
}
options = ClaudeAgentOptions(
allowed_tools=["Bash", "Write"],
hooks={
"PreToolUse": [
HookMatcher(matcher="Bash", hooks=[test_hook]),
],
},
)
async with ClaudeSDKClient(options=options) as client:
await client.query("Run this bash command: echo 'hello'")
async for message in client.receive_response():
print(f"Got message: {message}")
print(f"Hook invocations: {hook_invocations}")
# Verify hook was called
assert "Bash" in hook_invocations, f"Hook should have been invoked for Bash tool, got: {hook_invocations}"
@pytest.mark.e2e
@pytest.mark.asyncio
async def test_hook_with_continue_and_stop_reason():
"""Test that hooks with continue_=False and stopReason fields work end-to-end."""
hook_invocations = []
async def post_tool_hook(
input_data: HookInput, tool_use_id: str | None, context: HookContext
) -> HookJSONOutput:
"""PostToolUse hook that stops execution with stopReason."""
tool_name = input_data.get("tool_name", "")
hook_invocations.append(tool_name)
# Actually test continue_=False and stopReason fields
return {
"continue_": False,
"stopReason": "Execution halted by test hook for validation",
"reason": "Testing continue and stopReason fields",
"systemMessage": "🛑 Test hook stopped execution",
}
options = ClaudeAgentOptions(
allowed_tools=["Bash"],
hooks={
"PostToolUse": [
HookMatcher(matcher="Bash", hooks=[post_tool_hook]),
],
},
)
async with ClaudeSDKClient(options=options) as client:
await client.query("Run: echo 'test message'")
async for message in client.receive_response():
print(f"Got message: {message}")
print(f"Hook invocations: {hook_invocations}")
# Verify hook was called
assert "Bash" in hook_invocations, f"PostToolUse hook should have been invoked, got: {hook_invocations}"
@pytest.mark.e2e
@pytest.mark.asyncio
async def test_hook_with_additional_context():
"""Test that hooks with hookSpecificOutput work end-to-end."""
hook_invocations = []
async def context_hook(
input_data: HookInput, tool_use_id: str | None, context: HookContext
) -> HookJSONOutput:
"""Hook that provides additional context."""
hook_invocations.append("context_added")
return {
"systemMessage": "Additional context provided by hook",
"reason": "Hook providing monitoring feedback",
"suppressOutput": False,
"hookSpecificOutput": {
"hookEventName": "PostToolUse",
"additionalContext": "The command executed successfully with hook monitoring",
},
}
options = ClaudeAgentOptions(
allowed_tools=["Bash"],
hooks={
"PostToolUse": [
HookMatcher(matcher="Bash", hooks=[context_hook]),
],
},
)
async with ClaudeSDKClient(options=options) as client:
await client.query("Run: echo 'testing hooks'")
async for message in client.receive_response():
print(f"Got message: {message}")
print(f"Hook invocations: {hook_invocations}")
# Verify hook was called
assert "context_added" in hook_invocations, "Hook with hookSpecificOutput should have been invoked"

View file

@ -1,204 +0,0 @@
"""End-to-end tests for structured output with real Claude API calls.
These tests verify that the output_schema feature works correctly by making
actual API calls to Claude with JSON Schema validation.
"""
import tempfile
import pytest
from claude_agent_sdk import (
ClaudeAgentOptions,
ResultMessage,
query,
)
@pytest.mark.e2e
@pytest.mark.asyncio
async def test_simple_structured_output():
"""Test structured output with file counting requiring tool use."""
# Define schema for file analysis
schema = {
"type": "object",
"properties": {
"file_count": {"type": "number"},
"has_tests": {"type": "boolean"},
"test_file_count": {"type": "number"},
},
"required": ["file_count", "has_tests"],
}
options = ClaudeAgentOptions(
output_format={"type": "json_schema", "schema": schema},
permission_mode="acceptEdits",
cwd=".", # Use current directory
)
# Agent must use Glob/Bash to count files
result_message = None
async for message in query(
prompt="Count how many Python files are in src/claude_agent_sdk/ and check if there are any test files. Use tools to explore the filesystem.",
options=options,
):
if isinstance(message, ResultMessage):
result_message = message
# Verify result
assert result_message is not None, "No result message received"
assert not result_message.is_error, f"Query failed: {result_message.result}"
assert result_message.subtype == "success"
# Verify structured output is present and valid
assert result_message.structured_output is not None, "No structured output in result"
assert "file_count" in result_message.structured_output
assert "has_tests" in result_message.structured_output
assert isinstance(result_message.structured_output["file_count"], (int, float))
assert isinstance(result_message.structured_output["has_tests"], bool)
# Should find Python files in src/
assert result_message.structured_output["file_count"] > 0
@pytest.mark.e2e
@pytest.mark.asyncio
async def test_nested_structured_output():
"""Test structured output with nested objects and arrays."""
# Define a schema with nested structure
schema = {
"type": "object",
"properties": {
"analysis": {
"type": "object",
"properties": {
"word_count": {"type": "number"},
"character_count": {"type": "number"},
},
"required": ["word_count", "character_count"],
},
"words": {
"type": "array",
"items": {"type": "string"},
},
},
"required": ["analysis", "words"],
}
options = ClaudeAgentOptions(
output_format={"type": "json_schema", "schema": schema},
permission_mode="acceptEdits",
)
result_message = None
async for message in query(
prompt="Analyze this text: 'Hello world'. Provide word count, character count, and list of words.",
options=options,
):
if isinstance(message, ResultMessage):
result_message = message
# Verify result
assert result_message is not None
assert not result_message.is_error
assert result_message.structured_output is not None
# Check nested structure
output = result_message.structured_output
assert "analysis" in output
assert "words" in output
assert output["analysis"]["word_count"] == 2
assert output["analysis"]["character_count"] == 11 # "Hello world"
assert len(output["words"]) == 2
@pytest.mark.e2e
@pytest.mark.asyncio
async def test_structured_output_with_enum():
"""Test structured output with enum constraints requiring code analysis."""
schema = {
"type": "object",
"properties": {
"has_tests": {"type": "boolean"},
"test_framework": {
"type": "string",
"enum": ["pytest", "unittest", "nose", "unknown"],
},
"test_count": {"type": "number"},
},
"required": ["has_tests", "test_framework"],
}
options = ClaudeAgentOptions(
output_format={"type": "json_schema", "schema": schema},
permission_mode="acceptEdits",
cwd=".",
)
result_message = None
async for message in query(
prompt="Search for test files in the tests/ directory. Determine which test framework is being used (pytest/unittest/nose) and count how many test files exist. Use Grep to search for framework imports.",
options=options,
):
if isinstance(message, ResultMessage):
result_message = message
# Verify result
assert result_message is not None
assert not result_message.is_error
assert result_message.structured_output is not None
# Check enum values are valid
output = result_message.structured_output
assert output["test_framework"] in ["pytest", "unittest", "nose", "unknown"]
assert isinstance(output["has_tests"], bool)
# This repo uses pytest
assert output["has_tests"] is True
assert output["test_framework"] == "pytest"
@pytest.mark.e2e
@pytest.mark.asyncio
async def test_structured_output_with_tools():
"""Test structured output when agent uses tools."""
# Schema for file analysis
schema = {
"type": "object",
"properties": {
"file_count": {"type": "number"},
"has_readme": {"type": "boolean"},
},
"required": ["file_count", "has_readme"],
}
options = ClaudeAgentOptions(
output_format={"type": "json_schema", "schema": schema},
permission_mode="acceptEdits",
cwd=tempfile.gettempdir(), # Cross-platform temp directory
)
result_message = None
async for message in query(
prompt="Count how many files are in the current directory and check if there's a README file. Use tools as needed.",
options=options,
):
if isinstance(message, ResultMessage):
result_message = message
# Verify result
assert result_message is not None
assert not result_message.is_error
assert result_message.structured_output is not None
# Check structure
output = result_message.structured_output
assert "file_count" in output
assert "has_readme" in output
assert isinstance(output["file_count"], (int, float))
assert isinstance(output["has_readme"], bool)
assert output["file_count"] >= 0 # Should be non-negative

View file

@ -1,107 +0,0 @@
#!/usr/bin/env python3
"""Example of loading filesystem-based agents via setting_sources.
This example demonstrates how to load agents defined in .claude/agents/ files
using the setting_sources option. This is different from inline AgentDefinition
objects - these agents are loaded from markdown files on disk.
This example tests the scenario from issue #406 where filesystem-based agents
loaded via setting_sources=["project"] may silently fail in certain environments.
Usage:
./examples/filesystem_agents.py
"""
import asyncio
from pathlib import Path
from claude_agent_sdk import (
AssistantMessage,
ClaudeAgentOptions,
ClaudeSDKClient,
ResultMessage,
SystemMessage,
TextBlock,
)
def extract_agents(msg: SystemMessage) -> list[str]:
"""Extract agent names from system message init data."""
if msg.subtype == "init":
agents = msg.data.get("agents", [])
# Agents can be either strings or dicts with a 'name' field
result = []
for a in agents:
if isinstance(a, str):
result.append(a)
elif isinstance(a, dict):
result.append(a.get("name", ""))
return result
return []
async def main():
"""Test loading filesystem-based agents."""
print("=== Filesystem Agents Example ===")
print("Testing: setting_sources=['project'] with .claude/agents/test-agent.md")
print()
# Use the SDK repo directory which has .claude/agents/test-agent.md
sdk_dir = Path(__file__).parent.parent
options = ClaudeAgentOptions(
setting_sources=["project"],
cwd=sdk_dir,
)
message_types: list[str] = []
agents_found: list[str] = []
async with ClaudeSDKClient(options=options) as client:
await client.query("Say hello in exactly 3 words")
async for msg in client.receive_response():
message_types.append(type(msg).__name__)
if isinstance(msg, SystemMessage) and msg.subtype == "init":
agents_found = extract_agents(msg)
print(f"Init message received. Agents loaded: {agents_found}")
elif isinstance(msg, AssistantMessage):
for block in msg.content:
if isinstance(block, TextBlock):
print(f"Assistant: {block.text}")
elif isinstance(msg, ResultMessage):
print(
f"Result: subtype={msg.subtype}, cost=${msg.total_cost_usd or 0:.4f}"
)
print()
print("=== Summary ===")
print(f"Message types received: {message_types}")
print(f"Total messages: {len(message_types)}")
# Validate the results
has_init = "SystemMessage" in message_types
has_assistant = "AssistantMessage" in message_types
has_result = "ResultMessage" in message_types
has_test_agent = "test-agent" in agents_found
print()
if has_init and has_assistant and has_result:
print("SUCCESS: Received full response (init, assistant, result)")
else:
print("FAILURE: Did not receive full response")
print(f" - Init: {has_init}")
print(f" - Assistant: {has_assistant}")
print(f" - Result: {has_result}")
if has_test_agent:
print("SUCCESS: test-agent was loaded from filesystem")
else:
print("WARNING: test-agent was NOT loaded (may not exist in .claude/agents/)")
if __name__ == "__main__":
asyncio.run(main())

View file

@ -19,7 +19,6 @@ from claude_agent_sdk import ClaudeAgentOptions, ClaudeSDKClient
from claude_agent_sdk.types import (
AssistantMessage,
HookContext,
HookInput,
HookJSONOutput,
HookMatcher,
Message,
@ -44,7 +43,7 @@ def display_message(msg: Message) -> None:
##### Hook callback functions
async def check_bash_command(
input_data: HookInput, tool_use_id: str | None, context: HookContext
input_data: dict[str, Any], tool_use_id: str | None, context: HookContext
) -> HookJSONOutput:
"""Prevent certain bash commands from being executed."""
tool_name = input_data["tool_name"]
@ -71,7 +70,7 @@ async def check_bash_command(
async def add_custom_instructions(
input_data: HookInput, tool_use_id: str | None, context: HookContext
input_data: dict[str, Any], tool_use_id: str | None, context: HookContext
) -> HookJSONOutput:
"""Add custom instructions when a session starts."""
return {
@ -82,77 +81,6 @@ async def add_custom_instructions(
}
async def review_tool_output(
input_data: HookInput, tool_use_id: str | None, context: HookContext
) -> HookJSONOutput:
"""Review tool output and provide additional context or warnings."""
tool_response = input_data.get("tool_response", "")
# If the tool produced an error, add helpful context
if "error" in str(tool_response).lower():
return {
"systemMessage": "⚠️ The command produced an error",
"reason": "Tool execution failed - consider checking the command syntax",
"hookSpecificOutput": {
"hookEventName": "PostToolUse",
"additionalContext": "The command encountered an error. You may want to try a different approach.",
}
}
return {}
async def strict_approval_hook(
input_data: HookInput, tool_use_id: str | None, context: HookContext
) -> HookJSONOutput:
"""Demonstrates using permissionDecision to control tool execution."""
tool_name = input_data.get("tool_name")
tool_input = input_data.get("tool_input", {})
# Block any Write operations to specific files
if tool_name == "Write":
file_path = tool_input.get("file_path", "")
if "important" in file_path.lower():
logger.warning(f"Blocked Write to: {file_path}")
return {
"reason": "Writes to files containing 'important' in the name are not allowed for safety",
"systemMessage": "🚫 Write operation blocked by security policy",
"hookSpecificOutput": {
"hookEventName": "PreToolUse",
"permissionDecision": "deny",
"permissionDecisionReason": "Security policy blocks writes to important files",
},
}
# Allow everything else explicitly
return {
"reason": "Tool use approved after security review",
"hookSpecificOutput": {
"hookEventName": "PreToolUse",
"permissionDecision": "allow",
"permissionDecisionReason": "Tool passed security checks",
},
}
async def stop_on_error_hook(
input_data: HookInput, tool_use_id: str | None, context: HookContext
) -> HookJSONOutput:
"""Demonstrates using continue=False to stop execution on certain conditions."""
tool_response = input_data.get("tool_response", "")
# Stop execution if we see a critical error
if "critical" in str(tool_response).lower():
logger.error("Critical error detected - stopping execution")
return {
"continue_": False,
"stopReason": "Critical error detected in tool output - execution halted for safety",
"systemMessage": "🛑 Execution stopped due to critical error",
}
return {"continue_": True}
async def example_pretooluse() -> None:
"""Basic example demonstrating hook protection."""
print("=== PreToolUse Example ===")
@ -215,99 +143,11 @@ async def example_userpromptsubmit() -> None:
print("\n")
async def example_posttooluse() -> None:
"""Demonstrate PostToolUse hook with reason and systemMessage fields."""
print("=== PostToolUse Example ===")
print("This example shows how PostToolUse can provide feedback with reason and systemMessage.\n")
options = ClaudeAgentOptions(
allowed_tools=["Bash"],
hooks={
"PostToolUse": [
HookMatcher(matcher="Bash", hooks=[review_tool_output]),
],
}
)
async with ClaudeSDKClient(options=options) as client:
print("User: Run a command that will produce an error: ls /nonexistent_directory")
await client.query("Run this command: ls /nonexistent_directory")
async for msg in client.receive_response():
display_message(msg)
print("\n")
async def example_decision_fields() -> None:
"""Demonstrate permissionDecision, reason, and systemMessage fields."""
print("=== Permission Decision Example ===")
print("This example shows how to use permissionDecision='allow'/'deny' with reason and systemMessage.\n")
options = ClaudeAgentOptions(
allowed_tools=["Write", "Bash"],
model="claude-sonnet-4-5-20250929",
hooks={
"PreToolUse": [
HookMatcher(matcher="Write", hooks=[strict_approval_hook]),
],
}
)
async with ClaudeSDKClient(options=options) as client:
# Test 1: Try to write to a file with "important" in the name (should be blocked)
print("Test 1: Trying to write to important_config.txt (should be blocked)...")
print("User: Write 'test' to important_config.txt")
await client.query("Write the text 'test data' to a file called important_config.txt")
async for msg in client.receive_response():
display_message(msg)
print("\n" + "=" * 50 + "\n")
# Test 2: Write to a regular file (should be approved)
print("Test 2: Trying to write to regular_file.txt (should be approved)...")
print("User: Write 'test' to regular_file.txt")
await client.query("Write the text 'test data' to a file called regular_file.txt")
async for msg in client.receive_response():
display_message(msg)
print("\n")
async def example_continue_control() -> None:
"""Demonstrate continue and stopReason fields for execution control."""
print("=== Continue/Stop Control Example ===")
print("This example shows how to use continue_=False with stopReason to halt execution.\n")
options = ClaudeAgentOptions(
allowed_tools=["Bash"],
hooks={
"PostToolUse": [
HookMatcher(matcher="Bash", hooks=[stop_on_error_hook]),
],
}
)
async with ClaudeSDKClient(options=options) as client:
print("User: Run a command that outputs 'CRITICAL ERROR'")
await client.query("Run this bash command: echo 'CRITICAL ERROR: system failure'")
async for msg in client.receive_response():
display_message(msg)
print("\n")
async def main() -> None:
"""Run all examples or a specific example based on command line argument."""
examples = {
"PreToolUse": example_pretooluse,
"UserPromptSubmit": example_userpromptsubmit,
"PostToolUse": example_posttooluse,
"DecisionFields": example_decision_fields,
"ContinueControl": example_continue_control,
}
if len(sys.argv) < 2:
@ -317,12 +157,6 @@ async def main() -> None:
print(" all - Run all examples")
for name in examples:
print(f" {name}")
print("\nExample descriptions:")
print(" PreToolUse - Block commands using PreToolUse hook")
print(" UserPromptSubmit - Add context at prompt submission")
print(" PostToolUse - Review tool output with reason and systemMessage")
print(" DecisionFields - Use permissionDecision='allow'/'deny' with reason")
print(" ContinueControl - Control execution with continue_ and stopReason")
sys.exit(0)
example_name = sys.argv[1]

View file

@ -1,95 +0,0 @@
#!/usr/bin/env python3
"""Example demonstrating max_budget_usd option for cost control."""
import anyio
from claude_agent_sdk import (
AssistantMessage,
ClaudeAgentOptions,
ResultMessage,
TextBlock,
query,
)
async def without_budget():
"""Example without budget limit."""
print("=== Without Budget Limit ===")
async for message in query(prompt="What is 2 + 2?"):
if isinstance(message, AssistantMessage):
for block in message.content:
if isinstance(block, TextBlock):
print(f"Claude: {block.text}")
elif isinstance(message, ResultMessage):
if message.total_cost_usd:
print(f"Total cost: ${message.total_cost_usd:.4f}")
print(f"Status: {message.subtype}")
print()
async def with_reasonable_budget():
"""Example with budget that won't be exceeded."""
print("=== With Reasonable Budget ($0.10) ===")
options = ClaudeAgentOptions(
max_budget_usd=0.10, # 10 cents - plenty for a simple query
)
async for message in query(prompt="What is 2 + 2?", options=options):
if isinstance(message, AssistantMessage):
for block in message.content:
if isinstance(block, TextBlock):
print(f"Claude: {block.text}")
elif isinstance(message, ResultMessage):
if message.total_cost_usd:
print(f"Total cost: ${message.total_cost_usd:.4f}")
print(f"Status: {message.subtype}")
print()
async def with_tight_budget():
"""Example with very tight budget that will likely be exceeded."""
print("=== With Tight Budget ($0.0001) ===")
options = ClaudeAgentOptions(
max_budget_usd=0.0001, # Very small budget - will be exceeded quickly
)
async for message in query(
prompt="Read the README.md file and summarize it", options=options
):
if isinstance(message, AssistantMessage):
for block in message.content:
if isinstance(block, TextBlock):
print(f"Claude: {block.text}")
elif isinstance(message, ResultMessage):
if message.total_cost_usd:
print(f"Total cost: ${message.total_cost_usd:.4f}")
print(f"Status: {message.subtype}")
# Check if budget was exceeded
if message.subtype == "error_max_budget_usd":
print("⚠️ Budget limit exceeded!")
print(
"Note: The cost may exceed the budget by up to one API call's worth"
)
print()
async def main():
"""Run all examples."""
print("This example demonstrates using max_budget_usd to control API costs.\n")
await without_budget()
await with_reasonable_budget()
await with_tight_budget()
print(
"\nNote: Budget checking happens after each API call completes,\n"
"so the final cost may slightly exceed the specified budget.\n"
)
if __name__ == "__main__":
anyio.run(main)

View file

@ -1,71 +0,0 @@
#!/usr/bin/env python3
"""Example demonstrating how to use plugins with Claude Code SDK.
Plugins allow you to extend Claude Code with custom commands, agents, skills,
and hooks. This example shows how to load a local plugin and verify it's
loaded by checking the system message.
The demo plugin is located in examples/plugins/demo-plugin/ and provides
a custom /greet command.
"""
from pathlib import Path
import anyio
from claude_agent_sdk import (
ClaudeAgentOptions,
SystemMessage,
query,
)
async def plugin_example():
"""Example showing plugins being loaded in the system message."""
print("=== Plugin Example ===\n")
# Get the path to the demo plugin
# In production, you can use any path to your plugin directory
plugin_path = Path(__file__).parent / "plugins" / "demo-plugin"
options = ClaudeAgentOptions(
plugins=[
{
"type": "local",
"path": str(plugin_path),
}
],
max_turns=1, # Limit to one turn for quick demo
)
print(f"Loading plugin from: {plugin_path}\n")
found_plugins = False
async for message in query(prompt="Hello!", options=options):
if isinstance(message, SystemMessage) and message.subtype == "init":
print("System initialized!")
print(f"System message data keys: {list(message.data.keys())}\n")
# Check for plugins in the system message
plugins_data = message.data.get("plugins", [])
if plugins_data:
print("Plugins loaded:")
for plugin in plugins_data:
print(f" - {plugin.get('name')} (path: {plugin.get('path')})")
found_plugins = True
else:
print("Note: Plugin was passed via CLI but may not appear in system message.")
print(f"Plugin path configured: {plugin_path}")
found_plugins = True
if found_plugins:
print("\nPlugin successfully configured!\n")
async def main():
"""Run all plugin examples."""
await plugin_example()
if __name__ == "__main__":
anyio.run(main)

View file

@ -1,8 +0,0 @@
{
"name": "demo-plugin",
"description": "A demo plugin showing how to extend Claude Code with custom commands",
"version": "1.0.0",
"author": {
"name": "Claude Code Team"
}
}

View file

@ -1,5 +0,0 @@
# Greet Command
This is a custom greeting command from the demo plugin.
When the user runs this command, greet them warmly and explain that this message came from a custom plugin loaded via the Python SDK. Tell them that plugins can be used to extend Claude Code with custom commands, agents, skills, and hooks.

View file

@ -1,111 +0,0 @@
#!/usr/bin/env python3
"""Example demonstrating the tools option and verifying tools in system message."""
import anyio
from claude_agent_sdk import (
AssistantMessage,
ClaudeAgentOptions,
ResultMessage,
SystemMessage,
TextBlock,
query,
)
async def tools_array_example():
"""Example with tools as array of specific tool names."""
print("=== Tools Array Example ===")
print("Setting tools=['Read', 'Glob', 'Grep']")
print()
options = ClaudeAgentOptions(
tools=["Read", "Glob", "Grep"],
max_turns=1,
)
async for message in query(
prompt="What tools do you have available? Just list them briefly.",
options=options,
):
if isinstance(message, SystemMessage) and message.subtype == "init":
tools = message.data.get("tools", [])
print(f"Tools from system message: {tools}")
print()
elif isinstance(message, AssistantMessage):
for block in message.content:
if isinstance(block, TextBlock):
print(f"Claude: {block.text}")
elif isinstance(message, ResultMessage):
if message.total_cost_usd:
print(f"\nCost: ${message.total_cost_usd:.4f}")
print()
async def tools_empty_array_example():
"""Example with tools as empty array (disables all built-in tools)."""
print("=== Tools Empty Array Example ===")
print("Setting tools=[] (disables all built-in tools)")
print()
options = ClaudeAgentOptions(
tools=[],
max_turns=1,
)
async for message in query(
prompt="What tools do you have available? Just list them briefly.",
options=options,
):
if isinstance(message, SystemMessage) and message.subtype == "init":
tools = message.data.get("tools", [])
print(f"Tools from system message: {tools}")
print()
elif isinstance(message, AssistantMessage):
for block in message.content:
if isinstance(block, TextBlock):
print(f"Claude: {block.text}")
elif isinstance(message, ResultMessage):
if message.total_cost_usd:
print(f"\nCost: ${message.total_cost_usd:.4f}")
print()
async def tools_preset_example():
"""Example with tools preset (all default Claude Code tools)."""
print("=== Tools Preset Example ===")
print("Setting tools={'type': 'preset', 'preset': 'claude_code'}")
print()
options = ClaudeAgentOptions(
tools={"type": "preset", "preset": "claude_code"},
max_turns=1,
)
async for message in query(
prompt="What tools do you have available? Just list them briefly.",
options=options,
):
if isinstance(message, SystemMessage) and message.subtype == "init":
tools = message.data.get("tools", [])
print(f"Tools from system message ({len(tools)} tools): {tools[:5]}...")
print()
elif isinstance(message, AssistantMessage):
for block in message.content:
if isinstance(block, TextBlock):
print(f"Claude: {block.text}")
elif isinstance(message, ResultMessage):
if message.total_cost_usd:
print(f"\nCost: ${message.total_cost_usd:.4f}")
print()
async def main():
"""Run all examples."""
await tools_array_example()
await tools_empty_array_example()
await tools_preset_example()
if __name__ == "__main__":
anyio.run(main)

View file

@ -4,7 +4,7 @@ build-backend = "hatchling.build"
[project]
name = "claude-agent-sdk"
version = "0.1.18"
version = "0.1.1"
description = "Python SDK for Claude Code"
readme = "README.md"
requires-python = ">=3.10"
@ -47,7 +47,6 @@ Issues = "https://github.com/anthropics/claude-agent-sdk-python/issues"
[tool.hatch.build.targets.wheel]
packages = ["src/claude_agent_sdk"]
only-include = ["src/claude_agent_sdk"]
[tool.hatch.build.targets.sdist]
include = [

View file

@ -1,392 +0,0 @@
#!/usr/bin/env python3
"""Build wheel with bundled Claude Code CLI.
This script handles the complete wheel building process:
1. Optionally updates version
2. Downloads Claude Code CLI
3. Builds the wheel
4. Optionally cleans up the bundled CLI
Usage:
python scripts/build_wheel.py # Build with current version
python scripts/build_wheel.py --version 0.1.4 # Build with specific version
python scripts/build_wheel.py --clean # Clean bundled CLI after build
python scripts/build_wheel.py --skip-download # Skip CLI download (use existing)
"""
import argparse
import os
import platform
import re
import shutil
import subprocess
import sys
from pathlib import Path
try:
import twine # noqa: F401
HAS_TWINE = True
except ImportError:
HAS_TWINE = False
def run_command(cmd: list[str], description: str) -> None:
"""Run a command and handle errors."""
print(f"\n{'=' * 60}")
print(f"{description}")
print(f"{'=' * 60}")
print(f"$ {' '.join(cmd)}")
print()
try:
result = subprocess.run(
cmd,
check=True,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
text=True,
)
print(result.stdout)
except subprocess.CalledProcessError as e:
print(f"Error: {description} failed", file=sys.stderr)
print(e.stdout, file=sys.stderr)
sys.exit(1)
def update_version(version: str) -> None:
"""Update package version."""
script_dir = Path(__file__).parent
update_script = script_dir / "update_version.py"
if not update_script.exists():
print("Warning: update_version.py not found, skipping version update")
return
run_command(
[sys.executable, str(update_script), version],
f"Updating version to {version}",
)
def get_bundled_cli_version() -> str:
"""Get the CLI version that should be bundled from _cli_version.py."""
version_file = Path("src/claude_agent_sdk/_cli_version.py")
if not version_file.exists():
return "latest"
content = version_file.read_text()
match = re.search(r'__cli_version__ = "([^"]+)"', content)
if match:
return match.group(1)
return "latest"
def download_cli(cli_version: str | None = None) -> None:
"""Download Claude Code CLI."""
# Use provided version, or fall back to version from _cli_version.py
if cli_version is None:
cli_version = get_bundled_cli_version()
script_dir = Path(__file__).parent
download_script = script_dir / "download_cli.py"
# Set environment variable for download script
os.environ["CLAUDE_CLI_VERSION"] = cli_version
run_command(
[sys.executable, str(download_script)],
f"Downloading Claude Code CLI ({cli_version})",
)
def clean_dist() -> None:
"""Clean dist directory."""
dist_dir = Path("dist")
if dist_dir.exists():
print(f"\n{'=' * 60}")
print("Cleaning dist directory")
print(f"{'=' * 60}")
shutil.rmtree(dist_dir)
print("Cleaned dist/")
def get_platform_tag() -> str:
"""Get the appropriate platform tag for the current platform.
Uses minimum compatible versions for broad compatibility:
- macOS: 11.0 (Big Sur) as minimum
- Linux: manylinux_2_17 (widely compatible)
- Windows: Standard tags
"""
system = platform.system()
machine = platform.machine().lower()
if system == "Darwin":
# macOS - use minimum version 11.0 (Big Sur) for broad compatibility
if machine == "arm64":
return "macosx_11_0_arm64"
else:
return "macosx_11_0_x86_64"
elif system == "Linux":
# Linux - use manylinux for broad compatibility
if machine in ["x86_64", "amd64"]:
return "manylinux_2_17_x86_64"
elif machine in ["aarch64", "arm64"]:
return "manylinux_2_17_aarch64"
else:
return f"linux_{machine}"
elif system == "Windows":
# Windows
if machine in ["x86_64", "amd64"]:
return "win_amd64"
elif machine == "arm64":
return "win_arm64"
else:
return "win32"
else:
# Unknown platform, use generic
return f"{system.lower()}_{machine}"
def retag_wheel(wheel_path: Path, platform_tag: str) -> Path:
"""Retag a wheel with the correct platform tag using wheel package."""
print(f"\n{'=' * 60}")
print("Retagging wheel as platform-specific")
print(f"{'=' * 60}")
print(f"Old: {wheel_path.name}")
# Use wheel package to properly retag (updates both filename and metadata)
result = subprocess.run(
[
sys.executable,
"-m",
"wheel",
"tags",
"--platform-tag",
platform_tag,
"--remove",
str(wheel_path),
],
capture_output=True,
text=True,
)
if result.returncode != 0:
print(f"Warning: Failed to retag wheel: {result.stderr}")
return wheel_path
# Find the newly tagged wheel
dist_dir = wheel_path.parent
# The wheel package creates a new file with the platform tag
new_wheels = list(dist_dir.glob(f"*{platform_tag}.whl"))
if new_wheels:
new_path = new_wheels[0]
print(f"New: {new_path.name}")
print("Wheel retagged successfully")
# Remove the old wheel
if wheel_path.exists() and wheel_path != new_path:
wheel_path.unlink()
return new_path
else:
print("Warning: Could not find retagged wheel")
return wheel_path
def build_wheel() -> None:
"""Build the wheel."""
run_command(
[sys.executable, "-m", "build", "--wheel"],
"Building wheel",
)
# Check if we have a bundled CLI - if so, retag the wheel as platform-specific
bundled_cli = Path("src/claude_agent_sdk/_bundled/claude")
bundled_cli_exe = Path("src/claude_agent_sdk/_bundled/claude.exe")
if bundled_cli.exists() or bundled_cli_exe.exists():
# Find the built wheel
dist_dir = Path("dist")
wheels = list(dist_dir.glob("*.whl"))
if wheels:
# Get platform tag
platform_tag = get_platform_tag()
# Retag each wheel (should only be one)
for wheel in wheels:
if "-any.whl" in wheel.name:
retag_wheel(wheel, platform_tag)
else:
print("Warning: No wheel found to retag")
else:
print("\nNo bundled CLI found - wheel will be platform-independent")
def build_sdist() -> None:
"""Build the source distribution."""
run_command(
[sys.executable, "-m", "build", "--sdist"],
"Building source distribution",
)
def check_package() -> None:
"""Check package with twine."""
if not HAS_TWINE:
print("\nWarning: twine not installed, skipping package check")
print("Install with: pip install twine")
return
print(f"\n{'=' * 60}")
print("Checking package with twine")
print(f"{'=' * 60}")
print(f"$ {sys.executable} -m twine check dist/*")
print()
try:
result = subprocess.run(
[sys.executable, "-m", "twine", "check", "dist/*"],
check=False,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
text=True,
)
print(result.stdout)
if result.returncode != 0:
print("\nWarning: twine check reported issues")
print("Note: 'License-File' warnings are false positives from twine 6.x")
print("PyPI will accept these packages without issues")
else:
print("Package check passed")
except Exception as e:
print(f"Warning: Failed to run twine check: {e}")
def clean_bundled_cli() -> None:
"""Clean bundled CLI."""
bundled_dir = Path("src/claude_agent_sdk/_bundled")
cli_files = list(bundled_dir.glob("claude*"))
if cli_files:
print(f"\n{'=' * 60}")
print("Cleaning bundled CLI")
print(f"{'=' * 60}")
for cli_file in cli_files:
if cli_file.name != ".gitignore":
cli_file.unlink()
print(f"Removed {cli_file}")
else:
print("\nNo bundled CLI to clean")
def list_artifacts() -> None:
"""List built artifacts."""
dist_dir = Path("dist")
if not dist_dir.exists():
return
print(f"\n{'=' * 60}")
print("Built Artifacts")
print(f"{'=' * 60}")
artifacts = sorted(dist_dir.iterdir())
if not artifacts:
print("No artifacts found")
return
for artifact in artifacts:
size_mb = artifact.stat().st_size / (1024 * 1024)
print(f" {artifact.name:<50} {size_mb:>8.2f} MB")
total_size = sum(f.stat().st_size for f in artifacts) / (1024 * 1024)
print(f"\n {'Total:':<50} {total_size:>8.2f} MB")
def main() -> None:
"""Main entry point."""
parser = argparse.ArgumentParser(
description="Build wheel with bundled Claude Code CLI"
)
parser.add_argument(
"--version",
help="Version to set before building (e.g., 0.1.4)",
)
parser.add_argument(
"--cli-version",
default=None,
help="Claude Code CLI version to download (default: read from _cli_version.py)",
)
parser.add_argument(
"--skip-download",
action="store_true",
help="Skip downloading CLI (use existing bundled CLI)",
)
parser.add_argument(
"--skip-sdist",
action="store_true",
help="Skip building source distribution",
)
parser.add_argument(
"--clean",
action="store_true",
help="Clean bundled CLI after building",
)
parser.add_argument(
"--clean-dist",
action="store_true",
help="Clean dist directory before building",
)
args = parser.parse_args()
print("\n" + "=" * 60)
print("Claude Agent SDK - Wheel Builder")
print("=" * 60)
# Clean dist if requested
if args.clean_dist:
clean_dist()
# Update version if specified
if args.version:
update_version(args.version)
# Download CLI unless skipped
if not args.skip_download:
download_cli(args.cli_version)
else:
print("\nSkipping CLI download (using existing)")
# Build wheel
build_wheel()
# Build sdist unless skipped
if not args.skip_sdist:
build_sdist()
# Check package
check_package()
# Clean bundled CLI if requested
if args.clean:
clean_bundled_cli()
# List artifacts
list_artifacts()
print(f"\n{'=' * 60}")
print("Build complete!")
print(f"{'=' * 60}")
print("\nNext steps:")
print(" 1. Test the wheel: pip install dist/*.whl")
print(" 2. Run tests: python -m pytest tests/")
print(" 3. Publish: twine upload dist/*")
if __name__ == "__main__":
main()

View file

@ -1,157 +0,0 @@
#!/usr/bin/env python3
"""Download Claude Code CLI binary for bundling in wheel.
This script is run during the wheel build process to fetch the Claude Code CLI
binary using the official install script and place it in the package directory.
"""
import os
import platform
import shutil
import subprocess
import sys
from pathlib import Path
def get_cli_version() -> str:
"""Get the CLI version to download from environment or default."""
return os.environ.get("CLAUDE_CLI_VERSION", "latest")
def find_installed_cli() -> Path | None:
"""Find the installed Claude CLI binary."""
system = platform.system()
if system == "Windows":
# Windows installation locations (matches test.yml: $USERPROFILE\.local\bin)
locations = [
Path.home() / ".local" / "bin" / "claude.exe",
Path(os.environ.get("LOCALAPPDATA", "")) / "Claude" / "claude.exe",
]
else:
# Unix installation locations
locations = [
Path.home() / ".local" / "bin" / "claude",
Path("/usr/local/bin/claude"),
Path.home() / "node_modules" / ".bin" / "claude",
]
# Also check PATH
cli_path = shutil.which("claude")
if cli_path:
return Path(cli_path)
for path in locations:
if path.exists() and path.is_file():
return path
return None
def download_cli() -> None:
"""Download Claude Code CLI using the official install script."""
version = get_cli_version()
system = platform.system()
print(f"Downloading Claude Code CLI version: {version}")
# Build install command based on platform
if system == "Windows":
# Use PowerShell installer on Windows
if version == "latest":
install_cmd = [
"powershell",
"-ExecutionPolicy",
"Bypass",
"-Command",
"irm https://claude.ai/install.ps1 | iex",
]
else:
install_cmd = [
"powershell",
"-ExecutionPolicy",
"Bypass",
"-Command",
f"& ([scriptblock]::Create((irm https://claude.ai/install.ps1))) {version}",
]
else:
# Use bash installer on Unix-like systems
if version == "latest":
install_cmd = ["bash", "-c", "curl -fsSL https://claude.ai/install.sh | bash"]
else:
install_cmd = [
"bash",
"-c",
f"curl -fsSL https://claude.ai/install.sh | bash -s {version}",
]
try:
subprocess.run(
install_cmd,
check=True,
capture_output=True,
)
except subprocess.CalledProcessError as e:
print(f"Error downloading CLI: {e}", file=sys.stderr)
print(f"stdout: {e.stdout.decode()}", file=sys.stderr)
print(f"stderr: {e.stderr.decode()}", file=sys.stderr)
sys.exit(1)
def copy_cli_to_bundle() -> None:
"""Copy the installed CLI to the package _bundled directory."""
# Find project root (parent of scripts directory)
script_dir = Path(__file__).parent
project_root = script_dir.parent
bundle_dir = project_root / "src" / "claude_agent_sdk" / "_bundled"
# Ensure bundle directory exists
bundle_dir.mkdir(parents=True, exist_ok=True)
# Find installed CLI
cli_path = find_installed_cli()
if not cli_path:
print("Error: Could not find installed Claude CLI binary", file=sys.stderr)
sys.exit(1)
print(f"Found CLI at: {cli_path}")
# Determine target filename based on platform
system = platform.system()
target_name = "claude.exe" if system == "Windows" else "claude"
target_path = bundle_dir / target_name
# Copy the binary
print(f"Copying CLI to: {target_path}")
shutil.copy2(cli_path, target_path)
# Make it executable (Unix-like systems)
if system != "Windows":
target_path.chmod(0o755)
print(f"Successfully bundled CLI binary: {target_path}")
# Print size info
size_mb = target_path.stat().st_size / (1024 * 1024)
print(f"Binary size: {size_mb:.2f} MB")
def main() -> None:
"""Main entry point."""
print("=" * 60)
print("Claude Code CLI Download Script")
print("=" * 60)
# Download CLI
download_cli()
# Copy to bundle directory
copy_cli_to_bundle()
print("=" * 60)
print("CLI download and bundling complete!")
print("=" * 60)
if __name__ == "__main__":
main()

View file

@ -1,22 +0,0 @@
#!/bin/bash
# Initial setup script for installing git hooks
set -e
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
REPO_ROOT="$(cd "$SCRIPT_DIR/.." && pwd)"
echo "Setting up git hooks..."
# Install pre-push hook
echo "→ Installing pre-push hook..."
cp "$SCRIPT_DIR/pre-push" "$REPO_ROOT/.git/hooks/pre-push"
chmod +x "$REPO_ROOT/.git/hooks/pre-push"
echo "✓ pre-push hook installed"
echo ""
echo "✓ Setup complete!"
echo ""
echo "The pre-push hook will now run lint checks before each push."
echo "To skip the hook temporarily, use: git push --no-verify"

View file

@ -1,30 +0,0 @@
#!/bin/bash
# Pre-push hook to run lint checks (matches .github/workflows/lint.yml)
echo "Running lint checks before push..."
echo ""
# Run ruff check
echo "→ Running ruff check..."
python -m ruff check src/ tests/
if [ $? -ne 0 ]; then
echo ""
echo "❌ ruff check failed. Fix lint issues before pushing."
echo " Run: python -m ruff check src/ tests/ --fix"
exit 1
fi
# Run ruff format check
echo "→ Running ruff format check..."
python -m ruff format --check src/ tests/
if [ $? -ne 0 ]; then
echo ""
echo "❌ ruff format check failed. Fix formatting before pushing."
echo " Run: python -m ruff format src/ tests/"
exit 1
fi
echo ""
echo "✓ All lint checks passed!"
exit 0

View file

@ -1,77 +0,0 @@
#!/bin/bash
# Run SDK tests in a Docker container
# This helps catch Docker-specific issues like #406
#
# Usage:
# ./scripts/test-docker.sh [unit|e2e|all]
#
# Examples:
# ./scripts/test-docker.sh unit # Run unit tests only
# ANTHROPIC_API_KEY=sk-... ./scripts/test-docker.sh e2e # Run e2e tests
# ANTHROPIC_API_KEY=sk-... ./scripts/test-docker.sh all # Run all tests
set -e
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
PROJECT_DIR="$(dirname "$SCRIPT_DIR")"
cd "$PROJECT_DIR"
usage() {
echo "Usage: $0 [unit|e2e|all]"
echo ""
echo "Commands:"
echo " unit - Run unit tests only (no API key needed)"
echo " e2e - Run e2e tests (requires ANTHROPIC_API_KEY)"
echo " all - Run both unit and e2e tests"
echo ""
echo "Examples:"
echo " $0 unit"
echo " ANTHROPIC_API_KEY=sk-... $0 e2e"
exit 1
}
echo "Building Docker test image..."
docker build -f Dockerfile.test -t claude-sdk-test .
case "${1:-unit}" in
unit)
echo ""
echo "Running unit tests in Docker..."
docker run --rm claude-sdk-test \
python -m pytest tests/ -v
;;
e2e)
if [ -z "$ANTHROPIC_API_KEY" ]; then
echo "Error: ANTHROPIC_API_KEY environment variable is required for e2e tests"
echo ""
echo "Usage: ANTHROPIC_API_KEY=sk-... $0 e2e"
exit 1
fi
echo ""
echo "Running e2e tests in Docker..."
docker run --rm -e ANTHROPIC_API_KEY \
claude-sdk-test python -m pytest e2e-tests/ -v -m e2e
;;
all)
echo ""
echo "Running unit tests in Docker..."
docker run --rm claude-sdk-test \
python -m pytest tests/ -v
echo ""
if [ -n "$ANTHROPIC_API_KEY" ]; then
echo "Running e2e tests in Docker..."
docker run --rm -e ANTHROPIC_API_KEY \
claude-sdk-test python -m pytest e2e-tests/ -v -m e2e
else
echo "Skipping e2e tests (ANTHROPIC_API_KEY not set)"
fi
;;
*)
usage
;;
esac
echo ""
echo "Done!"

View file

@ -1,32 +0,0 @@
#!/usr/bin/env python3
"""Update Claude Code CLI version in _cli_version.py."""
import re
import sys
from pathlib import Path
def update_cli_version(new_version: str) -> None:
"""Update CLI version in _cli_version.py."""
# Update _cli_version.py
version_path = Path("src/claude_agent_sdk/_cli_version.py")
content = version_path.read_text()
content = re.sub(
r'__cli_version__ = "[^"]+"',
f'__cli_version__ = "{new_version}"',
content,
count=1,
flags=re.MULTILINE,
)
version_path.write_text(content)
print(f"Updated {version_path} to {new_version}")
if __name__ == "__main__":
if len(sys.argv) != 2:
print("Usage: python scripts/update_cli_version.py <version>")
sys.exit(1)
update_cli_version(sys.argv[1])

View file

@ -1,8 +1,8 @@
#!/usr/bin/env python3
"""Update version in pyproject.toml and __init__.py files."""
import re
import sys
import re
from pathlib import Path
@ -18,7 +18,7 @@ def update_version(new_version: str) -> None:
f'version = "{new_version}"',
content,
count=1,
flags=re.MULTILINE,
flags=re.MULTILINE
)
pyproject_path.write_text(content)
@ -34,7 +34,7 @@ def update_version(new_version: str) -> None:
f'__version__ = "{new_version}"',
content,
count=1,
flags=re.MULTILINE,
flags=re.MULTILINE
)
version_path.write_text(content)
@ -45,5 +45,5 @@ if __name__ == "__main__":
if len(sys.argv) != 2:
print("Usage: python scripts/update_version.py <version>")
sys.exit(1)
update_version(sys.argv[1])
update_version(sys.argv[1])

View file

@ -18,14 +18,11 @@ from .query import query
from .types import (
AgentDefinition,
AssistantMessage,
BaseHookInput,
CanUseTool,
ClaudeAgentOptions,
ContentBlock,
HookCallback,
HookContext,
HookInput,
HookJSONOutput,
HookMatcher,
McpSdkServerConfig,
McpServerConfig,
@ -35,18 +32,8 @@ from .types import (
PermissionResultAllow,
PermissionResultDeny,
PermissionUpdate,
PostToolUseHookInput,
PreCompactHookInput,
PreToolUseHookInput,
ResultMessage,
SandboxIgnoreViolations,
SandboxNetworkConfig,
SandboxSettings,
SdkBeta,
SdkPluginConfig,
SettingSource,
StopHookInput,
SubagentStopHookInput,
SystemMessage,
TextBlock,
ThinkingBlock,
@ -54,7 +41,6 @@ from .types import (
ToolResultBlock,
ToolUseBlock,
UserMessage,
UserPromptSubmitHookInput,
)
# MCP Server Support
@ -208,7 +194,7 @@ def create_sdk_mcp_server(
- ClaudeAgentOptions: Configuration for using servers with query()
"""
from mcp.server import Server
from mcp.types import ImageContent, TextContent, Tool
from mcp.types import TextContent, Tool
# Create MCP server instance
server = Server(name, version=version)
@ -219,7 +205,7 @@ def create_sdk_mcp_server(
tool_map = {tool_def.name: tool_def for tool_def in tools}
# Register list_tools handler to expose available tools
@server.list_tools() # type: ignore[no-untyped-call,untyped-decorator]
@server.list_tools() # type: ignore[no-untyped-call,misc]
async def list_tools() -> list[Tool]:
"""Return the list of available tools."""
tool_list = []
@ -265,7 +251,7 @@ def create_sdk_mcp_server(
return tool_list
# Register call_tool handler to execute tools
@server.call_tool() # type: ignore[untyped-decorator]
@server.call_tool() # type: ignore[misc]
async def call_tool(name: str, arguments: dict[str, Any]) -> Any:
"""Execute a tool by name with given arguments."""
if name not in tool_map:
@ -278,19 +264,11 @@ def create_sdk_mcp_server(
# Convert result to MCP format
# The decorator expects us to return the content, not a CallToolResult
# It will wrap our return value in CallToolResult
content: list[TextContent | ImageContent] = []
content = []
if "content" in result:
for item in result["content"]:
if item.get("type") == "text":
content.append(TextContent(type="text", text=item["text"]))
if item.get("type") == "image":
content.append(
ImageContent(
type="image",
data=item["data"],
mimeType=item["mimeType"],
)
)
# Return just the content list - the decorator wraps it
return content
@ -328,30 +306,12 @@ __all__ = [
"PermissionResultAllow",
"PermissionResultDeny",
"PermissionUpdate",
# Hook support
"HookCallback",
"HookContext",
"HookInput",
"BaseHookInput",
"PreToolUseHookInput",
"PostToolUseHookInput",
"UserPromptSubmitHookInput",
"StopHookInput",
"SubagentStopHookInput",
"PreCompactHookInput",
"HookJSONOutput",
"HookMatcher",
# Agent support
"AgentDefinition",
"SettingSource",
# Plugin support
"SdkPluginConfig",
# Beta support
"SdkBeta",
# Sandbox support
"SandboxSettings",
"SandboxNetworkConfig",
"SandboxIgnoreViolations",
# MCP Server Support
"create_sdk_mcp_server",
"tool",

View file

@ -1,3 +0,0 @@
# Ignore bundled CLI binaries (downloaded during build)
claude
claude.exe

View file

@ -1,3 +0,0 @@
"""Bundled Claude Code CLI version."""
__cli_version__ = "2.0.74"

View file

@ -31,12 +31,10 @@ class InternalClient:
internal_hooks[event] = []
for matcher in matchers:
# Convert HookMatcher to internal dict format
internal_matcher: dict[str, Any] = {
internal_matcher = {
"matcher": matcher.matcher if hasattr(matcher, "matcher") else None,
"hooks": matcher.hooks if hasattr(matcher, "hooks") else [],
}
if hasattr(matcher, "timeout") and matcher.timeout is not None:
internal_matcher["timeout"] = matcher.timeout
internal_hooks[event].append(internal_matcher)
return internal_hooks
@ -73,8 +71,7 @@ class InternalClient:
chosen_transport = transport
else:
chosen_transport = SubprocessCLITransport(
prompt=prompt,
options=configured_options,
prompt=prompt, options=configured_options
)
# Connect transport

View file

@ -48,7 +48,6 @@ def parse_message(data: dict[str, Any]) -> Message:
case "user":
try:
parent_tool_use_id = data.get("parent_tool_use_id")
uuid = data.get("uuid")
if isinstance(data["message"]["content"], list):
user_content_blocks: list[ContentBlock] = []
for block in data["message"]["content"]:
@ -75,12 +74,10 @@ def parse_message(data: dict[str, Any]) -> Message:
)
return UserMessage(
content=user_content_blocks,
uuid=uuid,
parent_tool_use_id=parent_tool_use_id,
)
return UserMessage(
content=data["message"]["content"],
uuid=uuid,
parent_tool_use_id=parent_tool_use_id,
)
except KeyError as e:
@ -123,7 +120,6 @@ def parse_message(data: dict[str, Any]) -> Message:
content=content_blocks,
model=data["message"]["model"],
parent_tool_use_id=data.get("parent_tool_use_id"),
error=data["message"].get("error"),
)
except KeyError as e:
raise MessageParseError(
@ -153,7 +149,6 @@ def parse_message(data: dict[str, Any]) -> Message:
total_cost_usd=data.get("total_cost_usd"),
usage=data.get("usage"),
result=data.get("result"),
structured_output=data.get("structured_output"),
)
except KeyError as e:
raise MessageParseError(

View file

@ -31,25 +31,6 @@ if TYPE_CHECKING:
logger = logging.getLogger(__name__)
def _convert_hook_output_for_cli(hook_output: dict[str, Any]) -> dict[str, Any]:
"""Convert Python-safe field names to CLI-expected field names.
The Python SDK uses `async_` and `continue_` to avoid keyword conflicts,
but the CLI expects `async` and `continue`. This function performs the
necessary conversion.
"""
converted = {}
for key, value in hook_output.items():
# Convert Python-safe names to JavaScript names
if key == "async_":
converted["async"] = value
elif key == "continue_":
converted["continue"] = value
else:
converted[key] = value
return converted
class Query:
"""Handles bidirectional control protocol on top of Transport.
@ -72,7 +53,6 @@ class Query:
| None = None,
hooks: dict[str, list[dict[str, Any]]] | None = None,
sdk_mcp_servers: dict[str, "McpServer"] | None = None,
initialize_timeout: float = 60.0,
):
"""Initialize Query with transport and callbacks.
@ -82,9 +62,7 @@ class Query:
can_use_tool: Optional callback for tool permission requests
hooks: Optional hook configurations
sdk_mcp_servers: Optional SDK MCP server instances
initialize_timeout: Timeout in seconds for the initialize request
"""
self._initialize_timeout = initialize_timeout
self.transport = transport
self.is_streaming_mode = is_streaming_mode
self.can_use_tool = can_use_tool
@ -107,12 +85,6 @@ class Query:
self._closed = False
self._initialization_result: dict[str, Any] | None = None
# Track first result for proper stream closure with SDK MCP servers
self._first_result_event = anyio.Event()
self._stream_close_timeout = (
float(os.environ.get("CLAUDE_CODE_STREAM_CLOSE_TIMEOUT", "60000")) / 1000.0
) # Convert ms to seconds
async def initialize(self) -> dict[str, Any] | None:
"""Initialize control protocol if in streaming mode.
@ -135,13 +107,12 @@ class Query:
self.next_callback_id += 1
self.hook_callbacks[callback_id] = callback
callback_ids.append(callback_id)
hook_matcher_config: dict[str, Any] = {
"matcher": matcher.get("matcher"),
"hookCallbackIds": callback_ids,
}
if matcher.get("timeout") is not None:
hook_matcher_config["timeout"] = matcher.get("timeout")
hooks_config[event].append(hook_matcher_config)
hooks_config[event].append(
{
"matcher": matcher.get("matcher"),
"hookCallbackIds": callback_ids,
}
)
# Send initialize request
request = {
@ -149,10 +120,7 @@ class Query:
"hooks": hooks_config if hooks_config else None,
}
# Use longer timeout for initialize since MCP servers may take time to start
response = await self._send_control_request(
request, timeout=self._initialize_timeout
)
response = await self._send_control_request(request)
self._initialized = True
self._initialization_result = response # Store for later access
return response
@ -201,10 +169,6 @@ class Query:
# TODO: Implement cancellation support
continue
# Track results for proper stream closure
if msg_type == "result":
self._first_result_event.set()
# Regular SDK messages go to the stream
await self._message_send.send(message)
@ -214,11 +178,6 @@ class Query:
raise # Re-raise to properly handle cancellation
except Exception as e:
logger.error(f"Fatal error in message reader: {e}")
# Signal all pending control requests so they fail fast instead of timing out
for request_id, event in list(self.pending_control_responses.items()):
if request_id not in self.pending_control_results:
self.pending_control_results[request_id] = e
event.set()
# Put error in stream so iterators can handle it
await self._message_send.send({"type": "error", "error": str(e)})
finally:
@ -236,7 +195,6 @@ class Query:
if subtype == "can_use_tool":
permission_request: SDKControlPermissionRequest = request_data # type: ignore[assignment]
original_input = permission_request["input"]
# Handle tool permission request
if not self.can_use_tool:
raise Exception("canUseTool callback is not provided")
@ -255,14 +213,9 @@ class Query:
# Convert PermissionResult to expected dict format
if isinstance(response, PermissionResultAllow):
response_data = {
"behavior": "allow",
"updatedInput": (
response.updated_input
if response.updated_input is not None
else original_input
),
}
response_data = {"behavior": "allow"}
if response.updated_input is not None:
response_data["updatedInput"] = response.updated_input
if response.updated_permissions is not None:
response_data["updatedPermissions"] = [
permission.to_dict()
@ -285,13 +238,11 @@ class Query:
if not callback:
raise Exception(f"No hook callback found for ID: {callback_id}")
hook_output = await callback(
response_data = await callback(
request_data.get("input"),
request_data.get("tool_use_id"),
{"signal": None}, # TODO: Add abort signal support
)
# Convert Python-safe field names (async_, continue_) to CLI-expected names (async, continue)
response_data = _convert_hook_output_for_cli(hook_output)
elif subtype == "mcp_message":
# Handle SDK MCP request
@ -336,15 +287,8 @@ class Query:
}
await self.transport.write(json.dumps(error_response) + "\n")
async def _send_control_request(
self, request: dict[str, Any], timeout: float = 60.0
) -> dict[str, Any]:
"""Send control request to CLI and wait for response.
Args:
request: The control request to send
timeout: Timeout in seconds to wait for response (default 60s)
"""
async def _send_control_request(self, request: dict[str, Any]) -> dict[str, Any]:
"""Send control request to CLI and wait for response."""
if not self.is_streaming_mode:
raise Exception("Control requests require streaming mode")
@ -367,7 +311,7 @@ class Query:
# Wait for response
try:
with anyio.fail_after(timeout):
with anyio.fail_after(60.0):
await event.wait()
result = self.pending_control_results.pop(request_id)
@ -539,51 +483,14 @@ class Query:
}
)
async def rewind_files(self, user_message_id: str) -> None:
"""Rewind tracked files to their state at a specific user message.
Requires file checkpointing to be enabled via the `enable_file_checkpointing` option.
Args:
user_message_id: UUID of the user message to rewind to
"""
await self._send_control_request(
{
"subtype": "rewind_files",
"user_message_id": user_message_id,
}
)
async def stream_input(self, stream: AsyncIterable[dict[str, Any]]) -> None:
"""Stream input messages to transport.
If SDK MCP servers or hooks are present, waits for the first result
before closing stdin to allow bidirectional control protocol communication.
"""
"""Stream input messages to transport."""
try:
async for message in stream:
if self._closed:
break
await self.transport.write(json.dumps(message) + "\n")
# If we have SDK MCP servers or hooks that need bidirectional communication,
# wait for first result before closing the channel
has_hooks = bool(self.hooks)
if self.sdk_mcp_servers or has_hooks:
logger.debug(
f"Waiting for first result before closing stdin "
f"(sdk_mcp_servers={len(self.sdk_mcp_servers)}, has_hooks={has_hooks})"
)
try:
with anyio.move_on_after(self._stream_close_timeout):
await self._first_result_event.wait()
logger.debug("Received first result, closing input stream")
except Exception:
logger.debug(
"Timed out waiting for first result, closing input stream"
)
# After all messages sent (and result received if needed), end input
# After all messages sent, end input
await self.transport.end_input()
except Exception as e:
logger.debug(f"Error streaming input: {e}")

View file

@ -3,11 +3,9 @@
import json
import logging
import os
import platform
import re
import shutil
import sys
import tempfile
from collections.abc import AsyncIterable, AsyncIterator
from contextlib import suppress
from dataclasses import asdict
@ -31,11 +29,6 @@ logger = logging.getLogger(__name__)
_DEFAULT_MAX_BUFFER_SIZE = 1024 * 1024 # 1MB buffer limit
MINIMUM_CLAUDE_CODE_VERSION = "2.0.0"
# Platform-specific command line length limits
# Windows cmd.exe has a limit of 8191 characters, use 8000 for safety
# Other platforms have much higher limits
_CMD_LENGTH_LIMIT = 8000 if platform.system() == "Windows" else 100000
class SubprocessCLITransport(Transport):
"""Subprocess transport using Claude Code CLI."""
@ -44,13 +37,12 @@ class SubprocessCLITransport(Transport):
self,
prompt: str | AsyncIterable[dict[str, Any]],
options: ClaudeAgentOptions,
cli_path: str | Path | None = None,
):
self._prompt = prompt
self._is_streaming = not isinstance(prompt, str)
self._options = options
self._cli_path = (
str(options.cli_path) if options.cli_path is not None else self._find_cli()
)
self._cli_path = str(cli_path) if cli_path else self._find_cli()
self._cwd = str(options.cwd) if options.cwd else None
self._process: Process | None = None
self._stdout_stream: TextReceiveStream | None = None
@ -64,17 +56,9 @@ class SubprocessCLITransport(Transport):
if options.max_buffer_size is not None
else _DEFAULT_MAX_BUFFER_SIZE
)
self._temp_files: list[str] = [] # Track temporary files for cleanup
self._write_lock: anyio.Lock = anyio.Lock()
def _find_cli(self) -> str:
"""Find Claude Code CLI binary."""
# First, check for bundled CLI
bundled_cli = self._find_bundled_cli()
if bundled_cli:
return bundled_cli
# Fall back to system-wide search
if cli := shutil.which("claude"):
return cli
@ -84,7 +68,6 @@ class SubprocessCLITransport(Transport):
Path.home() / ".local/bin/claude",
Path.home() / "node_modules/.bin/claude",
Path.home() / ".yarn/bin/claude",
Path.home() / ".claude/local/claude",
]
for path in locations:
@ -96,85 +79,16 @@ class SubprocessCLITransport(Transport):
" npm install -g @anthropic-ai/claude-code\n"
"\nIf already installed locally, try:\n"
' export PATH="$HOME/node_modules/.bin:$PATH"\n'
"\nOr provide the path via ClaudeAgentOptions:\n"
" ClaudeAgentOptions(cli_path='/path/to/claude')"
"\nOr specify the path when creating transport:\n"
" SubprocessCLITransport(..., cli_path='/path/to/claude')"
)
def _find_bundled_cli(self) -> str | None:
"""Find bundled CLI binary if it exists."""
# Determine the CLI binary name based on platform
cli_name = "claude.exe" if platform.system() == "Windows" else "claude"
# Get the path to the bundled CLI
# The _bundled directory is in the same package as this module
bundled_path = Path(__file__).parent.parent.parent / "_bundled" / cli_name
if bundled_path.exists() and bundled_path.is_file():
logger.info(f"Using bundled Claude Code CLI: {bundled_path}")
return str(bundled_path)
return None
def _build_settings_value(self) -> str | None:
"""Build settings value, merging sandbox settings if provided.
Returns the settings value as either:
- A JSON string (if sandbox is provided or settings is JSON)
- A file path (if only settings path is provided without sandbox)
- None if neither settings nor sandbox is provided
"""
has_settings = self._options.settings is not None
has_sandbox = self._options.sandbox is not None
if not has_settings and not has_sandbox:
return None
# If only settings path and no sandbox, pass through as-is
if has_settings and not has_sandbox:
return self._options.settings
# If we have sandbox settings, we need to merge into a JSON object
settings_obj: dict[str, Any] = {}
if has_settings:
assert self._options.settings is not None
settings_str = self._options.settings.strip()
# Check if settings is a JSON string or a file path
if settings_str.startswith("{") and settings_str.endswith("}"):
# Parse JSON string
try:
settings_obj = json.loads(settings_str)
except json.JSONDecodeError:
# If parsing fails, treat as file path
logger.warning(
f"Failed to parse settings as JSON, treating as file path: {settings_str}"
)
# Read the file
settings_path = Path(settings_str)
if settings_path.exists():
with settings_path.open(encoding="utf-8") as f:
settings_obj = json.load(f)
else:
# It's a file path - read and parse
settings_path = Path(settings_str)
if settings_path.exists():
with settings_path.open(encoding="utf-8") as f:
settings_obj = json.load(f)
else:
logger.warning(f"Settings file not found: {settings_path}")
# Merge sandbox settings
if has_sandbox:
settings_obj["sandbox"] = self._options.sandbox
return json.dumps(settings_obj)
def _build_command(self) -> list[str]:
"""Build CLI command with arguments."""
cmd = [self._cli_path, "--output-format", "stream-json", "--verbose"]
if self._options.system_prompt is None:
cmd.extend(["--system-prompt", ""])
pass
elif isinstance(self._options.system_prompt, str):
cmd.extend(["--system-prompt", self._options.system_prompt])
else:
@ -186,39 +100,18 @@ class SubprocessCLITransport(Transport):
["--append-system-prompt", self._options.system_prompt["append"]]
)
# Handle tools option (base set of tools)
if self._options.tools is not None:
tools = self._options.tools
if isinstance(tools, list):
if len(tools) == 0:
cmd.extend(["--tools", ""])
else:
cmd.extend(["--tools", ",".join(tools)])
else:
# Preset object - 'claude_code' preset maps to 'default'
cmd.extend(["--tools", "default"])
if self._options.allowed_tools:
cmd.extend(["--allowedTools", ",".join(self._options.allowed_tools)])
if self._options.max_turns:
cmd.extend(["--max-turns", str(self._options.max_turns)])
if self._options.max_budget_usd is not None:
cmd.extend(["--max-budget-usd", str(self._options.max_budget_usd)])
if self._options.disallowed_tools:
cmd.extend(["--disallowedTools", ",".join(self._options.disallowed_tools)])
if self._options.model:
cmd.extend(["--model", self._options.model])
if self._options.fallback_model:
cmd.extend(["--fallback-model", self._options.fallback_model])
if self._options.betas:
cmd.extend(["--betas", ",".join(self._options.betas)])
if self._options.permission_prompt_tool_name:
cmd.extend(
["--permission-prompt-tool", self._options.permission_prompt_tool_name]
@ -233,10 +126,8 @@ class SubprocessCLITransport(Transport):
if self._options.resume:
cmd.extend(["--resume", self._options.resume])
# Handle settings and sandbox: merge sandbox into settings if both are provided
settings_value = self._build_settings_value()
if settings_value:
cmd.extend(["--settings", settings_value])
if self._options.settings:
cmd.extend(["--settings", self._options.settings])
if self._options.add_dirs:
# Convert all paths to strings and add each directory
@ -281,8 +172,7 @@ class SubprocessCLITransport(Transport):
name: {k: v for k, v in asdict(agent_def).items() if v is not None}
for name, agent_def in self._options.agents.items()
}
agents_json = json.dumps(agents_dict)
cmd.extend(["--agents", agents_json])
cmd.extend(["--agents", json.dumps(agents_dict)])
sources_value = (
",".join(self._options.setting_sources)
@ -291,14 +181,6 @@ class SubprocessCLITransport(Transport):
)
cmd.extend(["--setting-sources", sources_value])
# Add plugin directories
if self._options.plugins:
for plugin in self._options.plugins:
if plugin["type"] == "local":
cmd.extend(["--plugin-dir", plugin["path"]])
else:
raise ValueError(f"Unsupported plugin type: {plugin['type']}")
# Add extra args for future CLI flags
for flag, value in self._options.extra_args.items():
if value is None:
@ -308,24 +190,7 @@ class SubprocessCLITransport(Transport):
# Flag with value
cmd.extend([f"--{flag}", str(value)])
if self._options.max_thinking_tokens is not None:
cmd.extend(
["--max-thinking-tokens", str(self._options.max_thinking_tokens)]
)
# Extract schema from output_format structure if provided
# Expected: {"type": "json_schema", "schema": {...}}
if (
self._options.output_format is not None
and isinstance(self._options.output_format, dict)
and self._options.output_format.get("type") == "json_schema"
):
schema = self._options.output_format.get("schema")
if schema is not None:
cmd.extend(["--json-schema", json.dumps(schema)])
# Add prompt handling based on mode
# IMPORTANT: This must come AFTER all flags because everything after "--" is treated as arguments
if self._is_streaming:
# Streaming mode: use --input-format stream-json
cmd.extend(["--input-format", "stream-json"])
@ -333,37 +198,6 @@ class SubprocessCLITransport(Transport):
# String mode: use --print with the prompt
cmd.extend(["--print", "--", str(self._prompt)])
# Check if command line is too long (Windows limitation)
cmd_str = " ".join(cmd)
if len(cmd_str) > _CMD_LENGTH_LIMIT and self._options.agents:
# Command is too long - use temp file for agents
# Find the --agents argument and replace its value with @filepath
try:
agents_idx = cmd.index("--agents")
agents_json_value = cmd[agents_idx + 1]
# Create a temporary file
# ruff: noqa: SIM115
temp_file = tempfile.NamedTemporaryFile(
mode="w", suffix=".json", delete=False, encoding="utf-8"
)
temp_file.write(agents_json_value)
temp_file.close()
# Track for cleanup
self._temp_files.append(temp_file.name)
# Replace agents JSON with @filepath reference
cmd[agents_idx + 1] = f"@{temp_file.name}"
logger.info(
f"Command line length ({len(cmd_str)}) exceeds limit ({_CMD_LENGTH_LIMIT}). "
f"Using temp file for --agents: {temp_file.name}"
)
except (ValueError, IndexError) as e:
# This shouldn't happen, but log it just in case
logger.warning(f"Failed to optimize command line length: {e}")
return cmd
async def connect(self) -> None:
@ -371,8 +205,7 @@ class SubprocessCLITransport(Transport):
if self._process:
return
if not os.environ.get("CLAUDE_AGENT_SDK_SKIP_VERSION_CHECK"):
await self._check_claude_version()
await self._check_claude_version()
cmd = self._build_command()
try:
@ -384,10 +217,6 @@ class SubprocessCLITransport(Transport):
"CLAUDE_AGENT_SDK_VERSION": __version__,
}
# Enable file checkpointing if requested
if self._options.enable_file_checkpointing:
process_env["CLAUDE_CODE_ENABLE_SDK_FILE_CHECKPOINTING"] = "true"
if self._cwd:
process_env["PWD"] = self._cwd
@ -476,14 +305,9 @@ class SubprocessCLITransport(Transport):
async def close(self) -> None:
"""Close the transport and clean up resources."""
# Clean up temporary files first (before early return)
for temp_file in self._temp_files:
with suppress(Exception):
Path(temp_file).unlink(missing_ok=True)
self._temp_files.clear()
self._ready = False
if not self._process:
self._ready = False
return
# Close stderr task group if active
@ -493,19 +317,21 @@ class SubprocessCLITransport(Transport):
await self._stderr_task_group.__aexit__(None, None, None)
self._stderr_task_group = None
# Close stdin stream (acquire lock to prevent race with concurrent writes)
async with self._write_lock:
self._ready = False # Set inside lock to prevent TOCTOU with write()
if self._stdin_stream:
with suppress(Exception):
await self._stdin_stream.aclose()
self._stdin_stream = None
# Close streams
if self._stdin_stream:
with suppress(Exception):
await self._stdin_stream.aclose()
self._stdin_stream = None
if self._stderr_stream:
with suppress(Exception):
await self._stderr_stream.aclose()
self._stderr_stream = None
if self._process.stdin:
with suppress(Exception):
await self._process.stdin.aclose()
# Terminate and wait for process
if self._process.returncode is None:
with suppress(ProcessLookupError):
@ -523,37 +349,37 @@ class SubprocessCLITransport(Transport):
async def write(self, data: str) -> None:
"""Write raw data to the transport."""
async with self._write_lock:
# All checks inside lock to prevent TOCTOU races with close()/end_input()
if not self._ready or not self._stdin_stream:
raise CLIConnectionError("ProcessTransport is not ready for writing")
# Check if ready (like TypeScript)
if not self._ready or not self._stdin_stream:
raise CLIConnectionError("ProcessTransport is not ready for writing")
if self._process and self._process.returncode is not None:
raise CLIConnectionError(
f"Cannot write to terminated process (exit code: {self._process.returncode})"
)
# Check if process is still alive (like TypeScript)
if self._process and self._process.returncode is not None:
raise CLIConnectionError(
f"Cannot write to terminated process (exit code: {self._process.returncode})"
)
if self._exit_error:
raise CLIConnectionError(
f"Cannot write to process that exited with error: {self._exit_error}"
) from self._exit_error
# Check for exit errors (like TypeScript)
if self._exit_error:
raise CLIConnectionError(
f"Cannot write to process that exited with error: {self._exit_error}"
) from self._exit_error
try:
await self._stdin_stream.send(data)
except Exception as e:
self._ready = False
self._exit_error = CLIConnectionError(
f"Failed to write to process stdin: {e}"
)
raise self._exit_error from e
try:
await self._stdin_stream.send(data)
except Exception as e:
self._ready = False # Mark as not ready (like TypeScript)
self._exit_error = CLIConnectionError(
f"Failed to write to process stdin: {e}"
)
raise self._exit_error from e
async def end_input(self) -> None:
"""End the input stream (close stdin)."""
async with self._write_lock:
if self._stdin_stream:
with suppress(Exception):
await self._stdin_stream.aclose()
self._stdin_stream = None
if self._stdin_stream:
with suppress(Exception):
await self._stdin_stream.aclose()
self._stdin_stream = None
def read_messages(self) -> AsyncIterator[dict[str, Any]]:
"""Read and parse messages from the transport."""

View file

@ -1,3 +1,3 @@
"""Version information for claude-agent-sdk."""
__version__ = "0.1.18"
__version__ = "0.1.1"

View file

@ -75,12 +75,10 @@ class ClaudeSDKClient:
internal_hooks[event] = []
for matcher in matchers:
# Convert HookMatcher to internal dict format
internal_matcher: dict[str, Any] = {
internal_matcher = {
"matcher": matcher.matcher if hasattr(matcher, "matcher") else None,
"hooks": matcher.hooks if hasattr(matcher, "hooks") else [],
}
if hasattr(matcher, "timeout") and matcher.timeout is not None:
internal_matcher["timeout"] = matcher.timeout
internal_hooks[event].append(internal_matcher)
return internal_hooks
@ -140,13 +138,6 @@ class ClaudeSDKClient:
if isinstance(config, dict) and config.get("type") == "sdk":
sdk_mcp_servers[name] = config["instance"] # type: ignore[typeddict-item]
# Calculate initialize timeout from CLAUDE_CODE_STREAM_CLOSE_TIMEOUT env var if set
# CLAUDE_CODE_STREAM_CLOSE_TIMEOUT is in milliseconds, convert to seconds
initialize_timeout_ms = int(
os.environ.get("CLAUDE_CODE_STREAM_CLOSE_TIMEOUT", "60000")
)
initialize_timeout = max(initialize_timeout_ms / 1000.0, 60.0)
# Create Query to handle control protocol
self._query = Query(
transport=self._transport,
@ -156,7 +147,6 @@ class ClaudeSDKClient:
if self.options.hooks
else None,
sdk_mcp_servers=sdk_mcp_servers,
initialize_timeout=initialize_timeout,
)
# Start reading messages and initialize
@ -261,38 +251,6 @@ class ClaudeSDKClient:
raise CLIConnectionError("Not connected. Call connect() first.")
await self._query.set_model(model)
async def rewind_files(self, user_message_id: str) -> None:
"""Rewind tracked files to their state at a specific user message.
Requires:
- `enable_file_checkpointing=True` to track file changes
- `extra_args={"replay-user-messages": None}` to receive UserMessage
objects with `uuid` in the response stream
Args:
user_message_id: UUID of the user message to rewind to. This should be
the `uuid` field from a `UserMessage` received during the conversation.
Example:
```python
options = ClaudeAgentOptions(
enable_file_checkpointing=True,
extra_args={"replay-user-messages": None},
)
async with ClaudeSDKClient(options) as client:
await client.query("Make some changes to my files")
async for msg in client.receive_response():
if isinstance(msg, UserMessage) and msg.uuid:
checkpoint_id = msg.uuid # Save this for later
# Later, rewind to that point
await client.rewind_files(checkpoint_id)
```
"""
if not self._query:
raise CLIConnectionError("Not connected. Call connect() first.")
await self._query.rewind_files(user_message_id)
async def get_server_info(self) -> dict[str, Any] | None:
"""Get server initialization info including available commands and output styles.

View file

@ -10,16 +10,10 @@ from typing_extensions import NotRequired
if TYPE_CHECKING:
from mcp.server import Server as McpServer
else:
# Runtime placeholder for forward reference resolution in Pydantic 2.12+
McpServer = Any
# Permission modes
PermissionMode = Literal["default", "acceptEdits", "plan", "bypassPermissions"]
# SDK Beta features - see https://docs.anthropic.com/en/api/beta-headers
SdkBeta = Literal["context-1m-2025-08-07"]
# Agent definitions
SettingSource = Literal["user", "project", "local"]
@ -32,13 +26,6 @@ class SystemPromptPreset(TypedDict):
append: NotRequired[str]
class ToolsPreset(TypedDict):
"""Tools preset configuration."""
type: Literal["preset"]
preset: Literal["claude_code"]
@dataclass
class AgentDefinition:
"""Agent definition configuration."""
@ -170,197 +157,35 @@ HookEvent = (
)
# Hook input types - strongly typed for each hook event
class BaseHookInput(TypedDict):
"""Base hook input fields present across many hook events."""
session_id: str
transcript_path: str
cwd: str
permission_mode: NotRequired[str]
class PreToolUseHookInput(BaseHookInput):
"""Input data for PreToolUse hook events."""
hook_event_name: Literal["PreToolUse"]
tool_name: str
tool_input: dict[str, Any]
class PostToolUseHookInput(BaseHookInput):
"""Input data for PostToolUse hook events."""
hook_event_name: Literal["PostToolUse"]
tool_name: str
tool_input: dict[str, Any]
tool_response: Any
class UserPromptSubmitHookInput(BaseHookInput):
"""Input data for UserPromptSubmit hook events."""
hook_event_name: Literal["UserPromptSubmit"]
prompt: str
class StopHookInput(BaseHookInput):
"""Input data for Stop hook events."""
hook_event_name: Literal["Stop"]
stop_hook_active: bool
class SubagentStopHookInput(BaseHookInput):
"""Input data for SubagentStop hook events."""
hook_event_name: Literal["SubagentStop"]
stop_hook_active: bool
class PreCompactHookInput(BaseHookInput):
"""Input data for PreCompact hook events."""
hook_event_name: Literal["PreCompact"]
trigger: Literal["manual", "auto"]
custom_instructions: str | None
# Union type for all hook inputs
HookInput = (
PreToolUseHookInput
| PostToolUseHookInput
| UserPromptSubmitHookInput
| StopHookInput
| SubagentStopHookInput
| PreCompactHookInput
)
# Hook-specific output types
class PreToolUseHookSpecificOutput(TypedDict):
"""Hook-specific output for PreToolUse events."""
hookEventName: Literal["PreToolUse"]
permissionDecision: NotRequired[Literal["allow", "deny", "ask"]]
permissionDecisionReason: NotRequired[str]
updatedInput: NotRequired[dict[str, Any]]
class PostToolUseHookSpecificOutput(TypedDict):
"""Hook-specific output for PostToolUse events."""
hookEventName: Literal["PostToolUse"]
additionalContext: NotRequired[str]
class UserPromptSubmitHookSpecificOutput(TypedDict):
"""Hook-specific output for UserPromptSubmit events."""
hookEventName: Literal["UserPromptSubmit"]
additionalContext: NotRequired[str]
class SessionStartHookSpecificOutput(TypedDict):
"""Hook-specific output for SessionStart events."""
hookEventName: Literal["SessionStart"]
additionalContext: NotRequired[str]
HookSpecificOutput = (
PreToolUseHookSpecificOutput
| PostToolUseHookSpecificOutput
| UserPromptSubmitHookSpecificOutput
| SessionStartHookSpecificOutput
)
# See https://docs.anthropic.com/en/docs/claude-code/hooks#advanced%3A-json-output
# for documentation of the output types.
#
# IMPORTANT: The Python SDK uses `async_` and `continue_` (with underscores) to avoid
# Python keyword conflicts. These fields are automatically converted to `async` and
# `continue` when sent to the CLI. You should use the underscore versions in your
# Python code.
class AsyncHookJSONOutput(TypedDict):
"""Async hook output that defers hook execution.
Fields:
async_: Set to True to defer hook execution. Note: This is converted to
"async" when sent to the CLI - use "async_" in your Python code.
asyncTimeout: Optional timeout in milliseconds for the async operation.
"""
async_: Literal[
True
] # Using async_ to avoid Python keyword (converted to "async" for CLI)
asyncTimeout: NotRequired[int]
class SyncHookJSONOutput(TypedDict):
"""Synchronous hook output with control and decision fields.
This defines the structure for hook callbacks to control execution and provide
feedback to Claude.
Common Control Fields:
continue_: Whether Claude should proceed after hook execution (default: True).
Note: This is converted to "continue" when sent to the CLI.
suppressOutput: Hide stdout from transcript mode (default: False).
stopReason: Message shown when continue is False.
Decision Fields:
decision: Set to "block" to indicate blocking behavior.
systemMessage: Warning message displayed to the user.
reason: Feedback message for Claude about the decision.
Hook-Specific Output:
hookSpecificOutput: Event-specific controls (e.g., permissionDecision for
PreToolUse, additionalContext for PostToolUse).
Note: The CLI documentation shows field names without underscores ("async", "continue"),
but Python code should use the underscore versions ("async_", "continue_") as they
are automatically converted.
"""
# Common control fields
continue_: NotRequired[
bool
] # Using continue_ to avoid Python keyword (converted to "continue" for CLI)
suppressOutput: NotRequired[bool]
stopReason: NotRequired[str]
# Decision fields
# Note: "approve" is deprecated for PreToolUse (use permissionDecision instead)
# For other hooks, only "block" is meaningful
# for documentation of the output types. Currently, "continue", "stopReason",
# and "suppressOutput" are not supported in the Python SDK.
class HookJSONOutput(TypedDict):
# Whether to block the action related to the hook.
decision: NotRequired[Literal["block"]]
# Optionally add a system message that is not visible to Claude but saved in
# the chat transcript.
systemMessage: NotRequired[str]
reason: NotRequired[str]
# Hook-specific outputs
hookSpecificOutput: NotRequired[HookSpecificOutput]
# See each hook's individual "Decision Control" section in the documentation
# for guidance.
hookSpecificOutput: NotRequired[Any]
HookJSONOutput = AsyncHookJSONOutput | SyncHookJSONOutput
@dataclass
class HookContext:
"""Context information for hook callbacks."""
class HookContext(TypedDict):
"""Context information for hook callbacks.
Fields:
signal: Reserved for future abort signal support. Currently always None.
"""
signal: Any | None # Future: abort signal support
signal: Any | None = None # Future: abort signal support
HookCallback = Callable[
# HookCallback input parameters:
# - input: Strongly-typed hook input with discriminated unions based on hook_event_name
# - tool_use_id: Optional tool use identifier
# - context: Hook context with abort signal support (currently placeholder)
[HookInput, str | None, HookContext],
# - input
# See https://docs.anthropic.com/en/docs/claude-code/hooks#hook-input for
# the type of 'input', the first value.
# - tool_use_id
# - context
[dict[str, Any], str | None, HookContext],
Awaitable[HookJSONOutput],
]
@ -379,9 +204,6 @@ class HookMatcher:
# A list of Python functions with function signature HookCallback
hooks: list[HookCallback] = field(default_factory=list)
# Timeout in seconds for all hooks in this matcher (default: 60)
timeout: float | None = None
# MCP Server config
class McpStdioServerConfig(TypedDict):
@ -422,93 +244,6 @@ McpServerConfig = (
)
class SdkPluginConfig(TypedDict):
"""SDK plugin configuration.
Currently only local plugins are supported via the 'local' type.
"""
type: Literal["local"]
path: str
# Sandbox configuration types
class SandboxNetworkConfig(TypedDict, total=False):
"""Network configuration for sandbox.
Attributes:
allowUnixSockets: Unix socket paths accessible in sandbox (e.g., SSH agents).
allowAllUnixSockets: Allow all Unix sockets (less secure).
allowLocalBinding: Allow binding to localhost ports (macOS only).
httpProxyPort: HTTP proxy port if bringing your own proxy.
socksProxyPort: SOCKS5 proxy port if bringing your own proxy.
"""
allowUnixSockets: list[str]
allowAllUnixSockets: bool
allowLocalBinding: bool
httpProxyPort: int
socksProxyPort: int
class SandboxIgnoreViolations(TypedDict, total=False):
"""Violations to ignore in sandbox.
Attributes:
file: File paths for which violations should be ignored.
network: Network hosts for which violations should be ignored.
"""
file: list[str]
network: list[str]
class SandboxSettings(TypedDict, total=False):
"""Sandbox settings configuration.
This controls how Claude Code sandboxes bash commands for filesystem
and network isolation.
**Important:** Filesystem and network restrictions are configured via permission
rules, not via these sandbox settings:
- Filesystem read restrictions: Use Read deny rules
- Filesystem write restrictions: Use Edit allow/deny rules
- Network restrictions: Use WebFetch allow/deny rules
Attributes:
enabled: Enable bash sandboxing (macOS/Linux only). Default: False
autoAllowBashIfSandboxed: Auto-approve bash commands when sandboxed. Default: True
excludedCommands: Commands that should run outside the sandbox (e.g., ["git", "docker"])
allowUnsandboxedCommands: Allow commands to bypass sandbox via dangerouslyDisableSandbox.
When False, all commands must run sandboxed (or be in excludedCommands). Default: True
network: Network configuration for sandbox.
ignoreViolations: Violations to ignore.
enableWeakerNestedSandbox: Enable weaker sandbox for unprivileged Docker environments
(Linux only). Reduces security. Default: False
Example:
```python
sandbox_settings: SandboxSettings = {
"enabled": True,
"autoAllowBashIfSandboxed": True,
"excludedCommands": ["docker"],
"network": {
"allowUnixSockets": ["/var/run/docker.sock"],
"allowLocalBinding": True
}
}
```
"""
enabled: bool
autoAllowBashIfSandboxed: bool
excludedCommands: list[str]
allowUnsandboxedCommands: bool
network: SandboxNetworkConfig
ignoreViolations: SandboxIgnoreViolations
enableWeakerNestedSandbox: bool
# Content block types
@dataclass
class TextBlock:
@ -547,22 +282,11 @@ ContentBlock = TextBlock | ThinkingBlock | ToolUseBlock | ToolResultBlock
# Message types
AssistantMessageError = Literal[
"authentication_failed",
"billing_error",
"rate_limit",
"invalid_request",
"server_error",
"unknown",
]
@dataclass
class UserMessage:
"""User message."""
content: str | list[ContentBlock]
uuid: str | None = None
parent_tool_use_id: str | None = None
@ -573,7 +297,6 @@ class AssistantMessage:
content: list[ContentBlock]
model: str
parent_tool_use_id: str | None = None
error: AssistantMessageError | None = None
@dataclass
@ -597,7 +320,6 @@ class ResultMessage:
total_cost_usd: float | None = None
usage: dict[str, Any] | None = None
result: str | None = None
structured_output: Any = None
@dataclass
@ -617,7 +339,6 @@ Message = UserMessage | AssistantMessage | SystemMessage | ResultMessage | Strea
class ClaudeAgentOptions:
"""Query options for Claude SDK."""
tools: list[str] | ToolsPreset | None = None
allowed_tools: list[str] = field(default_factory=list)
system_prompt: str | SystemPromptPreset | None = None
mcp_servers: dict[str, McpServerConfig] | str | Path = field(default_factory=dict)
@ -625,15 +346,10 @@ class ClaudeAgentOptions:
continue_conversation: bool = False
resume: str | None = None
max_turns: int | None = None
max_budget_usd: float | None = None
disallowed_tools: list[str] = field(default_factory=list)
model: str | None = None
fallback_model: str | None = None
# Beta features - see https://docs.anthropic.com/en/api/beta-headers
betas: list[SdkBeta] = field(default_factory=list)
permission_prompt_tool_name: str | None = None
cwd: str | Path | None = None
cli_path: str | Path | None = None
settings: str | None = None
add_dirs: list[str | Path] = field(default_factory=list)
env: dict[str, str] = field(default_factory=dict)
@ -663,21 +379,6 @@ class ClaudeAgentOptions:
agents: dict[str, AgentDefinition] | None = None
# Setting sources to load (user, project, local)
setting_sources: list[SettingSource] | None = None
# Sandbox configuration for bash command isolation.
# Filesystem and network restrictions are derived from permission rules (Read/Edit/WebFetch),
# not from these sandbox settings.
sandbox: SandboxSettings | None = None
# Plugin configurations for custom plugins
plugins: list[SdkPluginConfig] = field(default_factory=list)
# Max tokens for thinking blocks
max_thinking_tokens: int | None = None
# Output format for structured outputs (matches Messages API structure)
# Example: {"type": "json_schema", "schema": {"type": "object", "properties": {...}}}
output_format: dict[str, Any] | None = None
# Enable file checkpointing to track file changes during the session.
# When enabled, files can be rewound to their state at any user message
# using `ClaudeSDKClient.rewind_files()`.
enable_file_checkpointing: bool = False
# SDK Control Protocol
@ -718,11 +419,6 @@ class SDKControlMcpMessageRequest(TypedDict):
message: Any
class SDKControlRewindFilesRequest(TypedDict):
subtype: Literal["rewind_files"]
user_message_id: str
class SDKControlRequest(TypedDict):
type: Literal["control_request"]
request_id: str
@ -733,7 +429,6 @@ class SDKControlRequest(TypedDict):
| SDKControlSetPermissionModeRequest
| SDKHookCallbackRequest
| SDKControlMcpMessageRequest
| SDKControlRewindFilesRequest
)

View file

@ -212,73 +212,3 @@ class TestIntegration:
assert call_kwargs["options"].continue_conversation is True
anyio.run(_test)
def test_max_budget_usd_option(self):
"""Test query with max_budget_usd option."""
async def _test():
with patch(
"claude_agent_sdk._internal.client.SubprocessCLITransport"
) as mock_transport_class:
mock_transport = AsyncMock()
mock_transport_class.return_value = mock_transport
# Mock the message stream that exceeds budget
async def mock_receive():
yield {
"type": "assistant",
"message": {
"role": "assistant",
"content": [
{"type": "text", "text": "Starting to read..."}
],
"model": "claude-opus-4-1-20250805",
},
}
yield {
"type": "result",
"subtype": "error_max_budget_usd",
"duration_ms": 500,
"duration_api_ms": 400,
"is_error": False,
"num_turns": 1,
"session_id": "test-session-budget",
"total_cost_usd": 0.0002,
"usage": {
"input_tokens": 100,
"output_tokens": 50,
},
}
mock_transport.read_messages = mock_receive
mock_transport.connect = AsyncMock()
mock_transport.close = AsyncMock()
mock_transport.end_input = AsyncMock()
mock_transport.write = AsyncMock()
mock_transport.is_ready = Mock(return_value=True)
# Run query with very small budget
messages = []
async for msg in query(
prompt="Read the readme",
options=ClaudeAgentOptions(max_budget_usd=0.0001),
):
messages.append(msg)
# Verify results
assert len(messages) == 2
# Check result message
assert isinstance(messages[1], ResultMessage)
assert messages[1].subtype == "error_max_budget_usd"
assert messages[1].is_error is False
assert messages[1].total_cost_usd == 0.0002
assert messages[1].total_cost_usd is not None
assert messages[1].total_cost_usd > 0
# Verify transport was created with max_budget_usd option
mock_transport_class.assert_called_once()
call_kwargs = mock_transport_class.call_args.kwargs
assert call_kwargs["options"].max_budget_usd == 0.0001
anyio.run(_test)

View file

@ -31,21 +31,6 @@ class TestMessageParser:
assert isinstance(message.content[0], TextBlock)
assert message.content[0].text == "Hello"
def test_parse_user_message_with_uuid(self):
"""Test parsing a user message with uuid field (issue #414).
The uuid field is needed for file checkpointing with rewind_files().
"""
data = {
"type": "user",
"uuid": "msg-abc123-def456",
"message": {"content": [{"type": "text", "text": "Hello"}]},
}
message = parse_message(data)
assert isinstance(message, UserMessage)
assert message.uuid == "msg-abc123-def456"
assert len(message.content) == 1
def test_parse_user_message_with_tool_use(self):
"""Test parsing a user message with tool_use block."""
data = {

View file

@ -4,11 +4,9 @@ This test file verifies that SDK MCP servers work correctly through the full sta
matching the TypeScript SDK test/sdk.test.ts pattern.
"""
import base64
from typing import Any
import pytest
from mcp.types import CallToolRequest, CallToolRequestParams
from claude_agent_sdk import (
ClaudeAgentOptions,
@ -193,73 +191,3 @@ async def test_server_creation():
# When no tools are provided, the handlers are not registered
assert ListToolsRequest not in instance.request_handlers
@pytest.mark.asyncio
async def test_image_content_support():
"""Test that tools can return image content with base64 data."""
# Create sample base64 image data (a simple 1x1 pixel PNG)
png_data = base64.b64encode(
b"\x89PNG\r\n\x1a\n\x00\x00\x00\rIHDR\x00\x00\x00\x01\x00\x00\x00\x01"
b"\x08\x02\x00\x00\x00\x90wS\xde\x00\x00\x00\tpHYs\x00\x00\x0b\x13"
b"\x00\x00\x0b\x13\x01\x00\x9a\x9c\x18\x00\x00\x00\x0cIDATx\x9cc```"
b"\x00\x00\x00\x04\x00\x01]U!\x1c\x00\x00\x00\x00IEND\xaeB`\x82"
).decode("utf-8")
# Track tool executions
tool_executions: list[dict[str, Any]] = []
# Create a tool that returns both text and image content
@tool(
"generate_chart", "Generates a chart and returns it as an image", {"title": str}
)
async def generate_chart(args: dict[str, Any]) -> dict[str, Any]:
tool_executions.append({"name": "generate_chart", "args": args})
return {
"content": [
{"type": "text", "text": f"Generated chart: {args['title']}"},
{
"type": "image",
"data": png_data,
"mimeType": "image/png",
},
]
}
server_config = create_sdk_mcp_server(
name="image-test-server", version="1.0.0", tools=[generate_chart]
)
# Get the server instance
server = server_config["instance"]
call_handler = server.request_handlers[CallToolRequest]
# Call the chart generation tool
chart_request = CallToolRequest(
method="tools/call",
params=CallToolRequestParams(
name="generate_chart", arguments={"title": "Sales Report"}
),
)
result = await call_handler(chart_request)
# Verify the result contains both text and image content
assert len(result.root.content) == 2
# Check text content
text_content = result.root.content[0]
assert text_content.type == "text"
assert text_content.text == "Generated chart: Sales Report"
# Check image content
image_content = result.root.content[1]
assert image_content.type == "image"
assert image_content.data == png_data
assert image_content.mimeType == "image/png"
# Verify the tool was executed correctly
assert len(tool_executions) == 1
assert tool_executions[0]["name"] == "generate_chart"
assert tool_executions[0]["args"]["title"] == "Sales Report"

View file

@ -633,28 +633,21 @@ assert '"Second"' in stdin_messages[1]
print('{"type": "result", "subtype": "success", "duration_ms": 100, "duration_api_ms": 50, "is_error": false, "num_turns": 1, "session_id": "test", "total_cost_usd": 0.001}')
""")
# Make script executable (Unix-style systems)
if sys.platform != "win32":
Path(test_script).chmod(0o755)
Path(test_script).chmod(0o755)
try:
# Mock _find_cli to return the test script path directly
# Mock _find_cli to return python executing our test script
with patch.object(
SubprocessCLITransport, "_find_cli", return_value=test_script
SubprocessCLITransport, "_find_cli", return_value=sys.executable
):
# Mock _build_command to properly execute Python script
# Mock _build_command to add our test script as first argument
original_build_command = SubprocessCLITransport._build_command
def mock_build_command(self):
# Get original command
cmd = original_build_command(self)
# On Windows, we need to use python interpreter to run the script
if sys.platform == "win32":
# Replace first element with python interpreter and script
cmd[0:1] = [sys.executable, test_script]
else:
# On Unix, just use the script directly
cmd[0] = test_script
# Replace the CLI path with python + script
cmd[0] = test_script
return cmd
with patch.object(

View file

@ -15,15 +15,6 @@ from claude_agent_sdk._internal.transport.subprocess_cli import (
)
from claude_agent_sdk.types import ClaudeAgentOptions
DEFAULT_CLI_PATH = "/usr/bin/claude"
def make_options(**kwargs: object) -> ClaudeAgentOptions:
"""Construct ClaudeAgentOptions with a default CLI path for tests."""
cli_path = kwargs.pop("cli_path", DEFAULT_CLI_PATH)
return ClaudeAgentOptions(cli_path=cli_path, **kwargs)
class MockTextReceiveStream:
"""Mock TextReceiveStream for testing."""
@ -59,7 +50,9 @@ class TestSubprocessBuffering:
buffered_line = json.dumps(json_obj1) + "\n" + json.dumps(json_obj2)
transport = SubprocessCLITransport(prompt="test", options=make_options())
transport = SubprocessCLITransport(
prompt="test", options=ClaudeAgentOptions(), cli_path="/usr/bin/claude"
)
mock_process = MagicMock()
mock_process.returncode = None
@ -92,7 +85,9 @@ class TestSubprocessBuffering:
buffered_line = json.dumps(json_obj1) + "\n" + json.dumps(json_obj2)
transport = SubprocessCLITransport(prompt="test", options=make_options())
transport = SubprocessCLITransport(
prompt="test", options=ClaudeAgentOptions(), cli_path="/usr/bin/claude"
)
mock_process = MagicMock()
mock_process.returncode = None
@ -120,7 +115,9 @@ class TestSubprocessBuffering:
buffered_line = json.dumps(json_obj1) + "\n\n\n" + json.dumps(json_obj2)
transport = SubprocessCLITransport(prompt="test", options=make_options())
transport = SubprocessCLITransport(
prompt="test", options=ClaudeAgentOptions(), cli_path="/usr/bin/claude"
)
mock_process = MagicMock()
mock_process.returncode = None
@ -164,7 +161,9 @@ class TestSubprocessBuffering:
part2 = complete_json[100:250]
part3 = complete_json[250:]
transport = SubprocessCLITransport(prompt="test", options=make_options())
transport = SubprocessCLITransport(
prompt="test", options=ClaudeAgentOptions(), cli_path="/usr/bin/claude"
)
mock_process = MagicMock()
mock_process.returncode = None
@ -210,7 +209,9 @@ class TestSubprocessBuffering:
for i in range(0, len(complete_json), chunk_size)
]
transport = SubprocessCLITransport(prompt="test", options=make_options())
transport = SubprocessCLITransport(
prompt="test", options=ClaudeAgentOptions(), cli_path="/usr/bin/claude"
)
mock_process = MagicMock()
mock_process.returncode = None
@ -238,7 +239,9 @@ class TestSubprocessBuffering:
async def _test() -> None:
huge_incomplete = '{"data": "' + "x" * (_DEFAULT_MAX_BUFFER_SIZE + 1000)
transport = SubprocessCLITransport(prompt="test", options=make_options())
transport = SubprocessCLITransport(
prompt="test", options=ClaudeAgentOptions(), cli_path="/usr/bin/claude"
)
mock_process = MagicMock()
mock_process.returncode = None
@ -266,7 +269,8 @@ class TestSubprocessBuffering:
transport = SubprocessCLITransport(
prompt="test",
options=make_options(max_buffer_size=custom_limit),
options=ClaudeAgentOptions(max_buffer_size=custom_limit),
cli_path="/usr/bin/claude",
)
mock_process = MagicMock()
@ -305,7 +309,9 @@ class TestSubprocessBuffering:
large_json[3000:] + "\n" + msg3,
]
transport = SubprocessCLITransport(prompt="test", options=make_options())
transport = SubprocessCLITransport(
prompt="test", options=ClaudeAgentOptions(), cli_path="/usr/bin/claude"
)
mock_process = MagicMock()
mock_process.returncode = None

View file

@ -1,14 +1,10 @@
"""Tests for tool permission callbacks and hook callbacks."""
import json
import pytest
from claude_agent_sdk import (
ClaudeAgentOptions,
HookContext,
HookInput,
HookJSONOutput,
HookMatcher,
PermissionResultAllow,
PermissionResultDeny,
@ -217,7 +213,7 @@ class TestHookCallbacks:
hook_calls = []
async def test_hook(
input_data: HookInput, tool_use_id: str | None, context: HookContext
input_data: dict, tool_use_id: str | None, context: HookContext
) -> dict:
hook_calls.append({"input": input_data, "tool_use_id": tool_use_id})
return {"processed": True}
@ -261,201 +257,6 @@ class TestHookCallbacks:
last_response = transport.written_messages[-1]
assert '"processed": true' in last_response
@pytest.mark.asyncio
async def test_hook_output_fields(self):
"""Test that all SyncHookJSONOutput fields are properly handled."""
# Test all SyncHookJSONOutput fields together
async def comprehensive_hook(
input_data: HookInput, tool_use_id: str | None, context: HookContext
) -> HookJSONOutput:
return {
# Control fields
"continue_": True,
"suppressOutput": False,
"stopReason": "Test stop reason",
# Decision fields
"decision": "block",
"systemMessage": "Test system message",
"reason": "Test reason for blocking",
# Hook-specific output with all PreToolUse fields
"hookSpecificOutput": {
"hookEventName": "PreToolUse",
"permissionDecision": "deny",
"permissionDecisionReason": "Security policy violation",
"updatedInput": {"modified": "input"},
},
}
transport = MockTransport()
hooks = {
"PreToolUse": [
{"matcher": {"tool": "TestTool"}, "hooks": [comprehensive_hook]}
]
}
query = Query(
transport=transport, is_streaming_mode=True, can_use_tool=None, hooks=hooks
)
callback_id = "test_comprehensive_hook"
query.hook_callbacks[callback_id] = comprehensive_hook
request = {
"type": "control_request",
"request_id": "test-comprehensive",
"request": {
"subtype": "hook_callback",
"callback_id": callback_id,
"input": {"test": "data"},
"tool_use_id": "tool-456",
},
}
await query._handle_control_request(request)
# Check response contains all the fields
assert len(transport.written_messages) > 0
last_response = transport.written_messages[-1]
# Parse the JSON response
response_data = json.loads(last_response)
# The hook result is nested at response.response
result = response_data["response"]["response"]
# Verify control fields are present and converted to CLI format
assert result.get("continue") is True, (
"continue_ should be converted to continue"
)
assert "continue_" not in result, "continue_ should not appear in CLI output"
assert result.get("suppressOutput") is False
assert result.get("stopReason") == "Test stop reason"
# Verify decision fields are present
assert result.get("decision") == "block"
assert result.get("reason") == "Test reason for blocking"
assert result.get("systemMessage") == "Test system message"
# Verify hook-specific output is present
hook_output = result.get("hookSpecificOutput", {})
assert hook_output.get("hookEventName") == "PreToolUse"
assert hook_output.get("permissionDecision") == "deny"
assert (
hook_output.get("permissionDecisionReason") == "Security policy violation"
)
assert "updatedInput" in hook_output
@pytest.mark.asyncio
async def test_async_hook_output(self):
"""Test AsyncHookJSONOutput type with proper async fields."""
async def async_hook(
input_data: HookInput, tool_use_id: str | None, context: HookContext
) -> HookJSONOutput:
# Test that async hooks properly use async_ and asyncTimeout fields
return {
"async_": True,
"asyncTimeout": 5000,
}
transport = MockTransport()
hooks = {"PreToolUse": [{"matcher": None, "hooks": [async_hook]}]}
query = Query(
transport=transport, is_streaming_mode=True, can_use_tool=None, hooks=hooks
)
callback_id = "test_async_hook"
query.hook_callbacks[callback_id] = async_hook
request = {
"type": "control_request",
"request_id": "test-async",
"request": {
"subtype": "hook_callback",
"callback_id": callback_id,
"input": {"test": "async_data"},
"tool_use_id": None,
},
}
await query._handle_control_request(request)
# Check response contains async fields
assert len(transport.written_messages) > 0
last_response = transport.written_messages[-1]
# Parse the JSON response
response_data = json.loads(last_response)
# The hook result is nested at response.response
result = response_data["response"]["response"]
# The SDK should convert async_ to "async" for CLI compatibility
assert result.get("async") is True, "async_ should be converted to async"
assert "async_" not in result, "async_ should not appear in CLI output"
assert result.get("asyncTimeout") == 5000
@pytest.mark.asyncio
async def test_field_name_conversion(self):
"""Test that Python-safe field names (async_, continue_) are converted to CLI format (async, continue)."""
async def conversion_test_hook(
input_data: HookInput, tool_use_id: str | None, context: HookContext
) -> HookJSONOutput:
# Return both async_ and continue_ to test conversion
return {
"async_": True,
"asyncTimeout": 10000,
"continue_": False,
"stopReason": "Testing field conversion",
"systemMessage": "Fields should be converted",
}
transport = MockTransport()
hooks = {"PreToolUse": [{"matcher": None, "hooks": [conversion_test_hook]}]}
query = Query(
transport=transport, is_streaming_mode=True, can_use_tool=None, hooks=hooks
)
callback_id = "test_conversion"
query.hook_callbacks[callback_id] = conversion_test_hook
request = {
"type": "control_request",
"request_id": "test-conversion",
"request": {
"subtype": "hook_callback",
"callback_id": callback_id,
"input": {"test": "data"},
"tool_use_id": None,
},
}
await query._handle_control_request(request)
# Check response has converted field names
assert len(transport.written_messages) > 0
last_response = transport.written_messages[-1]
response_data = json.loads(last_response)
result = response_data["response"]["response"]
# Verify async_ was converted to async
assert result.get("async") is True, "async_ should be converted to async"
assert "async_" not in result, "async_ should not appear in output"
# Verify continue_ was converted to continue
assert result.get("continue") is False, (
"continue_ should be converted to continue"
)
assert "continue_" not in result, "continue_ should not appear in output"
# Verify other fields are unchanged
assert result.get("asyncTimeout") == 10000
assert result.get("stopReason") == "Testing field conversion"
assert result.get("systemMessage") == "Fields should be converted"
class TestClaudeAgentOptionsIntegration:
"""Test that callbacks work through ClaudeAgentOptions."""
@ -469,7 +270,7 @@ class TestClaudeAgentOptionsIntegration:
return PermissionResultAllow()
async def my_hook(
input_data: HookInput, tool_use_id: str | None, context: HookContext
input_data: dict, tool_use_id: str | None, context: HookContext
) -> dict:
return {}

View file

@ -10,15 +10,6 @@ import pytest
from claude_agent_sdk._internal.transport.subprocess_cli import SubprocessCLITransport
from claude_agent_sdk.types import ClaudeAgentOptions
DEFAULT_CLI_PATH = "/usr/bin/claude"
def make_options(**kwargs: object) -> ClaudeAgentOptions:
"""Construct options using the standard CLI path unless overridden."""
cli_path = kwargs.pop("cli_path", DEFAULT_CLI_PATH)
return ClaudeAgentOptions(cli_path=cli_path, **kwargs)
class TestSubprocessCLITransport:
"""Test subprocess transport implementation."""
@ -38,7 +29,9 @@ class TestSubprocessCLITransport:
def test_build_command_basic(self):
"""Test building basic CLI command."""
transport = SubprocessCLITransport(prompt="Hello", options=make_options())
transport = SubprocessCLITransport(
prompt="Hello", options=ClaudeAgentOptions(), cli_path="/usr/bin/claude"
)
cmd = transport._build_command()
assert cmd[0] == "/usr/bin/claude"
@ -46,29 +39,27 @@ class TestSubprocessCLITransport:
assert "stream-json" in cmd
assert "--print" in cmd
assert "Hello" in cmd
assert "--system-prompt" in cmd
assert cmd[cmd.index("--system-prompt") + 1] == ""
def test_cli_path_accepts_pathlib_path(self):
"""Test that cli_path accepts pathlib.Path objects."""
from pathlib import Path
path = Path("/usr/bin/claude")
transport = SubprocessCLITransport(
prompt="Hello",
options=ClaudeAgentOptions(cli_path=path),
options=ClaudeAgentOptions(),
cli_path=Path("/usr/bin/claude"),
)
# Path object is converted to string, compare with str(path)
assert transport._cli_path == str(path)
assert transport._cli_path == "/usr/bin/claude"
def test_build_command_with_system_prompt_string(self):
"""Test building CLI command with system prompt as string."""
transport = SubprocessCLITransport(
prompt="test",
options=make_options(
options=ClaudeAgentOptions(
system_prompt="Be helpful",
),
cli_path="/usr/bin/claude",
)
cmd = transport._build_command()
@ -79,9 +70,10 @@ class TestSubprocessCLITransport:
"""Test building CLI command with system prompt preset."""
transport = SubprocessCLITransport(
prompt="test",
options=make_options(
options=ClaudeAgentOptions(
system_prompt={"type": "preset", "preset": "claude_code"},
),
cli_path="/usr/bin/claude",
)
cmd = transport._build_command()
@ -92,13 +84,14 @@ class TestSubprocessCLITransport:
"""Test building CLI command with system prompt preset and append."""
transport = SubprocessCLITransport(
prompt="test",
options=make_options(
options=ClaudeAgentOptions(
system_prompt={
"type": "preset",
"preset": "claude_code",
"append": "Be concise.",
},
),
cli_path="/usr/bin/claude",
)
cmd = transport._build_command()
@ -110,13 +103,14 @@ class TestSubprocessCLITransport:
"""Test building CLI command with options."""
transport = SubprocessCLITransport(
prompt="test",
options=make_options(
options=ClaudeAgentOptions(
allowed_tools=["Read", "Write"],
disallowed_tools=["Bash"],
model="claude-sonnet-4-5",
permission_mode="acceptEdits",
max_turns=5,
),
cli_path="/usr/bin/claude",
)
cmd = transport._build_command()
@ -131,61 +125,32 @@ class TestSubprocessCLITransport:
assert "--max-turns" in cmd
assert "5" in cmd
def test_build_command_with_fallback_model(self):
"""Test building CLI command with fallback_model option."""
transport = SubprocessCLITransport(
prompt="test",
options=make_options(
model="opus",
fallback_model="sonnet",
),
)
cmd = transport._build_command()
assert "--model" in cmd
assert "opus" in cmd
assert "--fallback-model" in cmd
assert "sonnet" in cmd
def test_build_command_with_max_thinking_tokens(self):
"""Test building CLI command with max_thinking_tokens option."""
transport = SubprocessCLITransport(
prompt="test",
options=make_options(max_thinking_tokens=5000),
)
cmd = transport._build_command()
assert "--max-thinking-tokens" in cmd
assert "5000" in cmd
def test_build_command_with_add_dirs(self):
"""Test building CLI command with add_dirs option."""
from pathlib import Path
dir1 = "/path/to/dir1"
dir2 = Path("/path/to/dir2")
transport = SubprocessCLITransport(
prompt="test",
options=make_options(add_dirs=[dir1, dir2]),
options=ClaudeAgentOptions(
add_dirs=["/path/to/dir1", Path("/path/to/dir2")]
),
cli_path="/usr/bin/claude",
)
cmd = transport._build_command()
cmd_str = " ".join(cmd)
# Check that both directories are in the command
assert "--add-dir" in cmd
add_dir_indices = [i for i, x in enumerate(cmd) if x == "--add-dir"]
assert len(add_dir_indices) == 2
# The directories should appear after --add-dir flags
dirs_in_cmd = [cmd[i + 1] for i in add_dir_indices]
assert dir1 in dirs_in_cmd
assert str(dir2) in dirs_in_cmd
# Check that the command string contains the expected --add-dir flags
assert "--add-dir /path/to/dir1 --add-dir /path/to/dir2" in cmd_str
def test_session_continuation(self):
"""Test session continuation options."""
transport = SubprocessCLITransport(
prompt="Continue from before",
options=make_options(continue_conversation=True, resume="session-123"),
options=ClaudeAgentOptions(
continue_conversation=True, resume="session-123"
),
cli_path="/usr/bin/claude",
)
cmd = transport._build_command()
@ -225,7 +190,8 @@ class TestSubprocessCLITransport:
transport = SubprocessCLITransport(
prompt="test",
options=make_options(),
options=ClaudeAgentOptions(),
cli_path="/usr/bin/claude",
)
await transport.connect()
@ -241,7 +207,9 @@ class TestSubprocessCLITransport:
"""Test reading messages from CLI output."""
# This test is simplified to just test the transport creation
# The full async stream handling is tested in integration tests
transport = SubprocessCLITransport(prompt="test", options=make_options())
transport = SubprocessCLITransport(
prompt="test", options=ClaudeAgentOptions(), cli_path="/usr/bin/claude"
)
# The transport now just provides raw message reading via read_messages()
# So we just verify the transport can be created and basic structure is correct
@ -255,7 +223,8 @@ class TestSubprocessCLITransport:
async def _test():
transport = SubprocessCLITransport(
prompt="test",
options=make_options(cwd="/this/directory/does/not/exist"),
options=ClaudeAgentOptions(cwd="/this/directory/does/not/exist"),
cli_path="/usr/bin/claude",
)
with pytest.raises(CLIConnectionError) as exc_info:
@ -269,7 +238,8 @@ class TestSubprocessCLITransport:
"""Test building CLI command with settings as file path."""
transport = SubprocessCLITransport(
prompt="test",
options=make_options(settings="/path/to/settings.json"),
options=ClaudeAgentOptions(settings="/path/to/settings.json"),
cli_path="/usr/bin/claude",
)
cmd = transport._build_command()
@ -281,7 +251,8 @@ class TestSubprocessCLITransport:
settings_json = '{"permissions": {"allow": ["Bash(ls:*)"]}}'
transport = SubprocessCLITransport(
prompt="test",
options=make_options(settings=settings_json),
options=ClaudeAgentOptions(settings=settings_json),
cli_path="/usr/bin/claude",
)
cmd = transport._build_command()
@ -292,13 +263,14 @@ class TestSubprocessCLITransport:
"""Test building CLI command with extra_args for future flags."""
transport = SubprocessCLITransport(
prompt="test",
options=make_options(
options=ClaudeAgentOptions(
extra_args={
"new-flag": "value",
"boolean-flag": None,
"another-option": "test-value",
}
),
cli_path="/usr/bin/claude",
)
cmd = transport._build_command()
@ -329,7 +301,8 @@ class TestSubprocessCLITransport:
transport = SubprocessCLITransport(
prompt="test",
options=make_options(mcp_servers=mcp_servers),
options=ClaudeAgentOptions(mcp_servers=mcp_servers),
cli_path="/usr/bin/claude",
)
cmd = transport._build_command()
@ -349,36 +322,36 @@ class TestSubprocessCLITransport:
from pathlib import Path
# Test with string path
string_path = "/path/to/mcp-config.json"
transport = SubprocessCLITransport(
prompt="test",
options=make_options(mcp_servers=string_path),
options=ClaudeAgentOptions(mcp_servers="/path/to/mcp-config.json"),
cli_path="/usr/bin/claude",
)
cmd = transport._build_command()
assert "--mcp-config" in cmd
mcp_idx = cmd.index("--mcp-config")
assert cmd[mcp_idx + 1] == string_path
assert cmd[mcp_idx + 1] == "/path/to/mcp-config.json"
# Test with Path object
path_obj = Path("/path/to/mcp-config.json")
transport = SubprocessCLITransport(
prompt="test",
options=make_options(mcp_servers=path_obj),
options=ClaudeAgentOptions(mcp_servers=Path("/path/to/mcp-config.json")),
cli_path="/usr/bin/claude",
)
cmd = transport._build_command()
assert "--mcp-config" in cmd
mcp_idx = cmd.index("--mcp-config")
# Path object gets converted to string, compare with str(path_obj)
assert cmd[mcp_idx + 1] == str(path_obj)
assert cmd[mcp_idx + 1] == "/path/to/mcp-config.json"
def test_build_command_with_mcp_servers_as_json_string(self):
"""Test building CLI command with mcp_servers as JSON string."""
json_config = '{"mcpServers": {"server": {"type": "stdio", "command": "test"}}}'
transport = SubprocessCLITransport(
prompt="test",
options=make_options(mcp_servers=json_config),
options=ClaudeAgentOptions(mcp_servers=json_config),
cli_path="/usr/bin/claude",
)
cmd = transport._build_command()
@ -395,7 +368,7 @@ class TestSubprocessCLITransport:
"MY_TEST_VAR": test_value,
}
options = make_options(env=custom_env)
options = ClaudeAgentOptions(env=custom_env)
# Mock the subprocess to capture the env argument
with patch(
@ -424,6 +397,7 @@ class TestSubprocessCLITransport:
transport = SubprocessCLITransport(
prompt="test",
options=options,
cli_path="/usr/bin/claude",
)
await transport.connect()
@ -455,7 +429,7 @@ class TestSubprocessCLITransport:
async def _test():
custom_user = "claude"
options = make_options(user=custom_user)
options = ClaudeAgentOptions(user=custom_user)
# Mock the subprocess to capture the env argument
with patch(
@ -484,6 +458,7 @@ class TestSubprocessCLITransport:
transport = SubprocessCLITransport(
prompt="test",
options=options,
cli_path="/usr/bin/claude",
)
await transport.connect()
@ -500,329 +475,3 @@ class TestSubprocessCLITransport:
assert user_passed == "claude"
anyio.run(_test)
def test_build_command_with_sandbox_only(self):
"""Test building CLI command with sandbox settings (no existing settings)."""
import json
from claude_agent_sdk import SandboxSettings
sandbox: SandboxSettings = {
"enabled": True,
"autoAllowBashIfSandboxed": True,
"network": {
"allowLocalBinding": True,
"allowUnixSockets": ["/var/run/docker.sock"],
},
}
transport = SubprocessCLITransport(
prompt="test",
options=make_options(sandbox=sandbox),
)
cmd = transport._build_command()
# Should have --settings with sandbox merged in
assert "--settings" in cmd
settings_idx = cmd.index("--settings")
settings_value = cmd[settings_idx + 1]
# Parse and verify
parsed = json.loads(settings_value)
assert "sandbox" in parsed
assert parsed["sandbox"]["enabled"] is True
assert parsed["sandbox"]["autoAllowBashIfSandboxed"] is True
assert parsed["sandbox"]["network"]["allowLocalBinding"] is True
assert parsed["sandbox"]["network"]["allowUnixSockets"] == [
"/var/run/docker.sock"
]
def test_build_command_with_sandbox_and_settings_json(self):
"""Test building CLI command with sandbox merged into existing settings JSON."""
import json
from claude_agent_sdk import SandboxSettings
# Existing settings as JSON string
existing_settings = (
'{"permissions": {"allow": ["Bash(ls:*)"]}, "verbose": true}'
)
sandbox: SandboxSettings = {
"enabled": True,
"excludedCommands": ["git", "docker"],
}
transport = SubprocessCLITransport(
prompt="test",
options=make_options(settings=existing_settings, sandbox=sandbox),
)
cmd = transport._build_command()
# Should have merged settings
assert "--settings" in cmd
settings_idx = cmd.index("--settings")
settings_value = cmd[settings_idx + 1]
parsed = json.loads(settings_value)
# Original settings should be preserved
assert parsed["permissions"] == {"allow": ["Bash(ls:*)"]}
assert parsed["verbose"] is True
# Sandbox should be merged in
assert "sandbox" in parsed
assert parsed["sandbox"]["enabled"] is True
assert parsed["sandbox"]["excludedCommands"] == ["git", "docker"]
def test_build_command_with_settings_file_and_no_sandbox(self):
"""Test that settings file path is passed through when no sandbox."""
transport = SubprocessCLITransport(
prompt="test",
options=make_options(settings="/path/to/settings.json"),
)
cmd = transport._build_command()
# Should pass path directly, not parse it
assert "--settings" in cmd
settings_idx = cmd.index("--settings")
assert cmd[settings_idx + 1] == "/path/to/settings.json"
def test_build_command_sandbox_minimal(self):
"""Test sandbox with minimal configuration."""
import json
from claude_agent_sdk import SandboxSettings
sandbox: SandboxSettings = {"enabled": True}
transport = SubprocessCLITransport(
prompt="test",
options=make_options(sandbox=sandbox),
)
cmd = transport._build_command()
assert "--settings" in cmd
settings_idx = cmd.index("--settings")
settings_value = cmd[settings_idx + 1]
parsed = json.loads(settings_value)
assert parsed == {"sandbox": {"enabled": True}}
def test_sandbox_network_config(self):
"""Test sandbox with full network configuration."""
import json
from claude_agent_sdk import SandboxSettings
sandbox: SandboxSettings = {
"enabled": True,
"network": {
"allowUnixSockets": ["/tmp/ssh-agent.sock"],
"allowAllUnixSockets": False,
"allowLocalBinding": True,
"httpProxyPort": 8080,
"socksProxyPort": 8081,
},
}
transport = SubprocessCLITransport(
prompt="test",
options=make_options(sandbox=sandbox),
)
cmd = transport._build_command()
settings_idx = cmd.index("--settings")
settings_value = cmd[settings_idx + 1]
parsed = json.loads(settings_value)
network = parsed["sandbox"]["network"]
assert network["allowUnixSockets"] == ["/tmp/ssh-agent.sock"]
assert network["allowAllUnixSockets"] is False
assert network["allowLocalBinding"] is True
assert network["httpProxyPort"] == 8080
assert network["socksProxyPort"] == 8081
def test_build_command_with_tools_array(self):
"""Test building CLI command with tools as array of tool names."""
transport = SubprocessCLITransport(
prompt="test",
options=make_options(tools=["Read", "Edit", "Bash"]),
)
cmd = transport._build_command()
assert "--tools" in cmd
tools_idx = cmd.index("--tools")
assert cmd[tools_idx + 1] == "Read,Edit,Bash"
def test_build_command_with_tools_empty_array(self):
"""Test building CLI command with tools as empty array (disables all tools)."""
transport = SubprocessCLITransport(
prompt="test",
options=make_options(tools=[]),
)
cmd = transport._build_command()
assert "--tools" in cmd
tools_idx = cmd.index("--tools")
assert cmd[tools_idx + 1] == ""
def test_build_command_with_tools_preset(self):
"""Test building CLI command with tools preset."""
transport = SubprocessCLITransport(
prompt="test",
options=make_options(tools={"type": "preset", "preset": "claude_code"}),
)
cmd = transport._build_command()
assert "--tools" in cmd
tools_idx = cmd.index("--tools")
assert cmd[tools_idx + 1] == "default"
def test_build_command_without_tools(self):
"""Test building CLI command without tools option (default None)."""
transport = SubprocessCLITransport(
prompt="test",
options=make_options(),
)
cmd = transport._build_command()
assert "--tools" not in cmd
def test_concurrent_writes_are_serialized(self):
"""Test that concurrent write() calls are serialized by the lock.
When parallel subagents invoke MCP tools, they trigger concurrent write()
calls. Without the _write_lock, trio raises BusyResourceError.
Uses a real subprocess with the same stream setup as production:
process.stdin -> TextSendStream
"""
async def _test():
import sys
from subprocess import PIPE
from anyio.streams.text import TextSendStream
# Create a real subprocess that consumes stdin (cross-platform)
process = await anyio.open_process(
[sys.executable, "-c", "import sys; sys.stdin.read()"],
stdin=PIPE,
stdout=PIPE,
stderr=PIPE,
)
try:
transport = SubprocessCLITransport(
prompt="test",
options=ClaudeAgentOptions(cli_path="/usr/bin/claude"),
)
# Same setup as production: TextSendStream wrapping process.stdin
transport._ready = True
transport._process = MagicMock(returncode=None)
transport._stdin_stream = TextSendStream(process.stdin)
# Spawn concurrent writes - the lock should serialize them
num_writes = 10
errors: list[Exception] = []
async def do_write(i: int):
try:
await transport.write(f'{{"msg": {i}}}\n')
except Exception as e:
errors.append(e)
async with anyio.create_task_group() as tg:
for i in range(num_writes):
tg.start_soon(do_write, i)
# All writes should succeed - the lock serializes them
assert len(errors) == 0, f"Got errors: {errors}"
finally:
process.terminate()
await process.wait()
anyio.run(_test, backend="trio")
def test_concurrent_writes_fail_without_lock(self):
"""Verify that without the lock, concurrent writes cause BusyResourceError.
Uses a real subprocess with the same stream setup as production.
"""
async def _test():
import sys
from contextlib import asynccontextmanager
from subprocess import PIPE
from anyio.streams.text import TextSendStream
# Create a real subprocess that consumes stdin (cross-platform)
process = await anyio.open_process(
[sys.executable, "-c", "import sys; sys.stdin.read()"],
stdin=PIPE,
stdout=PIPE,
stderr=PIPE,
)
try:
transport = SubprocessCLITransport(
prompt="test",
options=ClaudeAgentOptions(cli_path="/usr/bin/claude"),
)
# Same setup as production
transport._ready = True
transport._process = MagicMock(returncode=None)
transport._stdin_stream = TextSendStream(process.stdin)
# Replace lock with no-op to trigger the race condition
class NoOpLock:
@asynccontextmanager
async def __call__(self):
yield
async def __aenter__(self):
return self
async def __aexit__(self, *args):
pass
transport._write_lock = NoOpLock()
# Spawn concurrent writes - should fail without lock
num_writes = 10
errors: list[Exception] = []
async def do_write(i: int):
try:
await transport.write(f'{{"msg": {i}}}\n')
except Exception as e:
errors.append(e)
async with anyio.create_task_group() as tg:
for i in range(num_writes):
tg.start_soon(do_write, i)
# Should have gotten errors due to concurrent access
assert len(errors) > 0, (
"Expected errors from concurrent access, but got none"
)
# Check that at least one error mentions the concurrent access
error_strs = [str(e) for e in errors]
assert any("another task" in s for s in error_strs), (
f"Expected 'another task' error, got: {error_strs}"
)
finally:
process.terminate()
await process.wait()
anyio.run(_test, backend="trio")