fix: Replace TextReceiveStream with raw byte stream reading

Fixes #32 - JSON parsing fails for large tool results

The anyio.TextReceiveStream has issues when reading large lines (>10KB)
containing UTF-8 characters, causing JSON parsing to fail at position 130.

This fix:
- Removes dependency on TextReceiveStream
- Reads stdout/stderr as raw byte streams
- Manually handles line buffering and UTF-8 decoding
- Properly processes remaining buffer content

The issue occurred because TextReceiveStream appears to have a bug when
handling large lines with specific content patterns, particularly those
containing UTF-8 characters like the arrow symbol (→) used in line numbers.

Tested with:
- Large file reads (>30KB)
- Files containing UTF-8 characters
- Multiple concurrent tool uses
This commit is contained in:
Giray Turan 2025-06-22 00:49:29 +02:00
parent 7efa8b3987
commit 97c651b4d4
2 changed files with 285 additions and 20 deletions

View file

@ -10,7 +10,6 @@ from typing import Any
import anyio
from anyio.abc import Process
from anyio.streams.text import TextReceiveStream
from ..._errors import CLIConnectionError, CLINotFoundError, ProcessError
from ..._errors import CLIJSONDecodeError as SDKJSONDecodeError
@ -32,8 +31,8 @@ class SubprocessCLITransport(Transport):
self._cli_path = str(cli_path) if cli_path else self._find_cli()
self._cwd = str(options.cwd) if options.cwd else None
self._process: Process | None = None
self._stdout_stream: TextReceiveStream | None = None
self._stderr_stream: TextReceiveStream | None = None
self._stdout_stream: Any | None = None # Raw stdout stream
self._stderr_stream: Any | None = None # Raw stderr stream
def _find_cli(self) -> str:
"""Find Claude Code CLI binary."""
@ -131,9 +130,11 @@ class SubprocessCLITransport(Transport):
)
if self._process.stdout:
self._stdout_stream = TextReceiveStream(self._process.stdout)
# Use raw stream to avoid TextReceiveStream issues with large JSON
self._stdout_stream = self._process.stdout
if self._process.stderr:
self._stderr_stream = TextReceiveStream(self._process.stderr)
# Use raw stream for stderr as well
self._stderr_stream = self._process.stderr
except FileNotFoundError as e:
raise CLINotFoundError(f"Claude Code not found at: {self._cli_path}") from e
@ -174,8 +175,13 @@ class SubprocessCLITransport(Transport):
"""Read stderr in background."""
if self._stderr_stream:
try:
async for line in self._stderr_stream:
stderr_lines.append(line.strip())
# Read as bytes and decode
buffer = b""
async for chunk in self._stderr_stream:
buffer += chunk
while b'\n' in buffer:
line, buffer = buffer.split(b'\n', 1)
stderr_lines.append(line.decode('utf-8', errors='replace').strip())
except anyio.ClosedResourceError:
pass
@ -183,22 +189,41 @@ class SubprocessCLITransport(Transport):
tg.start_soon(read_stderr)
try:
async for line in self._stdout_stream:
line_str = line.strip()
if not line_str:
continue
# Read stdout as bytes to properly handle large JSON lines
buffer = b""
async for chunk in self._stdout_stream:
buffer += chunk
# Process complete lines
while b'\n' in buffer:
line_bytes, buffer = buffer.split(b'\n', 1)
line_str = line_bytes.decode('utf-8', errors='replace').strip()
if not line_str:
continue
try:
data = json.loads(line_str)
try:
data = json.loads(line_str)
try:
yield data
except GeneratorExit:
# Handle generator cleanup gracefully
return
except json.JSONDecodeError as e:
if line_str.startswith("{") or line_str.startswith("["):
raise SDKJSONDecodeError(line_str, e) from e
continue
# Process any remaining data in buffer
if buffer:
line_str = buffer.decode('utf-8', errors='replace').strip()
if line_str:
try:
data = json.loads(line_str)
yield data
except GeneratorExit:
# Handle generator cleanup gracefully
return
except json.JSONDecodeError as e:
if line_str.startswith("{") or line_str.startswith("["):
raise SDKJSONDecodeError(line_str, e) from e
continue
except json.JSONDecodeError:
# Incomplete JSON at end, ignore
pass
except anyio.ClosedResourceError:
pass

View file

