fix: Convert camelCase to snake_case for Python naming conventions (#146)
Some checks are pending
Lint / lint (push) Waiting to run
Test / test (3.10) (push) Waiting to run
Test / test (3.11) (push) Waiting to run
Test / test (3.12) (push) Waiting to run
Test / test (3.13) (push) Waiting to run

- Renamed PermissionRuleValue fields: toolName → tool_name, ruleContent
→ rule_content
- Renamed PermissionResultAllow fields: updatedInput → updated_input,
updatedPermissions → updated_permissions
- Removed unused PermissionResult import from query.py
- Fixed trailing whitespace issues in types.py
- Updated all usages in examples and tests to use snake_case

These changes ensure compliance with Python's PEP 8 naming conventions
and fix linting errors.

🤖 Generated with [Claude Code](https://claude.ai/code)

---------

Co-authored-by: Claude <noreply@anthropic.com>
This commit is contained in:
Ashwin Bhat 2025-09-04 19:26:00 -07:00 committed by GitHub
parent 68f0d7aa7d
commit 681f46c873
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
7 changed files with 133 additions and 130 deletions

View file

@ -61,7 +61,7 @@ async def my_permission_callback(
modified_input = input_data.copy()
modified_input["file_path"] = safe_path
return PermissionResultAllow(
updatedInput=modified_input
updated_input=modified_input
)
# Check dangerous bash commands

View file

@ -57,7 +57,7 @@ class SdkMcpTool(Generic[T]):
def tool(
name: str, description: str, input_schema: type | dict[str, Any]
) -> Callable[[Callable[[Any], Awaitable[dict[str, Any]]]], SdkMcpTool]:
) -> Callable[[Callable[[Any], Awaitable[dict[str, Any]]]], SdkMcpTool[Any]]:
"""Decorator for defining MCP tools with type safety.
Creates a tool that can be used with SDK MCP servers. The tool runs
@ -105,7 +105,9 @@ def tool(
- Errors can be indicated by including "is_error": True in the response
"""
def decorator(handler: Callable[[Any], Awaitable[dict[str, Any]]]) -> SdkMcpTool:
def decorator(
handler: Callable[[Any], Awaitable[dict[str, Any]]],
) -> SdkMcpTool[Any]:
return SdkMcpTool(
name=name,
description=description,
@ -117,7 +119,7 @@ def tool(
def create_sdk_mcp_server(
name: str, version: str = "1.0.0", tools: list[SdkMcpTool] | None = None
name: str, version: str = "1.0.0", tools: list[SdkMcpTool[Any]] | None = None
) -> McpSdkServerConfig:
"""Create an in-process MCP server that runs within your Python application.
@ -200,7 +202,7 @@ def create_sdk_mcp_server(
tool_map = {tool_def.name: tool_def for tool_def in tools}
# Register list_tools handler to expose available tools
@server.list_tools()
@server.list_tools() # type: ignore[no-untyped-call,misc]
async def list_tools() -> list[Tool]:
"""Return the list of available tools."""
tool_list = []
@ -246,8 +248,8 @@ def create_sdk_mcp_server(
return tool_list
# Register call_tool handler to execute tools
@server.call_tool()
async def call_tool(name: str, arguments: dict) -> Any:
@server.call_tool() # type: ignore[misc]
async def call_tool(name: str, arguments: dict[str, Any]) -> Any:
"""Execute a tool by name with given arguments."""
if name not in tool_map:
raise ValueError(f"Tool '{name}' not found")

View file

@ -20,17 +20,17 @@ class InternalClient:
"""Initialize the internal client."""
def _convert_hooks_to_internal_format(
self, hooks: dict[str, list]
self, hooks: dict[str, list[Any]]
) -> dict[str, list[dict[str, Any]]]:
"""Convert HookMatcher format to internal Query format."""
internal_hooks = {}
internal_hooks: dict[str, list[dict[str, Any]]] = {}
for event, matchers in hooks.items():
internal_hooks[event] = []
for matcher in matchers:
# Convert HookMatcher to internal dict format
internal_matcher = {
"matcher": matcher.matcher if hasattr(matcher, 'matcher') else None,
"hooks": matcher.hooks if hasattr(matcher, 'hooks') else []
"matcher": matcher.matcher if hasattr(matcher, "matcher") else None,
"hooks": matcher.hooks if hasattr(matcher, "hooks") else [],
}
internal_hooks[event].append(internal_matcher)
return internal_hooks
@ -57,7 +57,7 @@ class InternalClient:
if options.mcp_servers and isinstance(options.mcp_servers, dict):
for name, config in options.mcp_servers.items():
if isinstance(config, dict) and config.get("type") == "sdk":
sdk_mcp_servers[name] = config["instance"]
sdk_mcp_servers[name] = config["instance"] # type: ignore[typeddict-item]
# Create Query to handle control protocol
is_streaming = not isinstance(prompt, str)
@ -65,7 +65,9 @@ class InternalClient:
transport=chosen_transport,
is_streaming_mode=is_streaming,
can_use_tool=options.can_use_tool,
hooks=self._convert_hooks_to_internal_format(options.hooks) if options.hooks else None,
hooks=self._convert_hooks_to_internal_format(options.hooks)
if options.hooks
else None,
sdk_mcp_servers=sdk_mcp_servers,
)

View file

@ -15,7 +15,6 @@ from mcp.types import (
)
from ..types import (
PermissionResult,
PermissionResultAllow,
PermissionResultDeny,
SDKControlPermissionRequest,
@ -48,7 +47,8 @@ class Query:
transport: Transport,
is_streaming_mode: bool,
can_use_tool: Callable[
[str, dict[str, Any], dict[str, Any]], Awaitable[dict[str, Any]]
[str, dict[str, Any], ToolPermissionContext],
Awaitable[PermissionResultAllow | PermissionResultDeny],
]
| None = None,
hooks: dict[str, list[dict[str, Any]]] | None = None,
@ -191,7 +191,7 @@ class Query:
subtype = request_data["subtype"]
try:
response_data = {}
response_data: dict[str, Any] = {}
if subtype == "can_use_tool":
permission_request: SDKControlPermissionRequest = request_data # type: ignore[assignment]
@ -202,30 +202,28 @@ class Query:
context = ToolPermissionContext(
signal=None, # TODO: Add abort signal support
suggestions=permission_request.get("permission_suggestions", [])
or [],
)
response = await self.can_use_tool(
permission_request["tool_name"],
permission_request["input"],
context
context,
)
# Convert PermissionResult to expected dict format
if isinstance(response, PermissionResultAllow):
response_data = {
"allow": True
}
if response.updatedInput is not None:
response_data["input"] = response.updatedInput
response_data = {"allow": True}
if response.updated_input is not None:
response_data["input"] = response.updated_input
# TODO: Handle updatedPermissions when control protocol supports it
elif isinstance(response, PermissionResultDeny):
response_data = {
"allow": False,
"reason": response.message
}
response_data = {"allow": False, "reason": response.message}
# TODO: Handle interrupt flag when control protocol supports it
else:
raise TypeError(f"Tool permission callback must return PermissionResult (PermissionResultAllow or PermissionResultDeny), got {type(response)}")
raise TypeError(
f"Tool permission callback must return PermissionResult (PermissionResultAllow or PermissionResultDeny), got {type(response)}"
)
elif subtype == "hook_callback":
hook_callback_request: SDKHookCallbackRequest = request_data # type: ignore[assignment]
@ -241,7 +239,7 @@ class Query:
{"signal": None}, # TODO: Add abort signal support
)
elif subtype == "mcp_request":
elif subtype == "mcp_message":
# Handle SDK MCP request
server_name = request_data.get("server_name")
mcp_message = request_data.get("message")
@ -249,7 +247,12 @@ class Query:
if not server_name or not mcp_message:
raise Exception("Missing server_name or message for MCP request")
response_data = await self._handle_sdk_mcp_request(server_name, mcp_message)
# Type narrowing - we've verified these are not None above
assert isinstance(server_name, str)
assert isinstance(mcp_message, dict)
response_data = await self._handle_sdk_mcp_request(
server_name, mcp_message
)
else:
raise Exception(f"Unsupported control request subtype: {subtype}")
@ -317,7 +320,9 @@ class Query:
self.pending_control_results.pop(request_id, None)
raise Exception(f"Control request timeout: {request.get('subtype')}") from e
async def _handle_sdk_mcp_request(self, server_name: str, message: dict) -> dict:
async def _handle_sdk_mcp_request(
self, server_name: str, message: dict[str, Any]
) -> dict[str, Any]:
"""Handle an MCP request for an SDK server.
This acts as a bridge between JSONRPC messages from the CLI
@ -362,43 +367,50 @@ class Query:
{
"name": tool.name,
"description": tool.description,
"inputSchema": tool.inputSchema.model_dump() if tool.inputSchema else {}
"inputSchema": tool.inputSchema.model_dump() # type: ignore[union-attr]
if tool.inputSchema
else {},
}
for tool in result.root.tools
for tool in result.root.tools # type: ignore[union-attr]
]
return {
"jsonrpc": "2.0",
"id": message.get("id"),
"result": {"tools": tools_data}
"result": {"tools": tools_data},
}
elif method == "tools/call":
request = CallToolRequest(
call_request = CallToolRequest(
method=method,
params=CallToolRequestParams(
name=params.get("name"),
arguments=params.get("arguments", {})
)
name=params.get("name"), arguments=params.get("arguments", {})
),
)
handler = server.request_handlers.get(CallToolRequest)
if handler:
result = await handler(request)
result = await handler(call_request)
# Convert MCP result to JSONRPC response
content = []
for item in result.root.content:
if hasattr(item, 'text'):
for item in result.root.content: # type: ignore[union-attr]
if hasattr(item, "text"):
content.append({"type": "text", "text": item.text})
elif hasattr(item, 'data') and hasattr(item, 'mimeType'):
content.append({"type": "image", "data": item.data, "mimeType": item.mimeType})
elif hasattr(item, "data") and hasattr(item, "mimeType"):
content.append(
{
"type": "image",
"data": item.data,
"mimeType": item.mimeType,
}
)
response_data = {"content": content}
if hasattr(result.root, 'is_error') and result.root.is_error:
response_data["is_error"] = True
if hasattr(result.root, "is_error") and result.root.is_error:
response_data["is_error"] = True # type: ignore[assignment]
return {
"jsonrpc": "2.0",
"id": message.get("id"),
"result": response_data
"result": response_data,
}
# Add more methods here as MCP SDK adds them (resources, prompts, etc.)

View file

@ -101,17 +101,17 @@ class ClaudeSDKClient:
os.environ["CLAUDE_CODE_ENTRYPOINT"] = "sdk-py-client"
def _convert_hooks_to_internal_format(
self, hooks: dict[str, list]
self, hooks: dict[str, list[Any]]
) -> dict[str, list[dict[str, Any]]]:
"""Convert HookMatcher format to internal Query format."""
internal_hooks = {}
internal_hooks: dict[str, list[dict[str, Any]]] = {}
for event, matchers in hooks.items():
internal_hooks[event] = []
for matcher in matchers:
# Convert HookMatcher to internal dict format
internal_matcher = {
"matcher": matcher.matcher if hasattr(matcher, 'matcher') else None,
"hooks": matcher.hooks if hasattr(matcher, 'hooks') else []
"matcher": matcher.matcher if hasattr(matcher, "matcher") else None,
"hooks": matcher.hooks if hasattr(matcher, "hooks") else [],
}
internal_hooks[event].append(internal_matcher)
return internal_hooks
@ -145,14 +145,16 @@ class ClaudeSDKClient:
if self.options.mcp_servers and isinstance(self.options.mcp_servers, dict):
for name, config in self.options.mcp_servers.items():
if isinstance(config, dict) and config.get("type") == "sdk":
sdk_mcp_servers[name] = config["instance"]
sdk_mcp_servers[name] = config["instance"] # type: ignore[typeddict-item]
# Create Query to handle control protocol
self._query = Query(
transport=self._transport,
is_streaming_mode=True, # ClaudeSDKClient always uses streaming mode
can_use_tool=self.options.can_use_tool,
hooks=self._convert_hooks_to_internal_format(self.options.hooks) if self.options.hooks else None,
hooks=self._convert_hooks_to_internal_format(self.options.hooks)
if self.options.hooks
else None,
sdk_mcp_servers=sdk_mcp_servers,
)

View file

@ -5,10 +5,7 @@ from dataclasses import dataclass, field
from pathlib import Path
from typing import TYPE_CHECKING, Any, Literal, TypedDict
try:
from typing import NotRequired # Python 3.11+
except ImportError:
from typing_extensions import NotRequired # For Python < 3.11 compatibility
from typing_extensions import NotRequired
if TYPE_CHECKING:
from mcp.server import Server as McpServer
@ -19,59 +16,73 @@ PermissionMode = Literal["default", "acceptEdits", "plan", "bypassPermissions"]
# Permission Update types (matching TypeScript SDK)
PermissionUpdateDestination = Literal[
"userSettings",
"projectSettings",
"localSettings",
"session"
"userSettings", "projectSettings", "localSettings", "session"
]
PermissionBehavior = Literal["allow", "deny", "ask"]
@dataclass
class PermissionRuleValue:
"""Permission rule value."""
toolName: str
ruleContent: str | None = None
@dataclass
tool_name: str
rule_content: str | None = None
@dataclass
class PermissionUpdate:
"""Permission update configuration."""
type: Literal["addRules", "replaceRules", "removeRules", "setMode", "addDirectories", "removeDirectories"]
type: Literal[
"addRules",
"replaceRules",
"removeRules",
"setMode",
"addDirectories",
"removeDirectories",
]
rules: list[PermissionRuleValue] | None = None
behavior: PermissionBehavior | None = None
mode: PermissionMode | None = None
directories: list[str] | None = None
destination: PermissionUpdateDestination | None = None
# Tool callback types
@dataclass
class ToolPermissionContext:
"""Context information for tool permission callbacks."""
signal: Any | None = None # Future: abort signal support
suggestions: list[PermissionUpdate] = field(default_factory=list) # Permission suggestions from CLI
suggestions: list[PermissionUpdate] = field(
default_factory=list
) # Permission suggestions from CLI
# Match TypeScript's PermissionResult structure
@dataclass
class PermissionResultAllow:
"""Allow permission result."""
behavior: Literal["allow"] = "allow"
updatedInput: dict[str, Any] | None = None
updatedPermissions: list[PermissionUpdate] | None = None
updated_input: dict[str, Any] | None = None
updated_permissions: list[PermissionUpdate] | None = None
@dataclass
class PermissionResultDeny:
"""Deny permission result."""
behavior: Literal["deny"] = "deny"
behavior: Literal["deny"] = "deny"
message: str = ""
interrupt: bool = False
PermissionResult = PermissionResultAllow | PermissionResultDeny
CanUseTool = Callable[
[str, dict[str, Any], ToolPermissionContext],
Awaitable[PermissionResult]
[str, dict[str, Any], ToolPermissionContext], Awaitable[PermissionResult]
]
@ -85,7 +96,7 @@ class HookContext:
HookCallback = Callable[
[dict[str, Any], str | None, HookContext], # input, tool_use_id, context
Awaitable[dict[str, Any]] # response data
Awaitable[dict[str, Any]], # response data
]
@ -260,6 +271,7 @@ class SDKControlPermissionRequest(TypedDict):
permission_suggestions: list[Any] | None
blocked_path: str | None
class SDKControlInitializeRequest(TypedDict):
subtype: Literal["initialize"]
# TODO: Use HookEvent names as the key.

View file

@ -38,6 +38,7 @@ class MockTransport(Transport):
async def _read():
for msg in self.messages_to_read:
yield msg
return _read()
def is_ready(self) -> bool:
@ -53,9 +54,7 @@ class TestToolPermissionCallbacks:
callback_invoked = False
async def allow_callback(
tool_name: str,
input_data: dict,
context: ToolPermissionContext
tool_name: str, input_data: dict, context: ToolPermissionContext
) -> PermissionResultAllow:
nonlocal callback_invoked
callback_invoked = True
@ -68,7 +67,7 @@ class TestToolPermissionCallbacks:
transport=transport,
is_streaming_mode=True,
can_use_tool=allow_callback,
hooks=None
hooks=None,
)
# Simulate control request
@ -79,8 +78,8 @@ class TestToolPermissionCallbacks:
"subtype": "can_use_tool",
"tool_name": "TestTool",
"input": {"param": "value"},
"permission_suggestions": []
}
"permission_suggestions": [],
},
}
await query._handle_control_request(request)
@ -96,21 +95,18 @@ class TestToolPermissionCallbacks:
@pytest.mark.asyncio
async def test_permission_callback_deny(self):
"""Test callback that denies tool execution."""
async def deny_callback(
tool_name: str,
input_data: dict,
context: ToolPermissionContext
tool_name: str, input_data: dict, context: ToolPermissionContext
) -> PermissionResultDeny:
return PermissionResultDeny(
message="Security policy violation"
)
return PermissionResultDeny(message="Security policy violation")
transport = MockTransport()
query = Query(
transport=transport,
is_streaming_mode=True,
can_use_tool=deny_callback,
hooks=None
hooks=None,
)
request = {
@ -120,8 +116,8 @@ class TestToolPermissionCallbacks:
"subtype": "can_use_tool",
"tool_name": "DangerousTool",
"input": {"command": "rm -rf /"},
"permission_suggestions": ["deny"]
}
"permission_suggestions": ["deny"],
},
}
await query._handle_control_request(request)
@ -135,24 +131,21 @@ class TestToolPermissionCallbacks:
@pytest.mark.asyncio
async def test_permission_callback_input_modification(self):
"""Test callback that modifies tool input."""
async def modify_callback(
tool_name: str,
input_data: dict,
context: ToolPermissionContext
tool_name: str, input_data: dict, context: ToolPermissionContext
) -> PermissionResultAllow:
# Modify the input to add safety flag
modified_input = input_data.copy()
modified_input["safe_mode"] = True
return PermissionResultAllow(
updatedInput=modified_input
)
return PermissionResultAllow(updated_input=modified_input)
transport = MockTransport()
query = Query(
transport=transport,
is_streaming_mode=True,
can_use_tool=modify_callback,
hooks=None
hooks=None,
)
request = {
@ -162,8 +155,8 @@ class TestToolPermissionCallbacks:
"subtype": "can_use_tool",
"tool_name": "WriteTool",
"input": {"file_path": "/etc/passwd"},
"permission_suggestions": []
}
"permission_suggestions": [],
},
}
await query._handle_control_request(request)
@ -177,10 +170,9 @@ class TestToolPermissionCallbacks:
@pytest.mark.asyncio
async def test_callback_exception_handling(self):
"""Test that callback exceptions are properly handled."""
async def error_callback(
tool_name: str,
input_data: dict,
context: ToolPermissionContext
tool_name: str, input_data: dict, context: ToolPermissionContext
) -> PermissionResultAllow:
raise ValueError("Callback error")
@ -189,7 +181,7 @@ class TestToolPermissionCallbacks:
transport=transport,
is_streaming_mode=True,
can_use_tool=error_callback,
hooks=None
hooks=None,
)
request = {
@ -199,8 +191,8 @@ class TestToolPermissionCallbacks:
"subtype": "can_use_tool",
"tool_name": "TestTool",
"input": {},
"permission_suggestions": []
}
"permission_suggestions": [],
},
}
await query._handle_control_request(request)
@ -221,33 +213,20 @@ class TestHookCallbacks:
hook_calls = []
async def test_hook(
input_data: dict,
tool_use_id: str | None,
context: HookContext
input_data: dict, tool_use_id: str | None, context: HookContext
) -> dict:
hook_calls.append({
"input": input_data,
"tool_use_id": tool_use_id
})
hook_calls.append({"input": input_data, "tool_use_id": tool_use_id})
return {"processed": True}
transport = MockTransport()
# Create hooks configuration
hooks = {
"tool_use_start": [
{
"matcher": {"tool": "TestTool"},
"hooks": [test_hook]
}
]
"tool_use_start": [{"matcher": {"tool": "TestTool"}, "hooks": [test_hook]}]
}
query = Query(
transport=transport,
is_streaming_mode=True,
can_use_tool=None,
hooks=hooks
transport=transport, is_streaming_mode=True, can_use_tool=None, hooks=hooks
)
# Manually register the hook callback to avoid needing the full initialize flow
@ -262,8 +241,8 @@ class TestHookCallbacks:
"subtype": "hook_callback",
"callback_id": callback_id,
"input": {"test": "data"},
"tool_use_id": "tool-123"
}
"tool_use_id": "tool-123",
},
}
await query._handle_control_request(request)
@ -284,17 +263,14 @@ class TestClaudeCodeOptionsIntegration:
def test_options_with_callbacks(self):
"""Test creating options with callbacks."""
async def my_callback(
tool_name: str,
input_data: dict,
context: ToolPermissionContext
tool_name: str, input_data: dict, context: ToolPermissionContext
) -> PermissionResultAllow:
return PermissionResultAllow()
async def my_hook(
input_data: dict,
tool_use_id: str | None,
context: HookContext
input_data: dict, tool_use_id: str | None, context: HookContext
) -> dict:
return {}
@ -302,12 +278,9 @@ class TestClaudeCodeOptionsIntegration:
can_use_tool=my_callback,
hooks={
"tool_use_start": [
HookMatcher(
matcher={"tool": "Bash"},
hooks=[my_hook]
)
HookMatcher(matcher={"tool": "Bash"}, hooks=[my_hook])
]
}
},
)
assert options.can_use_tool == my_callback