fix(backend): add circuit breaker for infinite tool call retry loops (#12499)

## Summary
- Adds a two-layer circuit breaker to prevent AutoPilot from looping
infinitely when tool calls fail with empty parameters
- **Tool-level**: After 3 consecutive identical failures per tool,
returns a hard-stop message instructing the model to output content as
text instead of retrying
- **Stream-level**: After 6 consecutive empty tool calls (`input: {}`),
aborts the stream entirely with a user-visible error and retry button

## Background
In session `c5548b48`, the model completed all research successfully but
then spent 51+ minutes in an infinite loop trying to write output —
every tool call was sent with `input: {}` (likely due to context
saturation preventing argument serialization). 21+ identical failing
tool calls with no circuit breaker.

## Changes
- `tool_adapter.py`: Added `_check_circuit_breaker`,
`_record_tool_failure`, `_clear_tool_failures` functions with a
`ContextVar`-based tracker. Integrated into both `create_tool_handler`
(BaseTool) and the `_truncating` wrapper (all tools).
- `service.py`: Added empty-tool-call detection in the main stream loop
that counts consecutive `AssistantMessage`s with empty
`ToolUseBlock.input` and aborts after the limit.
- `test_circuit_breaker.py`: 7 unit tests covering threshold behavior,
per-args tracking, reset on success, and uninitialized tracker safety.

## Test plan
- [x] Unit tests pass (`pytest
backend/copilot/sdk/test_circuit_breaker.py` — 8/8 passing)
- [x] Pre-commit hooks pass (Ruff, Black, isort, typecheck all pass)
- [x] E2E: CoPilot tool calls work normally (GetCurrentTimeBlock
returned 09:16:39 UTC)
- [x] E2E: Circuit breaker pass-through verified (successful calls don't
trigger breaker)
- [x] E2E: Circuit breaker code integrated into tool_adapter truncating
wrapper
This commit is contained in:
Zamil Majdy
2026-03-24 12:45:12 +07:00
committed by GitHub
parent 32eac6d52e
commit 3d4fcfacb6
4 changed files with 414 additions and 12 deletions

View File

@@ -63,6 +63,50 @@ Example — committing an image file to GitHub:
}}
```
### Writing large files — CRITICAL
**Never write an entire large document in a single tool call.** When the
content you want to write exceeds ~2000 words the tool call's output token
limit will silently truncate the arguments, producing an empty `{{}}` input
that fails repeatedly.
**Preferred: compose from file references.** If the data is already in
files (tool outputs, workspace files), compose the report in one call
using `@@agptfile:` references — the system expands them inline:
```bash
cat > report.md << 'EOF'
# Research Report
## Data from web research
@@agptfile:/home/user/web_results.txt
## Block execution output
@@agptfile:workspace://<file_id>
## Conclusion
<brief synthesis>
EOF
```
**Fallback: write section-by-section.** When you must generate content
from conversation context (no files to reference), split into multiple
`bash_exec` calls — one section per call:
```bash
cat > report.md << 'EOF'
# Section 1
<content from your earlier tool call results>
EOF
```
```bash
cat >> report.md << 'EOF'
# Section 2
<content from your earlier tool call results>
EOF
```
Use `cat >` for the first chunk and `cat >>` to append subsequent chunks.
Do not re-fetch or re-generate data you already have from prior tool calls.
After building the file, reference it with `@@agptfile:` in other tools:
`@@agptfile:/home/user/report.md`
### Sub-agent tasks
- When using the Task tool, NEVER set `run_in_background` to true.
All tasks must run in the foreground.

View File

@@ -80,6 +80,7 @@ from .tool_adapter import (
create_copilot_mcp_server,
get_copilot_tool_names,
get_sdk_disallowed_tools,
reset_tool_failure_counters,
set_execution_context,
wait_for_stash,
)
@@ -105,6 +106,20 @@ config = ChatConfig()
# Non-context errors (network, auth, rate-limit) are NOT retried.
_MAX_STREAM_ATTEMPTS = 3
# Hard circuit breaker: abort the stream if the model sends this many
# consecutive tool calls with empty parameters (a sign of context
# saturation or serialization failure). Empty input ({}) is never
# legitimate — even one is suspicious, three is conclusive.
_EMPTY_TOOL_CALL_LIMIT = 3
# User-facing error shown when the empty-tool-call circuit breaker trips.
_CIRCUIT_BREAKER_ERROR_MSG = (
"AutoPilot was unable to complete the tool call "
"— this usually happens when the response is "
"too large to fit in a single tool call. "
"Try breaking your request into smaller parts."
)
# Patterns that indicate the prompt/request exceeds the model's context limit.
# Matched case-insensitively against the full exception chain.
_PROMPT_TOO_LONG_PATTERNS: tuple[str, ...] = (
@@ -996,15 +1011,122 @@ def _dispatch_response(
return response
class _TransientErrorHandled(Exception):
class _HandledStreamError(Exception):
"""Raised by `_run_stream_attempt` after it has already yielded a
`StreamError` for a transient API error.
`StreamError` to the client (e.g. transient API error, circuit breaker).
This signals the outer retry loop that the attempt failed so it can
perform session-message rollback and set the `ended_with_stream_error`
flag, **without** yielding a duplicate `StreamError` to the client.
Attributes:
error_msg: The user-facing error message to persist.
code: Machine-readable error code (e.g. ``circuit_breaker_empty_tool_calls``).
retryable: Whether the frontend should offer a retry button.
"""
def __init__(
self,
message: str,
error_msg: str | None = None,
code: str | None = None,
retryable: bool = True,
):
super().__init__(message)
self.error_msg = error_msg
self.code = code
self.retryable = retryable
@dataclass
class _EmptyToolBreakResult:
"""Result of checking for empty tool calls in a single AssistantMessage."""
count: int # Updated consecutive counter
tripped: bool # Whether the circuit breaker fired
error: StreamError | None # StreamError to yield (if tripped)
error_msg: str | None # Error message (if tripped)
error_code: str | None # Error code (if tripped)
def _check_empty_tool_breaker(
sdk_msg: object,
consecutive: int,
ctx: _StreamContext,
state: _RetryState,
) -> _EmptyToolBreakResult:
"""Detect consecutive empty tool calls and trip the circuit breaker.
Returns an ``_EmptyToolBreakResult`` with the updated counter and, if the
breaker tripped, the ``StreamError`` to yield plus the error metadata.
"""
if not isinstance(sdk_msg, AssistantMessage):
return _EmptyToolBreakResult(consecutive, False, None, None, None)
empty_tools = [
b.name for b in sdk_msg.content if isinstance(b, ToolUseBlock) and not b.input
]
if not empty_tools:
# Reset on any non-empty-tool AssistantMessage (including text-only
# messages — any() over empty content is False).
return _EmptyToolBreakResult(0, False, None, None, None)
consecutive += 1
# Log full diagnostics on first occurrence only; subsequent hits just
# log the counter to reduce noise.
if consecutive == 1:
logger.warning(
"%s Empty tool call detected (%d/%d): "
"tools=%s, model=%s, error=%s, "
"block_types=%s, cumulative_usage=%s",
ctx.log_prefix,
consecutive,
_EMPTY_TOOL_CALL_LIMIT,
empty_tools,
sdk_msg.model,
sdk_msg.error,
[type(b).__name__ for b in sdk_msg.content],
{
"prompt": state.usage.prompt_tokens,
"completion": state.usage.completion_tokens,
"cache_read": state.usage.cache_read_tokens,
},
)
else:
logger.warning(
"%s Empty tool call detected (%d/%d): tools=%s",
ctx.log_prefix,
consecutive,
_EMPTY_TOOL_CALL_LIMIT,
empty_tools,
)
if consecutive < _EMPTY_TOOL_CALL_LIMIT:
return _EmptyToolBreakResult(consecutive, False, None, None, None)
logger.error(
"%s Circuit breaker: aborting stream after %d "
"consecutive empty tool calls. "
"This is likely caused by the model attempting "
"to write content too large for a single tool "
"call's output token limit. The model should "
"write large files in chunks using bash_exec "
"with cat >> (append).",
ctx.log_prefix,
consecutive,
)
error_msg = _CIRCUIT_BREAKER_ERROR_MSG
error_code = "circuit_breaker_empty_tool_calls"
_append_error_marker(ctx.session, error_msg, retryable=True)
return _EmptyToolBreakResult(
count=consecutive,
tripped=True,
error=StreamError(errorText=error_msg, code=error_code),
error_msg=error_msg,
error_code=error_code,
)
async def _run_stream_attempt(
ctx: _StreamContext,
@@ -1039,6 +1161,12 @@ async def _run_stream_attempt(
accumulated_tool_calls=[],
)
ended_with_stream_error = False
# Stores the error message used by _append_error_marker so the outer
# retry loop can re-append the correct message after session rollback.
stream_error_msg: str | None = None
stream_error_code: str | None = None
consecutive_empty_tool_calls = 0
async with ClaudeSDKClient(options=state.options) as client:
logger.info(
@@ -1129,14 +1257,16 @@ async def _run_stream_attempt(
"suppressing raw error text",
ctx.log_prefix,
)
stream_error_msg = FRIENDLY_TRANSIENT_MSG
stream_error_code = "transient_api_error"
_append_error_marker(
ctx.session,
FRIENDLY_TRANSIENT_MSG,
stream_error_msg,
retryable=True,
)
yield StreamError(
errorText=FRIENDLY_TRANSIENT_MSG,
code="transient_api_error",
errorText=stream_error_msg,
code=stream_error_code,
)
ended_with_stream_error = True
break
@@ -1177,13 +1307,17 @@ async def _run_stream_attempt(
if isinstance(sdk_msg, ResultMessage):
logger.info(
"%s Received: ResultMessage %s "
"(unresolved=%d, current=%d, resolved=%d)",
"(unresolved=%d, current=%d, resolved=%d, "
"num_turns=%d, cost_usd=%s, result=%s)",
ctx.log_prefix,
sdk_msg.subtype,
len(state.adapter.current_tool_calls)
- len(state.adapter.resolved_tool_calls),
len(state.adapter.current_tool_calls),
len(state.adapter.resolved_tool_calls),
sdk_msg.num_turns,
sdk_msg.total_cost_usd,
(sdk_msg.result or "")[:200],
)
if sdk_msg.subtype in (
"error",
@@ -1240,6 +1374,18 @@ async def _run_stream_attempt(
)
entries_replaced = True
# --- Hard circuit breaker for empty tool calls ---
breaker = _check_empty_tool_breaker(
sdk_msg, consecutive_empty_tool_calls, ctx, state
)
consecutive_empty_tool_calls = breaker.count
if breaker.tripped and breaker.error is not None:
stream_error_msg = breaker.error_msg
stream_error_code = breaker.error_code
yield breaker.error
ended_with_stream_error = True
break
# --- Dispatch adapter responses ---
for response in state.adapter.convert_message(sdk_msg):
dispatched = _dispatch_response(
@@ -1320,8 +1466,10 @@ async def _run_stream_attempt(
# to the client (StreamError yielded above), raise so the outer retry
# loop can rollback session messages and set its error flags properly.
if ended_with_stream_error:
raise _TransientErrorHandled(
"Transient API error handled — StreamError already yielded"
raise _HandledStreamError(
"Stream error handled — StreamError already yielded",
error_msg=stream_error_msg,
code=stream_error_code,
)
@@ -1714,6 +1862,10 @@ async def stream_chat_completion_sdk(
)
for attempt in range(_MAX_STREAM_ATTEMPTS):
# Reset tool-level circuit breaker so failures from a previous
# (rolled-back) attempt don't carry over to the fresh attempt.
reset_tool_failure_counters()
if attempt > 0:
logger.info(
"%s Retrying with reduced context (%d/%d)",
@@ -1778,24 +1930,35 @@ async def stream_chat_completion_sdk(
_MAX_STREAM_ATTEMPTS,
)
raise
except _TransientErrorHandled:
except _HandledStreamError as exc:
# _run_stream_attempt already yielded a StreamError and
# appended an error marker. We only need to rollback
# session messages and set the error flag — do NOT set
# stream_err so the post-loop code won't emit a
# duplicate StreamError.
logger.warning(
"%s Transient error handled in stream attempt "
"(attempt %d/%d, events_yielded=%d)",
"%s Stream error handled in attempt "
"(attempt %d/%d, code=%s, events_yielded=%d)",
log_prefix,
attempt + 1,
_MAX_STREAM_ATTEMPTS,
exc.code or "transient",
events_yielded,
)
session.messages = session.messages[:pre_attempt_msg_count]
# transcript_builder still contains entries from the aborted
# attempt that no longer match session.messages. Skip upload
# so a future --resume doesn't replay rolled-back content.
skip_transcript_upload = True
# Re-append the error marker so it survives the rollback
# and is persisted by the finally block (see #2947655365).
_append_error_marker(session, FRIENDLY_TRANSIENT_MSG, retryable=True)
# Use the specific error message from the attempt (e.g.
# circuit breaker msg) rather than always the generic one.
_append_error_marker(
session,
exc.error_msg or FRIENDLY_TRANSIENT_MSG,
retryable=True,
)
ended_with_stream_error = True
break
except Exception as e:
@@ -1822,11 +1985,13 @@ async def stream_chat_completion_sdk(
log_prefix,
events_yielded,
)
skip_transcript_upload = True
ended_with_stream_error = True
break
if not is_context_error:
# Non-context errors (network, auth, rate-limit) should
# not trigger compaction — surface the error immediately.
skip_transcript_upload = True
ended_with_stream_error = True
break
continue

View File

@@ -0,0 +1,96 @@
"""Tests for the tool call circuit breaker in tool_adapter.py."""
import pytest
from backend.copilot.sdk.tool_adapter import (
_MAX_CONSECUTIVE_TOOL_FAILURES,
_check_circuit_breaker,
_clear_tool_failures,
_consecutive_tool_failures,
_record_tool_failure,
)
@pytest.fixture(autouse=True)
def _reset_tracker():
"""Reset the circuit breaker tracker for each test."""
token = _consecutive_tool_failures.set({})
yield
_consecutive_tool_failures.reset(token)
class TestCircuitBreaker:
def test_no_trip_below_threshold(self):
"""Circuit breaker should not trip before reaching the limit."""
args = {"file_path": "/tmp/test.txt"}
for _ in range(_MAX_CONSECUTIVE_TOOL_FAILURES - 1):
assert _check_circuit_breaker("write_file", args) is None
_record_tool_failure("write_file", args)
# Still under the limit
assert _check_circuit_breaker("write_file", args) is None
def test_trips_at_threshold(self):
"""Circuit breaker should trip after reaching the failure limit."""
args = {"file_path": "/tmp/test.txt"}
for _ in range(_MAX_CONSECUTIVE_TOOL_FAILURES):
assert _check_circuit_breaker("write_file", args) is None
_record_tool_failure("write_file", args)
# Now it should trip
result = _check_circuit_breaker("write_file", args)
assert result is not None
assert "STOP" in result
assert "write_file" in result
def test_different_args_tracked_separately(self):
"""Different args should have separate failure counters."""
args_a = {"file_path": "/tmp/a.txt"}
args_b = {"file_path": "/tmp/b.txt"}
for _ in range(_MAX_CONSECUTIVE_TOOL_FAILURES):
_record_tool_failure("write_file", args_a)
# args_a should trip
assert _check_circuit_breaker("write_file", args_a) is not None
# args_b should NOT trip
assert _check_circuit_breaker("write_file", args_b) is None
def test_different_tools_tracked_separately(self):
"""Different tools should have separate failure counters."""
args = {"file_path": "/tmp/test.txt"}
for _ in range(_MAX_CONSECUTIVE_TOOL_FAILURES):
_record_tool_failure("tool_a", args)
# tool_a should trip
assert _check_circuit_breaker("tool_a", args) is not None
# tool_b with same args should NOT trip
assert _check_circuit_breaker("tool_b", args) is None
def test_empty_args_tracked(self):
"""Empty args ({}) — the exact failure pattern from the bug — should be tracked."""
args = {}
for _ in range(_MAX_CONSECUTIVE_TOOL_FAILURES):
_record_tool_failure("write_file", args)
assert _check_circuit_breaker("write_file", args) is not None
def test_clear_resets_counter(self):
"""Clearing failures should reset the counter."""
args = {}
for _ in range(_MAX_CONSECUTIVE_TOOL_FAILURES):
_record_tool_failure("write_file", args)
_clear_tool_failures("write_file")
assert _check_circuit_breaker("write_file", args) is None
def test_success_clears_failures(self):
"""A successful call should reset the failure counter."""
args = {}
for _ in range(_MAX_CONSECUTIVE_TOOL_FAILURES - 1):
_record_tool_failure("write_file", args)
# Success clears failures
_clear_tool_failures("write_file")
# Should be able to fail again without tripping
for _ in range(_MAX_CONSECUTIVE_TOOL_FAILURES - 1):
_record_tool_failure("write_file", args)
assert _check_circuit_breaker("write_file", args) is None
def test_no_tracker_returns_none(self):
"""If tracker is not initialized, circuit breaker should not trip."""
_consecutive_tool_failures.set(None) # type: ignore[arg-type]
_record_tool_failure("write_file", {}) # should not raise
assert _check_circuit_breaker("write_file", {}) is None

View File

@@ -66,6 +66,16 @@ _stash_event: ContextVar[asyncio.Event | None] = ContextVar(
"_stash_event", default=None
)
# Circuit breaker: tracks consecutive tool failures to detect infinite retry loops.
# When a tool is called repeatedly with empty/identical args and keeps failing,
# this counter is incremented. After _MAX_CONSECUTIVE_TOOL_FAILURES identical
# failures the tool handler returns a hard-stop message instead of the raw error.
_MAX_CONSECUTIVE_TOOL_FAILURES = 3
_consecutive_tool_failures: ContextVar[dict[str, int]] = ContextVar(
"_consecutive_tool_failures",
default=None, # type: ignore[arg-type]
)
def set_execution_context(
user_id: str | None,
@@ -91,6 +101,17 @@ def set_execution_context(
_current_project_dir.set(_encode_cwd_for_cli(sdk_cwd) if sdk_cwd else "")
_pending_tool_outputs.set({})
_stash_event.set(asyncio.Event())
_consecutive_tool_failures.set({})
def reset_tool_failure_counters() -> None:
"""Reset all tool-level circuit breaker counters.
Called at the start of each SDK retry attempt so that failure counts
from a previous (rolled-back) attempt do not carry over and prematurely
trip the breaker on a fresh attempt with different context.
"""
_consecutive_tool_failures.set({})
def pop_pending_tool_output(tool_name: str) -> str | None:
@@ -217,6 +238,66 @@ def _mcp_error(message: str) -> dict[str, Any]:
}
def _failure_key(tool_name: str, args: dict[str, Any]) -> str:
"""Compute a stable fingerprint for (tool_name, args) used by the circuit breaker."""
args_key = json.dumps(args, sort_keys=True, default=str)
return f"{tool_name}:{args_key}"
def _check_circuit_breaker(tool_name: str, args: dict[str, Any]) -> str | None:
"""Check if a tool has hit the consecutive failure limit.
Tracks failures keyed by (tool_name, args_fingerprint). Returns an error
message if the circuit breaker has tripped, or None if the call should proceed.
"""
tracker = _consecutive_tool_failures.get(None)
if tracker is None:
return None
key = _failure_key(tool_name, args)
count = tracker.get(key, 0)
if count >= _MAX_CONSECUTIVE_TOOL_FAILURES:
logger.warning(
"Circuit breaker tripped for tool %s after %d consecutive "
"identical failures (args=%s)",
tool_name,
count,
key[len(tool_name) + 1 :][:200],
)
return (
f"STOP: Tool '{tool_name}' has failed {count} consecutive times with "
f"the same arguments. Do NOT retry this tool call. "
f"If you were trying to write content to a file, instead respond with "
f"the content directly as a text message to the user."
)
return None
def _record_tool_failure(tool_name: str, args: dict[str, Any]) -> None:
"""Record a tool failure for circuit breaker tracking."""
tracker = _consecutive_tool_failures.get(None)
if tracker is None:
return
key = _failure_key(tool_name, args)
tracker[key] = tracker.get(key, 0) + 1
def _clear_tool_failures(tool_name: str) -> None:
"""Clear failure tracking for a tool on success.
Clears ALL args variants for the tool, not just the successful call's args.
This gives the tool a "fresh start" on any success, which is appropriate for
the primary use case (detecting infinite loops with identical failing args).
"""
tracker = _consecutive_tool_failures.get(None)
if tracker is None:
return
# Clear all entries for this tool name
keys_to_remove = [k for k in tracker if k.startswith(f"{tool_name}:")]
for k in keys_to_remove:
del tracker[k]
def create_tool_handler(base_tool: BaseTool):
"""Create an async handler function for a BaseTool.
@@ -358,6 +439,15 @@ def create_copilot_mcp_server(*, use_e2b: bool = False):
Applied once to every registered tool."""
async def wrapper(args: dict[str, Any]) -> dict[str, Any]:
# Circuit breaker: stop infinite retry loops with identical args.
# Use the original (pre-expansion) args for fingerprinting so
# check and record always use the same key — @@agptfile:
# expansion mutates args, which would cause a key mismatch.
original_args = args
stop_msg = _check_circuit_breaker(tool_name, original_args)
if stop_msg:
return _mcp_error(stop_msg)
user_id, session = get_execution_context()
if session is not None:
try:
@@ -365,6 +455,7 @@ def create_copilot_mcp_server(*, use_e2b: bool = False):
args, user_id, session, input_schema=input_schema
)
except FileRefExpansionError as exc:
_record_tool_failure(tool_name, original_args)
return _mcp_error(
f"@@agptfile: reference could not be resolved: {exc}. "
"Ensure the file exists before referencing it. "
@@ -374,6 +465,12 @@ def create_copilot_mcp_server(*, use_e2b: bool = False):
result = await fn(args)
truncated = truncate(result, _MCP_MAX_CHARS)
# Track consecutive failures for circuit breaker
if truncated.get("isError"):
_record_tool_failure(tool_name, original_args)
else:
_clear_tool_failures(tool_name)
# Stash the text so the response adapter can forward our
# middle-out truncated version to the frontend instead of the
# SDK's head-truncated version (for outputs >~100 KB the SDK