@ -0,0 +1,240 @@
"""Subprocess transport implementation using Claude Code CLI with fix for large JSON messages."""
import json
import os
import shutil
from collections.abc import AsyncIterator
from pathlib import Path
from subprocess import PIPE
from typing import Any
import anyio
from anyio.abc import Process
from ..._errors import CLIConnectionError, CLINotFoundError, ProcessError
from ..._errors import CLIJSONDecodeError as SDKJSONDecodeError
from ...types import ClaudeCodeOptions
from . import Transport
class SubprocessCLITransport(Transport):
    """Subprocess transport using Claude Code CLI.

    The CLI is started with ``--output-format stream-json`` and its stdout is
    consumed as a raw byte stream. Lines are split on ``\\n`` and decoded as
    UTF-8 by this class, so a JSON message larger than a single read chunk is
    reassembled before it is parsed.
    """

    def __init__(
        self,
        prompt: str,
        options: ClaudeCodeOptions,
        cli_path: str | Path | None = None,
    ):
        """Store the prompt/options and resolve the CLI binary path.

        Args:
            prompt: Prompt text passed to the CLI via ``--print``.
            options: Options translated into CLI flags by ``_build_command``.
            cli_path: Explicit path to the CLI binary; when omitted the binary
                is located with ``_find_cli``.

        Raises:
            CLINotFoundError: If ``cli_path`` is not given and no binary is found.
        """
        self._prompt = prompt
        self._options = options
        self._cli_path = str(cli_path) if cli_path else self._find_cli()
        self._cwd = str(options.cwd) if options.cwd else None
        self._process: Process | None = None
        self._stdout: Any = None  # Raw (byte) stdout stream of the process
        self._stderr: Any = None  # Raw (byte) stderr stream of the process

    def _find_cli(self) -> str:
        """Find Claude Code CLI binary.

        Checks ``PATH`` first, then a fixed list of common install locations.

        Returns:
            Path to the ``claude`` executable.

        Raises:
            CLINotFoundError: If no binary is found. The message differs
                depending on whether ``node`` is available on ``PATH``.
        """
        if cli := shutil.which("claude"):
            return cli

        locations = [
            Path.home() / ".npm-global/bin/claude",
            Path("/usr/local/bin/claude"),
            Path.home() / ".local/bin/claude",
            Path.home() / "node_modules/.bin/claude",
            Path.home() / ".yarn/bin/claude",
        ]
        for path in locations:
            if path.exists() and path.is_file():
                return str(path)

        node_installed = shutil.which("node") is not None
        if not node_installed:
            error_msg = "Claude Code requires Node.js, which is not installed.\n\n"
            error_msg += "Install Node.js from: https://nodejs.org/\n"
            error_msg += "\nAfter installing Node.js, install Claude Code:\n"
            error_msg += " npm install -g @anthropic-ai/claude-code"
            raise CLINotFoundError(error_msg)

        raise CLINotFoundError(
            "Claude Code not found. Install with:\n"
            " npm install -g @anthropic-ai/claude-code\n"
            "\nIf already installed locally, try:\n"
            ' export PATH="$HOME/node_modules/.bin:$PATH"\n'
            "\nOr specify the path when creating transport:\n"
            " SubprocessCLITransport(..., cli_path='/path/to/claude')"
        )

    def _build_command(self) -> list[str]:
        """Build CLI command with arguments.

        Returns:
            The argv list: the CLI path, the fixed streaming flags, one flag
            per truthy option, and finally ``--print <prompt>``.
        """
        cmd = [self._cli_path, "--output-format", "stream-json", "--verbose"]

        if self._options.system_prompt:
            cmd.extend(["--system-prompt", self._options.system_prompt])
        if self._options.append_system_prompt:
            cmd.extend(["--append-system-prompt", self._options.append_system_prompt])
        if self._options.allowed_tools:
            cmd.extend(["--allowedTools", ",".join(self._options.allowed_tools)])
        if self._options.max_turns:
            cmd.extend(["--max-turns", str(self._options.max_turns)])
        if self._options.disallowed_tools:
            cmd.extend(["--disallowedTools", ",".join(self._options.disallowed_tools)])
        if self._options.model:
            cmd.extend(["--model", self._options.model])
        if self._options.permission_prompt_tool_name:
            cmd.extend(
                ["--permission-prompt-tool", self._options.permission_prompt_tool_name]
            )
        if self._options.permission_mode:
            cmd.extend(["--permission-mode", self._options.permission_mode])
        if self._options.continue_conversation:
            cmd.append("--continue")
        if self._options.resume:
            cmd.extend(["--resume", self._options.resume])
        if self._options.mcp_servers:
            cmd.extend(
                ["--mcp-config", json.dumps({"mcpServers": self._options.mcp_servers})]
            )

        cmd.extend(["--print", self._prompt])
        return cmd

    async def connect(self) -> None:
        """Start subprocess.

        Does nothing if a process has already been started.

        Raises:
            CLINotFoundError: If the CLI binary does not exist at the resolved path.
            CLIConnectionError: If the process fails to start for any other reason.
        """
        if self._process:
            return

        cmd = self._build_command()
        try:
            self._process = await anyio.open_process(
                cmd,
                stdin=None,
                stdout=PIPE,
                stderr=PIPE,
                cwd=self._cwd,
                env={**os.environ, "CLAUDE_CODE_ENTRYPOINT": "sdk-py"},
            )
            # Store raw streams instead of wrapping in TextReceiveStream
            self._stdout = self._process.stdout
            self._stderr = self._process.stderr
        except FileNotFoundError as e:
            raise CLINotFoundError(f"Claude Code not found at: {self._cli_path}") from e
        except Exception as e:
            raise CLIConnectionError(f"Failed to start Claude Code: {e}") from e

    async def disconnect(self) -> None:
        """Terminate subprocess.

        Sends terminate and waits up to 5 seconds; if the process is still
        running after that it is killed. Stored process and stream references
        are cleared afterwards.
        """
        if not self._process:
            return

        if self._process.returncode is None:
            try:
                self._process.terminate()
                with anyio.fail_after(5.0):
                    await self._process.wait()
            except TimeoutError:
                self._process.kill()
                await self._process.wait()
            except ProcessLookupError:
                # The process is already gone; nothing left to stop.
                pass

        self._process = None
        self._stdout = None
        self._stderr = None

    async def send_request(self, messages: list[Any], options: dict[str, Any]) -> None:
        """Not used for CLI transport - args passed via command line."""

    @staticmethod
    def _decode_line(raw: bytes) -> str:
        """Decode one raw output line as UTF-8 and strip surrounding whitespace.

        Undecodable bytes are replaced rather than raising, so a damaged line
        cannot abort reading of the stream.
        """
        return raw.decode("utf-8", errors="replace").strip()

    async def receive_messages(self) -> AsyncIterator[dict[str, Any]]:
        """Receive messages from CLI with proper handling of large JSON lines.

        stdout is read as bytes and split on newlines; each non-empty line is
        parsed as JSON and yielded. stderr is collected concurrently and is
        only used to build the error raised for a failing process.

        Yields:
            One decoded JSON value per stdout line.

        Raises:
            CLIConnectionError: If ``connect`` has not been called successfully.
            SDKJSONDecodeError: If a complete line starts with ``{`` or ``[``
                but is not valid JSON.
            ProcessError: If the process exits non-zero and its stderr output
                contains the word "error" (case-insensitive).
        """
        if not self._process or not self._stdout:
            raise CLIConnectionError("Not connected")

        stderr_lines: list[str] = []

        async def read_stderr() -> None:
            """Read stderr in background."""
            if not self._stderr:
                return
            buffer = b""
            try:
                async for chunk in self._stderr:
                    buffer += chunk
                    # Everything before the last newline is complete; the
                    # final piece (possibly empty) stays buffered.
                    *complete, buffer = buffer.split(b"\n")
                    stderr_lines.extend(self._decode_line(raw) for raw in complete)
            except anyio.ClosedResourceError:
                pass
            # Keep a final line that was not newline-terminated; it is often
            # the actual error message.
            if buffer:
                stderr_lines.append(self._decode_line(buffer))

        async with anyio.create_task_group() as tg:
            tg.start_soon(read_stderr)

            try:
                # Read stdout as bytes to avoid TextReceiveStream issues
                buffer = b""
                async for chunk in self._stdout:
                    buffer += chunk
                    # b"\n" never occurs inside a multi-byte UTF-8 sequence,
                    # so splitting before decoding is safe.
                    *complete, buffer = buffer.split(b"\n")
                    for raw in complete:
                        line_str = self._decode_line(raw)
                        if not line_str:
                            continue

                        try:
                            data = json.loads(line_str)
                        except json.JSONDecodeError as e:
                            # Only lines that look like JSON are reported;
                            # other non-JSON output is skipped.
                            if line_str.startswith(("{", "[")):
                                raise SDKJSONDecodeError(line_str, e) from e
                            continue

                        try:
                            yield data
                        except GeneratorExit:
                            # Handle generator cleanup gracefully
                            return

                # Process any remaining data in buffer
                tail = self._decode_line(buffer)
                if tail:
                    try:
                        data = json.loads(tail)
                    except json.JSONDecodeError:
                        # Incomplete JSON at end, ignore
                        pass
                    else:
                        yield data
            except anyio.ClosedResourceError:
                pass

        await self._process.wait()
        if self._process.returncode is not None and self._process.returncode != 0:
            stderr_output = "\n".join(stderr_lines)
            if stderr_output and "error" in stderr_output.lower():
                raise ProcessError(
                    "CLI process failed",
                    exit_code=self._process.returncode,
                    stderr=stderr_output,
                )

    def is_connected(self) -> bool:
        """Check if subprocess is running."""
        return self._process is not None and self._process.returncode is None