Fix json reading and anyio error

Dickson Tsai 2025-09-05 11:29:17 +09:00
parent d71c967fa1
commit 341a0bc685
2 changed files with 34 additions and 17 deletions


@@ -15,7 +15,6 @@ from mcp.types import (
 )
 from ..types import (
-    PermissionResult,
     PermissionResultAllow,
     PermissionResultDeny,
     SDKControlPermissionRequest,
@@ -444,15 +443,14 @@ class Query:
     async def receive_messages(self) -> AsyncIterator[dict[str, Any]]:
         """Receive SDK messages (not control messages)."""
-        async with self._message_receive:
-            async for message in self._message_receive:
-                # Check for special messages
-                if message.get("type") == "end":
-                    break
-                elif message.get("type") == "error":
-                    raise Exception(message.get("error", "Unknown error"))
+        async for message in self._message_receive:
+            # Check for special messages
+            if message.get("type") == "end":
+                break
+            elif message.get("type") == "error":
+                raise Exception(message.get("error", "Unknown error"))
 
-                yield message
+            yield message
 
     async def close(self) -> None:
         """Close the query and transport."""


@@ -301,20 +301,39 @@ class SubprocessCLITransport(Transport):
         if not self._process or not self._stdout_stream:
             raise CLIConnectionError("Not connected")
 
+        json_buffer = ""
+        _MAX_BUFFER_SIZE = 1024 * 1024  # 1MB buffer limit
+
         # Process stdout messages
         try:
             async for line in self._stdout_stream:
-                line = line.strip()
-                if not line:
+                line_str = line.strip()
+                if not line_str:
                     continue
 
-                try:
-                    yield json.loads(line)
-                except json.JSONDecodeError as e:
+                # Accumulate partial JSON until we can parse it
+                # Note: TextReceiveStream can truncate long lines, so we need to buffer
+                # and speculatively parse until we get a complete JSON object
+                json_buffer += line_str
+
+                if len(json_buffer) > _MAX_BUFFER_SIZE:
+                    json_buffer = ""
                     raise SDKJSONDecodeError(
-                        f"Invalid JSON from CLI: {line[:100]}",
-                        e,
-                    ) from e
+                        f"JSON message exceeded maximum buffer size of {_MAX_BUFFER_SIZE} bytes",
+                        ValueError(
+                            f"Buffer size {len(json_buffer)} exceeds limit {_MAX_BUFFER_SIZE}"
+                        ),
+                    )
+
+                try:
+                    data = json.loads(json_buffer)
+                    json_buffer = ""
+                    yield data
+                except json.JSONDecodeError:
+                    # We are speculatively decoding the buffer until we get
+                    # a full JSON object. If there is an actual issue, we
+                    # raise an error after _MAX_BUFFER_SIZE.
+                    continue
 
         except anyio.ClosedResourceError:
             pass
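
A self-contained sketch of the same buffering strategy, using a hypothetical helper name (parse_json_lines) and a plain ValueError in place of SDKJSONDecodeError: accumulate stripped chunks in a buffer, attempt json.loads after each one, yield on success, and give up once the buffer exceeds the size limit. As in the diff, a successful parse resets the buffer so the next message starts clean.

import json
from collections.abc import AsyncIterator
from typing import Any

import anyio

_MAX_BUFFER_SIZE = 1024 * 1024  # mirrors the 1MB limit in the diff above


async def parse_json_lines(chunks: AsyncIterator[str]) -> AsyncIterator[dict[str, Any]]:
    """Speculatively decode JSON that may arrive split across reads (hypothetical helper)."""
    buffer = ""
    async for chunk in chunks:
        chunk = chunk.strip()
        if not chunk:
            continue
        buffer += chunk
        if len(buffer) > _MAX_BUFFER_SIZE:
            raise ValueError("JSON message exceeded maximum buffer size")
        try:
            data = json.loads(buffer)
        except json.JSONDecodeError:
            # Incomplete object so far; keep buffering and retry on the next chunk.
            continue
        buffer = ""
        yield data


async def demo() -> None:
    async def chunks() -> AsyncIterator[str]:
        # One JSON object truncated across two reads.
        yield '{"type": "assistant", "text": "hel'
        yield 'lo"}'

    async for message in parse_json_lines(chunks()):
        print(message)


anyio.run(demo)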