Fix JSON stream parsing in subprocess transport

This commit is contained in:
eminemkun 2025-06-19 00:15:05 +09:00
parent 7efa8b3987
commit 26010a78cd
No known key found for this signature in database

View file

@ -18,6 +18,113 @@ from ...types import ClaudeCodeOptions
from . import Transport
class JSONStreamParser:
"""Handles parsing of potentially incomplete JSON streams."""
def __init__(self):
self._buffer = ""
def add_data(self, data: str) -> list[dict[str, Any]]:
"""Add new data to buffer and return any complete JSON objects."""
self._buffer += data
return self._extract_complete_objects()
def _extract_complete_objects(self) -> list[dict[str, Any]]:
"""Extract all complete JSON objects from the buffer."""
objects = []
# Handle newline-separated JSON objects
lines = self._buffer.split('\n')
# Keep the last line in buffer
self._buffer = lines[-1] if lines else ""
# Process all complete lines
for line in lines[:-1]:
line = line.strip()
if not line:
continue
if parsed_obj := self._try_parse_single_line(line):
objects.append(parsed_obj)
# Try to parse remaining buffer for complete objects
while self._buffer:
parsed_obj, remaining = self._try_parse_partial_buffer()
if parsed_obj:
objects.append(parsed_obj)
self._buffer = remaining
else:
break
return objects
def _try_parse_single_line(self, line: str) -> dict[str, Any] | None:
"""Try to parse a single line as JSON."""
try:
return json.loads(line)
except json.JSONDecodeError:
# If single line fails, add to buffer for partial parsing
self._buffer = line + "\n" + self._buffer
return None
def _try_parse_partial_buffer(self) -> tuple[dict[str, Any] | None, str]:
"""Try to extract complete JSON object from partial buffer."""
buffer = self._buffer.strip()
if not buffer:
return None, ""
# Quick attempt at full parse
try:
return json.loads(buffer), ""
except json.JSONDecodeError:
pass
# Try to find complete JSON object by tracking braces
complete_object = self._find_complete_json_object(buffer)
if complete_object:
try:
parsed = json.loads(complete_object)
remaining = buffer[len(complete_object):].strip()
return parsed, remaining
except json.JSONDecodeError:
pass
return None, buffer
def _find_complete_json_object(self, text: str) -> str | None:
"""Find the first complete JSON object in text using brace counting."""
if not text.startswith('{'):
return None
brace_count = 0
in_string = False
escape_next = False
for i, char in enumerate(text):
if escape_next:
escape_next = False
continue
if char == '\\':
escape_next = True
continue
if char == '"' and not escape_next:
in_string = not in_string
continue
if not in_string:
if char == '{':
brace_count += 1
elif char == '}':
brace_count -= 1
if brace_count == 0:
return text[:i + 1]
return None
class SubprocessCLITransport(Transport):
"""Subprocess transport using Claude Code CLI."""
@ -169,6 +276,7 @@ class SubprocessCLITransport(Transport):
raise CLIConnectionError("Not connected")
stderr_lines = []
json_parser = JSONStreamParser()
async def read_stderr() -> None:
"""Read stderr in background."""
@ -188,17 +296,19 @@ class SubprocessCLITransport(Transport):
if not line_str:
continue
# Parse potentially incomplete JSON stream
try:
data = json.loads(line_str)
try:
yield data
except GeneratorExit:
# Handle generator cleanup gracefully
return
except json.JSONDecodeError as e:
complete_objects = json_parser.add_data(line_str)
for json_obj in complete_objects:
yield json_obj
except Exception as e:
# If parsing fails completely, try simple fallback
if line_str.startswith("{") or line_str.startswith("["):
raise SDKJSONDecodeError(line_str, e) from e
continue
try:
data = json.loads(line_str)
yield data
except json.JSONDecodeError as json_err:
raise SDKJSONDecodeError(line_str, json_err) from json_err
except anyio.ClosedResourceError:
pass