## Summary
This PR implements control protocol support in the Python SDK, aligning
it with the TypeScript implementation pattern. The refactor introduces a
Query + Transport separation to enable bidirectional communication
between the SDK and CLI.
## Motivation
The previous Python SDK implementation used a high-level abstraction in
the Transport ABC (`send_request`/`receive_messages`) that couldn't
handle bidirectional communication. This prevented support for:
- Control messages from CLI to SDK that need responses
- Hooks implementation
- Dynamic permission mode changes
- SDK MCP servers
## Changes
### Core Architecture Refactor
1. **New Query Class** (`src/claude_code_sdk/_internal/query.py`)
- Manages control protocol on top of Transport
- Handles control request/response routing
- Manages initialization handshake with timeout
- Supports hook callbacks and tool permission callbacks
- Implements message streaming
2. **Refactored Transport ABC**
(`src/claude_code_sdk/_internal/transport/__init__.py`)
- Changed from high-level (`send_request`/`receive_messages`) to
low-level (`write`/`read_messages`) interface
- Now handles raw I/O instead of protocol logic
- Aligns with TypeScript ProcessTransport pattern
3. **Updated SubprocessCLITransport**
(`src/claude_code_sdk/_internal/transport/subprocess_cli.py`)
- Simplified to focus on raw message streaming
- Removed protocol logic (moved to Query)
- Improved cleanup and error handling
4. **Enhanced ClaudeSDKClient** (`src/claude_code_sdk/client.py`)
- Now uses Query for control protocol
- Supports initialization messages
- Better error handling for control protocol failures
### Control Protocol Features
- **Initialization handshake**: SDK sends initialize request, CLI
responds with supported commands
- **Control message types**:
- `initialize`: Establish bidirectional connection
- `interrupt`: Cancel ongoing operations
- `set_permission_mode`: Change permission mode dynamically
- **Timeout handling**: 60-second timeout for initialization to handle
CLI versions without control support
### Examples
Updated `examples/streaming_mode.py` to demonstrate control protocol
initialization and error handling.
## Testing
- Tested with current CLI (no control protocol support yet) - gracefully
falls back
- Verified backward compatibility with existing `query()` function
- Tested initialization timeout handling
- Verified proper cleanup on errors
## Design Alignment
This implementation closely follows the TypeScript reference:
- `src/core/Query.ts` → `src/claude_code_sdk/_internal/query.py`
- `src/transport/ProcessTransport.ts` →
`src/claude_code_sdk/_internal/transport/subprocess_cli.py`
- `src/entrypoints/sdk.ts` → `src/claude_code_sdk/client.py`
## Next Steps
Once the CLI implements the control protocol handler, this will enable:
- Hooks support
- Dynamic permission mode changes
- SDK MCP servers
- Improved error recovery
🤖 Generated with [Claude Code](https://claude.ai/code)
---------
Co-authored-by: Ashwin Bhat <ashwin@anthropic.com>
Co-authored-by: Claude <noreply@anthropic.com>
Co-authored-by: Kashyap Murali <kashyap@anthropic.com>
Remove anyio.create_task_group() from receive_messages() to fix the
RuntimeError "Attempted to exit cancel scope in a different task than
it was entered in" when using the SDK with FastAPI's SSE streaming.
The issue occurred because FastAPI can move async generators between
different asyncio tasks during the streaming lifecycle, which conflicts
with anyio's cancel scope tracking.
Changes:
- Remove task group usage from receive_messages()
- Read stderr sequentially after stdout completes
- Add test to ensure no task groups are used
- Fix existing test expectation for buffer overflow
This is a minimal fix that maintains compatibility while solving the
core issue. The trade-off is that stderr is read after stdout instead
of concurrently, but this is unlikely to cause issues in practice.
🤖 Generated with [Claude Code](https://claude.ai/code)
Co-Authored-By: Claude <noreply@anthropic.com>
Removed redundant comments that simply restate what the code is doing.
Kept only comments that provide valuable context or explain complex behavior.
🤖 Generated with [Claude Code](https://claude.ai/code)
Co-Authored-By: Claude <noreply@anthropic.com>
Adds buffering to accumulate incomplete JSON messages that are split
across multiple stream reads due to asyncio's 64KB default buffer limit.
- Implement 1MB buffer to accumulate partial JSON
- Clear buffer on successful parse or size limit exceeded
- Add comprehensive tests for split JSON scenarios
Fixes CLIJSONDecodeError when reading large minified JSON files.
🤖 Generated with [Claude Code](https://claude.ai/code)
Co-Authored-By: Claude <noreply@anthropic.com>
- Fixed quotes to use double quotes consistently
- Adjusted line breaks per ruff formatter rules
- No functional changes
🤖 Generated with [Claude Code](https://claude.ai/code)
Co-Authored-By: Claude <noreply@anthropic.com>
- Fixed formatting in test_subprocess_buffering.py
- No functional changes, only code style improvements
🤖 Generated with [Claude Code](https://claude.ai/code)
Co-Authored-By: Claude <noreply@anthropic.com>
- Convert async test to use anyio.run() pattern consistent with other tests
- Remove unused CLIJSONDecodeError import
- Add tests for JSON with embedded newlines
- Add tests for multiple newlines between objects
- All tests passing
🤖 Generated with [Claude Code](https://claude.ai/code)
Co-Authored-By: Claude <noreply@anthropic.com>