diff --git a/src/claude_code_sdk/_internal/transport/subprocess_cli.py b/src/claude_code_sdk/_internal/transport/subprocess_cli.py index f63732a..9187517 100644 --- a/src/claude_code_sdk/_internal/transport/subprocess_cli.py +++ b/src/claude_code_sdk/_internal/transport/subprocess_cli.py @@ -18,6 +18,113 @@ from ...types import ClaudeCodeOptions from . import Transport +class JSONStreamParser: + """Handles parsing of potentially incomplete JSON streams.""" + + def __init__(self): + self._buffer = "" + + def add_data(self, data: str) -> list[dict[str, Any]]: + """Add new data to buffer and return any complete JSON objects.""" + self._buffer += data + return self._extract_complete_objects() + + def _extract_complete_objects(self) -> list[dict[str, Any]]: + """Extract all complete JSON objects from the buffer.""" + objects = [] + + # Handle newline-separated JSON objects + lines = self._buffer.split('\n') + + # Keep the last line in buffer + self._buffer = lines[-1] if lines else "" + + # Process all complete lines + for line in lines[:-1]: + line = line.strip() + if not line: + continue + + if parsed_obj := self._try_parse_single_line(line): + objects.append(parsed_obj) + + # Try to parse remaining buffer for complete objects + while self._buffer: + parsed_obj, remaining = self._try_parse_partial_buffer() + if parsed_obj: + objects.append(parsed_obj) + self._buffer = remaining + else: + break + + return objects + + def _try_parse_single_line(self, line: str) -> dict[str, Any] | None: + """Try to parse a single line as JSON.""" + try: + return json.loads(line) + except json.JSONDecodeError: + # If single line fails, add to buffer for partial parsing + self._buffer = line + "\n" + self._buffer + return None + + def _try_parse_partial_buffer(self) -> tuple[dict[str, Any] | None, str]: + """Try to extract complete JSON object from partial buffer.""" + buffer = self._buffer.strip() + if not buffer: + return None, "" + + # Quick attempt at full parse + try: + return json.loads(buffer), "" + except json.JSONDecodeError: + pass + + # Try to find complete JSON object by tracking braces + complete_object = self._find_complete_json_object(buffer) + if complete_object: + try: + parsed = json.loads(complete_object) + remaining = buffer[len(complete_object):].strip() + return parsed, remaining + except json.JSONDecodeError: + pass + + return None, buffer + + def _find_complete_json_object(self, text: str) -> str | None: + """Find the first complete JSON object in text using brace counting.""" + if not text.startswith('{'): + return None + + brace_count = 0 + in_string = False + escape_next = False + + for i, char in enumerate(text): + if escape_next: + escape_next = False + continue + + if char == '\\': + escape_next = True + continue + + if char == '"' and not escape_next: + in_string = not in_string + continue + + if not in_string: + if char == '{': + brace_count += 1 + elif char == '}': + brace_count -= 1 + if brace_count == 0: + return text[:i + 1] + + return None + + class SubprocessCLITransport(Transport): """Subprocess transport using Claude Code CLI.""" @@ -169,6 +276,7 @@ class SubprocessCLITransport(Transport): raise CLIConnectionError("Not connected") stderr_lines = [] + json_parser = JSONStreamParser() async def read_stderr() -> None: """Read stderr in background.""" @@ -188,17 +296,19 @@ class SubprocessCLITransport(Transport): if not line_str: continue + # Parse potentially incomplete JSON stream try: - data = json.loads(line_str) - try: - yield data - except GeneratorExit: - # Handle generator cleanup gracefully - return - except json.JSONDecodeError as e: + complete_objects = json_parser.add_data(line_str) + for json_obj in complete_objects: + yield json_obj + except Exception as e: + # If parsing fails completely, try simple fallback if line_str.startswith("{") or line_str.startswith("["): - raise SDKJSONDecodeError(line_str, e) from e - continue + try: + data = json.loads(line_str) + yield data + except json.JSONDecodeError as json_err: + raise SDKJSONDecodeError(line_str, json_err) from json_err except anyio.ClosedResourceError: pass