claude-code-sdk-python/src/claude_agent_sdk/_internal/message_parser.py
2025-09-28 14:52:53 -07:00

172 lines
6.3 KiB
Python

"""Message parser for Claude Code SDK responses."""
import logging
from typing import Any
from .._errors import MessageParseError
from ..types import (
AssistantMessage,
ContentBlock,
Message,
ResultMessage,
StreamEvent,
SystemMessage,
TextBlock,
ThinkingBlock,
ToolResultBlock,
ToolUseBlock,
UserMessage,
)
logger = logging.getLogger(__name__)
def parse_message(data: dict[str, Any]) -> Message:
"""
Parse message from CLI output into typed Message objects.
Args:
data: Raw message dictionary from CLI output
Returns:
Parsed Message object
Raises:
MessageParseError: If parsing fails or message type is unrecognized
"""
if not isinstance(data, dict):
raise MessageParseError(
f"Invalid message data type (expected dict, got {type(data).__name__})",
data,
)
message_type = data.get("type")
if not message_type:
raise MessageParseError("Message missing 'type' field", data)
match message_type:
case "user":
try:
parent_tool_use_id = data.get("parent_tool_use_id")
if isinstance(data["message"]["content"], list):
user_content_blocks: list[ContentBlock] = []
for block in data["message"]["content"]:
match block["type"]:
case "text":
user_content_blocks.append(
TextBlock(text=block["text"])
)
case "tool_use":
user_content_blocks.append(
ToolUseBlock(
id=block["id"],
name=block["name"],
input=block["input"],
)
)
case "tool_result":
user_content_blocks.append(
ToolResultBlock(
tool_use_id=block["tool_use_id"],
content=block.get("content"),
is_error=block.get("is_error"),
)
)
return UserMessage(
content=user_content_blocks,
parent_tool_use_id=parent_tool_use_id,
)
return UserMessage(
content=data["message"]["content"],
parent_tool_use_id=parent_tool_use_id,
)
except KeyError as e:
raise MessageParseError(
f"Missing required field in user message: {e}", data
) from e
case "assistant":
try:
content_blocks: list[ContentBlock] = []
for block in data["message"]["content"]:
match block["type"]:
case "text":
content_blocks.append(TextBlock(text=block["text"]))
case "thinking":
content_blocks.append(
ThinkingBlock(
thinking=block["thinking"],
signature=block["signature"],
)
)
case "tool_use":
content_blocks.append(
ToolUseBlock(
id=block["id"],
name=block["name"],
input=block["input"],
)
)
case "tool_result":
content_blocks.append(
ToolResultBlock(
tool_use_id=block["tool_use_id"],
content=block.get("content"),
is_error=block.get("is_error"),
)
)
return AssistantMessage(
content=content_blocks,
model=data["message"]["model"],
parent_tool_use_id=data.get("parent_tool_use_id"),
)
except KeyError as e:
raise MessageParseError(
f"Missing required field in assistant message: {e}", data
) from e
case "system":
try:
return SystemMessage(
subtype=data["subtype"],
data=data,
)
except KeyError as e:
raise MessageParseError(
f"Missing required field in system message: {e}", data
) from e
case "result":
try:
return ResultMessage(
subtype=data["subtype"],
duration_ms=data["duration_ms"],
duration_api_ms=data["duration_api_ms"],
is_error=data["is_error"],
num_turns=data["num_turns"],
session_id=data["session_id"],
total_cost_usd=data.get("total_cost_usd"),
usage=data.get("usage"),
result=data.get("result"),
)
except KeyError as e:
raise MessageParseError(
f"Missing required field in result message: {e}", data
) from e
case "stream_event":
try:
return StreamEvent(
uuid=data["uuid"],
session_id=data["session_id"],
event=data["event"],
parent_tool_use_id=data.get("parent_tool_use_id"),
)
except KeyError as e:
raise MessageParseError(
f"Missing required field in stream_event message: {e}", data
) from e
case _:
raise MessageParseError(f"Unknown message type: {message_type}", data)