Fix JSON parsing for chunked responses in subprocess transport

Fixes #6 - Handle JSON responses that are split across multiple lines
in subprocess stdout. Implements stateful JSON parser that reconstructs
complete objects from fragments.

- Maintains buffer to accumulate partial JSON across lines
- Tracks brace/bracket nesting to identify complete JSON objects
- Handles string escaping and quotes properly to avoid false boundaries
- Tested with large responses that previously caused TaskGroup errors
- Backward compatible with existing functionality

The original implementation attempted to parse each line as complete JSON
with json.loads(), but large responses get chunked across multiple lines,
causing JSONDecodeError on incomplete fragments. This fix accumulates
lines in a buffer and uses stateful parsing to extract complete JSON
objects as they become available.

Resolves 'unhandled errors in a TaskGroup' issues reported by multiple users
when processing large prompts or responses with tool usage.
This commit is contained in:
couloirextreme 2025-06-18 16:05:56 -07:00
parent 7efa8b3987
commit 36948e152c

View file

@ -164,11 +164,70 @@ class SubprocessCLITransport(Transport):
"""Not used for CLI transport - args passed via command line."""
async def receive_messages(self) -> AsyncIterator[dict[str, Any]]:
"""Receive messages from CLI."""
"""Receive messages from CLI with improved JSON parsing for chunked responses."""
if not self._process or not self._stdout_stream:
raise CLIConnectionError("Not connected")
stderr_lines = []
json_buffer = ""
def parse_json_chunks(buffer: str) -> tuple[list[dict], str]:
"""Parse complete JSON objects from buffer, return objects and remaining buffer."""
objects = []
remaining = buffer.strip()
while remaining:
# Find potential JSON start
start_idx = -1
for i, char in enumerate(remaining):
if char in '{[':
start_idx = i
break
if start_idx == -1:
break
# Track braces to find complete JSON
brace_count = 0
in_string = False
escape_next = False
for i, char in enumerate(remaining[start_idx:], start_idx):
if escape_next:
escape_next = False
continue
if char == '\\' and in_string:
escape_next = True
continue
if char == '"' and not escape_next:
in_string = not in_string
continue
if not in_string:
if char in '{[':
brace_count += 1
elif char in '}]':
brace_count -= 1
if brace_count == 0:
# Found complete JSON
json_str = remaining[start_idx:i+1]
try:
obj = json.loads(json_str)
objects.append(obj)
remaining = remaining[i+1:].strip()
break
except json.JSONDecodeError:
# Invalid JSON, skip and continue
remaining = remaining[i+1:]
break
else:
# Incomplete JSON, keep in buffer
break
return objects, remaining
async def read_stderr() -> None:
"""Read stderr in background."""
@ -188,17 +247,17 @@ class SubprocessCLITransport(Transport):
if not line_str:
continue
try:
data = json.loads(line_str)
# Add line to buffer
json_buffer += line_str
# Try to parse complete JSON objects
complete_objects, json_buffer = parse_json_chunks(json_buffer)
for data in complete_objects:
try:
yield data
except GeneratorExit:
# Handle generator cleanup gracefully
return
except json.JSONDecodeError as e:
if line_str.startswith("{") or line_str.startswith("["):
raise SDKJSONDecodeError(line_str, e) from e
continue
except anyio.ClosedResourceError:
pass