fix: Convert camelCase to snake_case for Python naming conventions

- Renamed PermissionRuleValue fields: toolName → tool_name, ruleContent → rule_content
- Renamed PermissionResultAllow fields: updatedInput → updated_input, updatedPermissions → updated_permissions
- Removed unused PermissionResult import from query.py
- Fixed trailing whitespace issues in types.py
- Updated all usages in examples and tests to use snake_case

These changes ensure compliance with Python's PEP 8 naming conventions and fix linting errors.

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
Ashwin Bhat 2025-09-04 12:37:57 -07:00
parent 68f0d7aa7d
commit 7b0938a1cc
No known key found for this signature in database
6 changed files with 103 additions and 106 deletions

View file

@ -61,7 +61,7 @@ async def my_permission_callback(
modified_input = input_data.copy()
modified_input["file_path"] = safe_path
return PermissionResultAllow(
updatedInput=modified_input
updated_input=modified_input
)
# Check dangerous bash commands

View file

@ -29,8 +29,8 @@ class InternalClient:
for matcher in matchers:
# Convert HookMatcher to internal dict format
internal_matcher = {
"matcher": matcher.matcher if hasattr(matcher, 'matcher') else None,
"hooks": matcher.hooks if hasattr(matcher, 'hooks') else []
"matcher": matcher.matcher if hasattr(matcher, "matcher") else None,
"hooks": matcher.hooks if hasattr(matcher, "hooks") else [],
}
internal_hooks[event].append(internal_matcher)
return internal_hooks
@ -65,7 +65,9 @@ class InternalClient:
transport=chosen_transport,
is_streaming_mode=is_streaming,
can_use_tool=options.can_use_tool,
hooks=self._convert_hooks_to_internal_format(options.hooks) if options.hooks else None,
hooks=self._convert_hooks_to_internal_format(options.hooks)
if options.hooks
else None,
sdk_mcp_servers=sdk_mcp_servers,
)

View file

@ -15,7 +15,6 @@ from mcp.types import (
)
from ..types import (
PermissionResult,
PermissionResultAllow,
PermissionResultDeny,
SDKControlPermissionRequest,
@ -201,31 +200,28 @@ class Query:
context = ToolPermissionContext(
signal=None, # TODO: Add abort signal support
suggestions=permission_request.get("permission_suggestions", [])
suggestions=permission_request.get("permission_suggestions", []),
)
response = await self.can_use_tool(
permission_request["tool_name"],
permission_request["input"],
context
context,
)
# Convert PermissionResult to expected dict format
if isinstance(response, PermissionResultAllow):
response_data = {
"allow": True
}
if response.updatedInput is not None:
response_data["input"] = response.updatedInput
response_data = {"allow": True}
if response.updated_input is not None:
response_data["input"] = response.updated_input
# TODO: Handle updatedPermissions when control protocol supports it
elif isinstance(response, PermissionResultDeny):
response_data = {
"allow": False,
"reason": response.message
}
response_data = {"allow": False, "reason": response.message}
# TODO: Handle interrupt flag when control protocol supports it
else:
raise TypeError(f"Tool permission callback must return PermissionResult (PermissionResultAllow or PermissionResultDeny), got {type(response)}")
raise TypeError(
f"Tool permission callback must return PermissionResult (PermissionResultAllow or PermissionResultDeny), got {type(response)}"
)
elif subtype == "hook_callback":
hook_callback_request: SDKHookCallbackRequest = request_data # type: ignore[assignment]
@ -249,7 +245,9 @@ class Query:
if not server_name or not mcp_message:
raise Exception("Missing server_name or message for MCP request")
response_data = await self._handle_sdk_mcp_request(server_name, mcp_message)
response_data = await self._handle_sdk_mcp_request(
server_name, mcp_message
)
else:
raise Exception(f"Unsupported control request subtype: {subtype}")
@ -362,23 +360,24 @@ class Query:
{
"name": tool.name,
"description": tool.description,
"inputSchema": tool.inputSchema.model_dump() if tool.inputSchema else {}
"inputSchema": tool.inputSchema.model_dump()
if tool.inputSchema
else {},
}
for tool in result.root.tools
]
return {
"jsonrpc": "2.0",
"id": message.get("id"),
"result": {"tools": tools_data}
"result": {"tools": tools_data},
}
elif method == "tools/call":
request = CallToolRequest(
method=method,
params=CallToolRequestParams(
name=params.get("name"),
arguments=params.get("arguments", {})
)
name=params.get("name"), arguments=params.get("arguments", {})
),
)
handler = server.request_handlers.get(CallToolRequest)
if handler:
@ -386,19 +385,25 @@ class Query:
# Convert MCP result to JSONRPC response
content = []
for item in result.root.content:
if hasattr(item, 'text'):
if hasattr(item, "text"):
content.append({"type": "text", "text": item.text})
elif hasattr(item, 'data') and hasattr(item, 'mimeType'):
content.append({"type": "image", "data": item.data, "mimeType": item.mimeType})
elif hasattr(item, "data") and hasattr(item, "mimeType"):
content.append(
{
"type": "image",
"data": item.data,
"mimeType": item.mimeType,
}
)
response_data = {"content": content}
if hasattr(result.root, 'is_error') and result.root.is_error:
if hasattr(result.root, "is_error") and result.root.is_error:
response_data["is_error"] = True
return {
"jsonrpc": "2.0",
"id": message.get("id"),
"result": response_data
"result": response_data,
}
# Add more methods here as MCP SDK adds them (resources, prompts, etc.)

