diff --git a/src/claude_code_sdk/_internal/transport/subprocess_cli.py b/src/claude_code_sdk/_internal/transport/subprocess_cli.py index f63732a..51e9401 100644 --- a/src/claude_code_sdk/_internal/transport/subprocess_cli.py +++ b/src/claude_code_sdk/_internal/transport/subprocess_cli.py @@ -164,11 +164,70 @@ class SubprocessCLITransport(Transport): """Not used for CLI transport - args passed via command line.""" async def receive_messages(self) -> AsyncIterator[dict[str, Any]]: - """Receive messages from CLI.""" + """Receive messages from CLI with improved JSON parsing for chunked responses.""" if not self._process or not self._stdout_stream: raise CLIConnectionError("Not connected") stderr_lines = [] + json_buffer = "" + + def parse_json_chunks(buffer: str) -> tuple[list[dict], str]: + """Parse complete JSON objects from buffer, return objects and remaining buffer.""" + objects = [] + remaining = buffer.strip() + + while remaining: + # Find potential JSON start + start_idx = -1 + for i, char in enumerate(remaining): + if char in '{[': + start_idx = i + break + + if start_idx == -1: + break + + # Track braces to find complete JSON + brace_count = 0 + in_string = False + escape_next = False + + for i, char in enumerate(remaining[start_idx:], start_idx): + if escape_next: + escape_next = False + continue + + if char == '\\' and in_string: + escape_next = True + continue + + if char == '"' and not escape_next: + in_string = not in_string + continue + + if not in_string: + if char in '{[': + brace_count += 1 + elif char in '}]': + brace_count -= 1 + + if brace_count == 0: + # Found complete JSON + json_str = remaining[start_idx:i+1] + try: + obj = json.loads(json_str) + objects.append(obj) + remaining = remaining[i+1:].strip() + break + except json.JSONDecodeError: + # Invalid JSON, skip and continue + remaining = remaining[i+1:] + break + else: + # Incomplete JSON, keep in buffer + break + + return objects, remaining async def read_stderr() -> None: """Read stderr in background.""" @@ -188,17 +247,17 @@ class SubprocessCLITransport(Transport): if not line_str: continue - try: - data = json.loads(line_str) + # Add line to buffer + json_buffer += line_str + + # Try to parse complete JSON objects + complete_objects, json_buffer = parse_json_chunks(json_buffer) + + for data in complete_objects: try: yield data except GeneratorExit: - # Handle generator cleanup gracefully return - except json.JSONDecodeError as e: - if line_str.startswith("{") or line_str.startswith("["): - raise SDKJSONDecodeError(line_str, e) from e - continue except anyio.ClosedResourceError: pass