feat: improve JSON parsing and add debugging capabilities

- Add buffer for incomplete JSON messages to handle truncation issues
- Implement better brace/bracket counting for multi-line JSON
- Add timeout, debug, and cli_path options to ClaudeCodeOptions
- Enhance error logging and debugging output
- Fix common JSONDecodeError issues reported in original repository
- Improve subprocess timeout handling

Addresses common issues:
- JSONDecodeError: Subprocess buffer truncates large messages
- Better error reporting and debugging capabilities
- Configurable CLI path and timeout settings
This commit is contained in:
Jon Raymond 2025-06-23 11:30:56 -06:00
parent 56018ce249
commit 380f4efdc4
No known key found for this signature in database
GPG key ID: 0C53BF8607304723
3 changed files with 67 additions and 3 deletions

View file

@ -29,7 +29,12 @@ class InternalClient:
) -> AsyncIterator[Message]:
"""Process a query through transport."""
transport = SubprocessCLITransport(prompt=prompt, options=options)
# Pass cli_path from options if provided
transport = SubprocessCLITransport(
prompt=prompt,
options=options,
cli_path=options.cli_path
)
try:
await transport.connect()

View file

@ -3,6 +3,7 @@
import json
import os
import shutil
import logging
from collections.abc import AsyncIterator
from pathlib import Path
from subprocess import PIPE
@ -17,6 +18,8 @@ from ..._errors import CLIJSONDecodeError as SDKJSONDecodeError
from ...types import ClaudeCodeOptions
from . import Transport
logger = logging.getLogger(__name__)
class SubprocessCLITransport(Transport):
"""Subprocess transport using Claude Code CLI."""
@ -34,6 +37,12 @@ class SubprocessCLITransport(Transport):
self._process: Process | None = None
self._stdout_stream: TextReceiveStream | None = None
self._stderr_stream: TextReceiveStream | None = None
self._timeout = options.timeout
self._debug = options.debug
if self._debug:
logger.setLevel(logging.DEBUG)
logger.debug(f"Initialized transport with CLI path: {self._cli_path}")
def _find_cli(self) -> str:
"""Find Claude Code CLI binary."""
@ -137,6 +146,11 @@ class SubprocessCLITransport(Transport):
return
cmd = self._build_command()
if self._debug:
logger.debug(f"Running command: {' '.join(cmd)}")
logger.debug(f"Working directory: {self._cwd}")
try:
self._process = await anyio.open_process(
cmd,
@ -154,6 +168,8 @@ class SubprocessCLITransport(Transport):
# Send the prompt via stdin
if self._process.stdin:
if self._debug:
logger.debug(f"Sending prompt via stdin: {self._prompt[:100]}...")
await self._process.stdin.send(self._prompt.encode() + b"\n")
await self._process.stdin.aclose() # Close stdin to signal we're done
@ -170,9 +186,12 @@ class SubprocessCLITransport(Transport):
if self._process.returncode is None:
try:
self._process.terminate()
with anyio.fail_after(5.0):
timeout = self._timeout or 5.0
with anyio.fail_after(timeout):
await self._process.wait()
except TimeoutError:
if self._debug:
logger.debug("Process termination timed out, killing...")
self._process.kill()
await self._process.wait()
except ProcessLookupError:
@ -191,6 +210,7 @@ class SubprocessCLITransport(Transport):
raise CLIConnectionError("Not connected")
stderr_lines = []
json_buffer = "" # Buffer for incomplete JSON
async def read_stderr() -> None:
"""Read stderr in background."""
@ -210,21 +230,55 @@ class SubprocessCLITransport(Transport):
if not line_str:
continue
# Handle potential multi-line JSON by buffering
if json_buffer:
line_str = json_buffer + line_str
json_buffer = ""
try:
data = json.loads(line_str)
if self._debug:
logger.debug(f"Parsed JSON: {data.get('type', 'unknown')}")
try:
yield data
except GeneratorExit:
# Handle generator cleanup gracefully
return
except json.JSONDecodeError as e:
# Check if this might be incomplete JSON
if line_str.startswith("{") or line_str.startswith("["):
raise SDKJSONDecodeError(line_str, e) from e
# Check if we have unclosed braces/brackets
open_braces = line_str.count("{") - line_str.count("}")
open_brackets = line_str.count("[") - line_str.count("]")
if open_braces > 0 or open_brackets > 0:
# Buffer the incomplete JSON for next iteration
if self._debug:
logger.debug(f"Buffering incomplete JSON: {line_str[:100]}...")
json_buffer = line_str
continue
else:
# It's complete but invalid JSON
if self._debug:
logger.error(f"Invalid JSON: {line_str[:200]}...")
raise SDKJSONDecodeError(line_str, e) from e
# Skip non-JSON lines
if self._debug and line_str:
logger.debug(f"Skipping non-JSON line: {line_str[:100]}")
continue
except anyio.ClosedResourceError:
pass
# If there's still data in the buffer, try to parse it one more time
if json_buffer:
try:
data = json.loads(json_buffer)
yield data
except json.JSONDecodeError as e:
if json_buffer.startswith("{") or json_buffer.startswith("["):
raise SDKJSONDecodeError(json_buffer, e) from e
await self._process.wait()
if self._process.returncode is not None and self._process.returncode != 0:
stderr_output = "\n".join(stderr_lines)

View file

@ -106,3 +106,8 @@ class ClaudeCodeOptions:
model: str | None = None
permission_prompt_tool_name: str | None = None
cwd: str | Path | None = None
# New options for better error handling and debugging
timeout: int | None = None # Timeout in seconds for the CLI process
debug: bool = False # Enable debug logging
cli_path: str | Path | None = None # Custom path to claude-code CLI