View file

@ -110,8 +110,8 @@ class ClaudeSDKClient:
for matcher in matchers:
# Convert HookMatcher to internal dict format
internal_matcher = {
"matcher": matcher.matcher if hasattr(matcher, 'matcher') else None,
"hooks": matcher.hooks if hasattr(matcher, 'hooks') else []
"matcher": matcher.matcher if hasattr(matcher, "matcher") else None,
"hooks": matcher.hooks if hasattr(matcher, "hooks") else [],
}
internal_hooks[event].append(internal_matcher)
return internal_hooks
@ -152,7 +152,9 @@ class ClaudeSDKClient:
transport=self._transport,
is_streaming_mode=True, # ClaudeSDKClient always uses streaming mode
can_use_tool=self.options.can_use_tool,
hooks=self._convert_hooks_to_internal_format(self.options.hooks) if self.options.hooks else None,
hooks=self._convert_hooks_to_internal_format(self.options.hooks)
if self.options.hooks
else None,
sdk_mcp_servers=sdk_mcp_servers,
)

View file

@ -19,59 +19,73 @@ PermissionMode = Literal["default", "acceptEdits", "plan", "bypassPermissions"]
# Permission Update types (matching TypeScript SDK)
PermissionUpdateDestination = Literal[
"userSettings",
"projectSettings",
"localSettings",
"session"
"userSettings", "projectSettings", "localSettings", "session"
]
PermissionBehavior = Literal["allow", "deny", "ask"]
@dataclass
class PermissionRuleValue:
"""Permission rule value."""
toolName: str
ruleContent: str | None = None
@dataclass
tool_name: str
rule_content: str | None = None
@dataclass
class PermissionUpdate:
"""Permission update configuration."""
type: Literal["addRules", "replaceRules", "removeRules", "setMode", "addDirectories", "removeDirectories"]
type: Literal[
"addRules",
"replaceRules",
"removeRules",
"setMode",
"addDirectories",
"removeDirectories",
]
rules: list[PermissionRuleValue] | None = None
behavior: PermissionBehavior | None = None
mode: PermissionMode | None = None
directories: list[str] | None = None
destination: PermissionUpdateDestination | None = None
# Tool callback types
@dataclass
class ToolPermissionContext:
"""Context information for tool permission callbacks."""
signal: Any | None = None # Future: abort signal support
suggestions: list[PermissionUpdate] = field(default_factory=list) # Permission suggestions from CLI
suggestions: list[PermissionUpdate] = field(
default_factory=list
) # Permission suggestions from CLI
# Match TypeScript's PermissionResult structure
@dataclass
class PermissionResultAllow:
"""Allow permission result."""
behavior: Literal["allow"] = "allow"
updatedInput: dict[str, Any] | None = None
updatedPermissions: list[PermissionUpdate] | None = None
updated_input: dict[str, Any] | None = None
updated_permissions: list[PermissionUpdate] | None = None
@dataclass
class PermissionResultDeny:
"""Deny permission result."""
behavior: Literal["deny"] = "deny"
behavior: Literal["deny"] = "deny"
message: str = ""
interrupt: bool = False
PermissionResult = PermissionResultAllow | PermissionResultDeny
CanUseTool = Callable[
[str, dict[str, Any], ToolPermissionContext],
Awaitable[PermissionResult]
[str, dict[str, Any], ToolPermissionContext], Awaitable[PermissionResult]
]
@ -85,7 +99,7 @@ class HookContext:
HookCallback = Callable[
[dict[str, Any], str | None, HookContext], # input, tool_use_id, context
Awaitable[dict[str, Any]] # response data
Awaitable[dict[str, Any]], # response data
]
@ -260,6 +274,7 @@ class SDKControlPermissionRequest(TypedDict):
permission_suggestions: list[Any] | None
blocked_path: str | None
class SDKControlInitializeRequest(TypedDict):
subtype: Literal["initialize"]
# TODO: Use HookEvent names as the key.

View file

@ -38,6 +38,7 @@ class MockTransport(Transport):
async def _read():
for msg in self.messages_to_read:
yield msg
return _read()
def is_ready(self) -> bool:
@ -53,9 +54,7 @@ class TestToolPermissionCallbacks:
callback_invoked = False
async def allow_callback(
tool_name: str,
input_data: dict,
context: ToolPermissionContext
tool_name: str, input_data: dict, context: ToolPermissionContext
) -> PermissionResultAllow:
nonlocal callback_invoked
callback_invoked = True
@ -68,7 +67,7 @@ class TestToolPermissionCallbacks:
transport=transport,
is_streaming_mode=True,
can_use_tool=allow_callback,
hooks=None
hooks=None,
)
# Simulate control request
@ -79,8 +78,8 @@ class TestToolPermissionCallbacks:
"subtype": "can_use_tool",
"tool_name": "TestTool",
"input": {"param": "value"},
"permission_suggestions": []
}
"permission_suggestions": [],
},
}
await query._handle_control_request(request)
@ -96,21 +95,18 @@ class TestToolPermissionCallbacks:
@pytest.mark.asyncio
async def test_permission_callback_deny(self):
"""Test callback that denies tool execution."""
async def deny_callback(
tool_name: str,
input_data: dict,
context: ToolPermissionContext
tool_name: str, input_data: dict, context: ToolPermissionContext
) -> PermissionResultDeny:
return PermissionResultDeny(
message="Security policy violation"
)
return PermissionResultDeny(message="Security policy violation")
transport = MockTransport()
query = Query(
transport=transport,
is_streaming_mode=True,
can_use_tool=deny_callback,
hooks=None
hooks=None,
)
request = {
@ -120,8 +116,8 @@ class TestToolPermissionCallbacks:
"subtype": "can_use_tool",
"tool_name": "DangerousTool",
"input": {"command": "rm -rf /"},
"permission_suggestions": ["deny"]
}
"permission_suggestions": ["deny"],
},
}
await query._handle_control_request(request)
@ -135,24 +131,21 @@ class TestToolPermissionCallbacks:
@pytest.mark.asyncio
async def test_permission_callback_input_modification(self):
"""Test callback that modifies tool input."""
async def modify_callback(
tool_name: str,
input_data: dict,
context: ToolPermissionContext
tool_name: str, input_data: dict, context: ToolPermissionContext
) -> PermissionResultAllow:
# Modify the input to add safety flag
modified_input = input_data.copy()
modified_input["safe_mode"] = True
return PermissionResultAllow(
updatedInput=modified_input
)
return PermissionResultAllow(updated_input=modified_input)
transport = MockTransport()
query = Query(
transport=transport,
is_streaming_mode=True,
can_use_tool=modify_callback,
hooks=None
hooks=None,
)
request = {
@ -162,8 +155,8 @@ class TestToolPermissionCallbacks:
"subtype": "can_use_tool",
"tool_name": "WriteTool",
"input": {"file_path": "/etc/passwd"},
"permission_suggestions": []
}
"permission_suggestions": [],
},
}
await query._handle_control_request(request)
@ -177,10 +170,9 @@ class TestToolPermissionCallbacks:
@pytest.mark.asyncio
async def test_callback_exception_handling(self):
"""Test that callback exceptions are properly handled."""
async def error_callback(
tool_name: str,
input_data: dict,
context: ToolPermissionContext
tool_name: str, input_data: dict, context: ToolPermissionContext
) -> PermissionResultAllow:
raise ValueError("Callback error")
@ -189,7 +181,7 @@ class TestToolPermissionCallbacks:
transport=transport,
is_streaming_mode=True,
can_use_tool=error_callback,
hooks=None
hooks=None,
)
request = {
@ -199,8 +191,8 @@ class TestToolPermissionCallbacks:
"subtype": "can_use_tool",
"tool_name": "TestTool",
"input": {},
"permission_suggestions": []
}
"permission_suggestions": [],
},
}
await query._handle_control_request(request)
@ -221,33 +213,20 @@ class TestHookCallbacks:
hook_calls = []
async def test_hook(
input_data: dict,
tool_use_id: str | None,
context: HookContext
input_data: dict, tool_use_id: str | None, context: HookContext
) -> dict:
hook_calls.append({
"input": input_data,
"tool_use_id": tool_use_id
})
hook_calls.append({"input": input_data, "tool_use_id": tool_use_id})
return {"processed": True}
transport = MockTransport()
# Create hooks configuration
hooks = {
"tool_use_start": [
{
"matcher": {"tool": "TestTool"},
"hooks": [test_hook]
}
]
"tool_use_start": [{"matcher": {"tool": "TestTool"}, "hooks": [test_hook]}]
}
query = Query(
transport=transport,
is_streaming_mode=True,
can_use_tool=None,
hooks=hooks
transport=transport, is_streaming_mode=True, can_use_tool=None, hooks=hooks
)
# Manually register the hook callback to avoid needing the full initialize flow
@ -262,8 +241,8 @@ class TestHookCallbacks:
"subtype": "hook_callback",
"callback_id": callback_id,
"input": {"test": "data"},
"tool_use_id": "tool-123"
}
"tool_use_id": "tool-123",
},
}
await query._handle_control_request(request)
@ -284,17 +263,14 @@ class TestClaudeCodeOptionsIntegration:
def test_options_with_callbacks(self):
"""Test creating options with callbacks."""
async def my_callback(
tool_name: str,
input_data: dict,
context: ToolPermissionContext
tool_name: str, input_data: dict, context: ToolPermissionContext
) -> PermissionResultAllow:
return PermissionResultAllow()
async def my_hook(
input_data: dict,
tool_use_id: str | None,
context: HookContext
input_data: dict, tool_use_id: str | None, context: HookContext
) -> dict:
return {}
@ -302,12 +278,9 @@ class TestClaudeCodeOptionsIntegration:
can_use_tool=my_callback,
hooks={
"tool_use_start": [
HookMatcher(
matcher={"tool": "Bash"},
hooks=[my_hook]
)
HookMatcher(matcher={"tool": "Bash"}, hooks=[my_hook])
]
}
},
)
assert options.can_use_tool == my_callback