mirror of
https://github.com/Significant-Gravitas/AutoGPT.git
synced 2026-04-30 03:00:41 -04:00
fix(backend/copilot): strip stale error markers from rolled-back partial + carry retryable through _HandledStreamError
Two fixes layered on the partial-restore path introduced by this PR: 1. _rollback_attempt_capturing_partial now drops trailing error markers (COPILOT_ERROR_PREFIX / COPILOT_RETRYABLE_ERROR_PREFIX) from the captured partial. _run_stream_attempt's idle-timeout and circuit-breaker paths append a marker via _append_error_marker BEFORE raising _HandledStreamError; without this filter the post-loop restore would replay the stale marker and then add a fresh one, leaving duplicate error bubbles and pushing any synthetic tool_result after an assistant(error) turn that has no matching tool_use. 2. Replace the (msg, code, already_yielded) 3-tuple carrying _HandledStreamError state out of the retry loop with a frozen _HandledErrorInfo dataclass that also carries `retryable`. The post-loop block now uses exc.retryable instead of hardcoding True, so a future _HandledStreamError(retryable=False, ...) won't silently write the wrong marker prefix. 3 new tests cover the rollback marker-stripping contract.
This commit is contained in:
@@ -25,6 +25,7 @@ from backend.copilot.response_model import StreamToolOutputAvailable
|
||||
from .service import (
|
||||
_flush_orphan_tool_uses_to_session,
|
||||
_restore_partial_with_error_marker,
|
||||
_rollback_attempt_capturing_partial,
|
||||
)
|
||||
|
||||
|
||||
@@ -163,6 +164,99 @@ class TestFlushOrphanToolUses:
|
||||
adapter._flush_unresolved_tool_calls.assert_not_called()
|
||||
|
||||
|
||||
class TestRollbackCapturingPartial:
|
||||
"""Direct tests of `_rollback_attempt_capturing_partial`.
|
||||
|
||||
The retry loop relies on this helper not to leak error markers that
|
||||
`_run_stream_attempt` already appended to `session.messages` — otherwise
|
||||
the post-loop restore replays a stale marker before adding its own,
|
||||
leaving duplicate error bubbles.
|
||||
"""
|
||||
|
||||
def _builder_with_snap(self):
|
||||
builder = MagicMock()
|
||||
builder.restore = MagicMock()
|
||||
return builder
|
||||
|
||||
def test_returns_partial_when_no_marker_present(self):
|
||||
session = _make_session(
|
||||
[
|
||||
ChatMessage(role="user", content="hi"),
|
||||
ChatMessage(role="assistant", content="part-1"),
|
||||
]
|
||||
)
|
||||
builder = self._builder_with_snap()
|
||||
captured = _rollback_attempt_capturing_partial(
|
||||
session, builder, transcript_snap=object(), pre_attempt_msg_count=1
|
||||
)
|
||||
assert [m.content for m in captured] == ["part-1"]
|
||||
assert session.messages == [ChatMessage(role="user", content="hi")]
|
||||
|
||||
def test_strips_trailing_error_marker(self):
|
||||
# _run_stream_attempt appended a marker via _append_error_marker
|
||||
# (e.g. idle timeout, circuit breaker) before raising
|
||||
# _HandledStreamError. The rollback must NOT carry it forward, or
|
||||
# the post-loop restore will replay the stale marker + add its own.
|
||||
marker = (
|
||||
f"{COPILOT_RETRYABLE_ERROR_PREFIX} The session has been idle "
|
||||
"for too long. Please try again."
|
||||
)
|
||||
session = _make_session(
|
||||
[
|
||||
ChatMessage(role="user", content="hi"),
|
||||
ChatMessage(role="assistant", content="part-1"),
|
||||
ChatMessage(role="assistant", content=marker),
|
||||
]
|
||||
)
|
||||
captured = _rollback_attempt_capturing_partial(
|
||||
session,
|
||||
self._builder_with_snap(),
|
||||
transcript_snap=object(),
|
||||
pre_attempt_msg_count=1,
|
||||
)
|
||||
assert [m.content for m in captured] == ["part-1"]
|
||||
|
||||
def test_strips_consecutive_error_markers(self):
|
||||
# Defensive: if more than one marker landed back-to-back (legacy
|
||||
# path or future regression), strip them all.
|
||||
session = _make_session(
|
||||
[
|
||||
ChatMessage(role="user", content="hi"),
|
||||
ChatMessage(role="assistant", content="part-1"),
|
||||
ChatMessage(role="assistant", content=f"{COPILOT_ERROR_PREFIX} a"),
|
||||
ChatMessage(
|
||||
role="assistant",
|
||||
content=f"{COPILOT_RETRYABLE_ERROR_PREFIX} b",
|
||||
),
|
||||
]
|
||||
)
|
||||
captured = _rollback_attempt_capturing_partial(
|
||||
session,
|
||||
self._builder_with_snap(),
|
||||
transcript_snap=object(),
|
||||
pre_attempt_msg_count=1,
|
||||
)
|
||||
assert [m.content for m in captured] == ["part-1"]
|
||||
|
||||
def test_does_not_strip_non_marker_assistant(self):
|
||||
# Regular assistant text starting with similar-but-not-prefix
|
||||
# content must be preserved — only the canonical error markers
|
||||
# should be filtered.
|
||||
session = _make_session(
|
||||
[
|
||||
ChatMessage(role="user", content="hi"),
|
||||
ChatMessage(role="assistant", content="Important note"),
|
||||
]
|
||||
)
|
||||
captured = _rollback_attempt_capturing_partial(
|
||||
session,
|
||||
self._builder_with_snap(),
|
||||
transcript_snap=object(),
|
||||
pre_attempt_msg_count=1,
|
||||
)
|
||||
assert [m.content for m in captured] == ["Important note"]
|
||||
|
||||
|
||||
class TestRetryRollbackContract:
|
||||
"""Property-style: a rolled-back attempt must be recoverable on final exit.
|
||||
|
||||
|
||||
@@ -624,13 +624,30 @@ def _rollback_attempt_capturing_partial(
|
||||
during the failed attempt. The caller passes it to
|
||||
``_restore_partial_with_error_marker`` on final-failure exit and discards
|
||||
it on a successful retry.
|
||||
|
||||
Trailing error markers appended inside ``_run_stream_attempt`` (idle
|
||||
timeout, circuit breaker) are stripped: re-attaching them would let the
|
||||
post-loop restore replay a stale marker before adding its own, leaving
|
||||
duplicate error bubbles and pushing any synthetic ``tool_result`` after
|
||||
an assistant(error) turn that has no matching ``tool_use``.
|
||||
"""
|
||||
captured = list(session.messages[pre_attempt_msg_count:])
|
||||
while captured and _is_error_marker(captured[-1]):
|
||||
captured.pop()
|
||||
session.messages = session.messages[:pre_attempt_msg_count]
|
||||
transcript_builder.restore(transcript_snap) # type: ignore[arg-type]
|
||||
return captured
|
||||
|
||||
|
||||
def _is_error_marker(msg: ChatMessage) -> bool:
|
||||
"""True if *msg* is an error marker emitted by ``_append_error_marker``."""
|
||||
if msg.role != "assistant" or not msg.content:
|
||||
return False
|
||||
return msg.content.startswith(COPILOT_ERROR_PREFIX) or msg.content.startswith(
|
||||
COPILOT_RETRYABLE_ERROR_PREFIX
|
||||
)
|
||||
|
||||
|
||||
def _setup_langfuse_otel() -> None:
|
||||
"""Configure OTEL tracing for the Claude Agent SDK → Langfuse.
|
||||
|
||||
@@ -2079,6 +2096,21 @@ class _HandledStreamError(Exception):
|
||||
self.already_yielded = already_yielded
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class _HandledErrorInfo:
|
||||
"""Carries a `_HandledStreamError`'s decisions out of the retry loop.
|
||||
|
||||
Set inside the `except _HandledStreamError` branch and consumed by the
|
||||
post-loop block, which restores the partial and (if the inner handler
|
||||
didn't already do it) yields the client-facing StreamError.
|
||||
"""
|
||||
|
||||
error_msg: str
|
||||
code: str
|
||||
retryable: bool
|
||||
already_yielded: bool
|
||||
|
||||
|
||||
@dataclass
|
||||
class _EmptyToolBreakResult:
|
||||
"""Result of checking for empty tool calls in a single AssistantMessage."""
|
||||
@@ -3253,11 +3285,10 @@ async def stream_chat_completion_sdk( # pyright: ignore[reportGeneralTypeIssues
|
||||
# failed attempt. On final-failure exit we re-attach these so the user sees
|
||||
# what the assistant produced before the error rather than an empty chat.
|
||||
last_attempt_partial: list[ChatMessage] = []
|
||||
# Populated by the _HandledStreamError branch with (display_msg, code,
|
||||
# already_yielded). Consumed once after the retry loop to re-attach the
|
||||
# partial and, if the inner handler hadn't already emitted one, yield a
|
||||
# single StreamError to the client.
|
||||
handled_error_info: tuple[str, str, bool] | None = None
|
||||
# Populated by the _HandledStreamError branch. Consumed once after the
|
||||
# retry loop to re-attach the partial and, if the inner handler hadn't
|
||||
# already emitted one, yield a single StreamError to the client.
|
||||
handled_error_info: _HandledErrorInfo | None = None
|
||||
# Defaults ensure the finally block can always reference these safely even when
|
||||
# an early return (e.g. sdk_cwd error) skips their normal assignment below.
|
||||
sdk_model: str | None = None
|
||||
@@ -3976,10 +4007,11 @@ async def stream_chat_completion_sdk( # pyright: ignore[reportGeneralTypeIssues
|
||||
# attempt that no longer match session.messages. Skip upload
|
||||
# so a future --resume doesn't replay rolled-back content.
|
||||
skip_transcript_upload = True
|
||||
handled_error_info = (
|
||||
exc.error_msg or FRIENDLY_TRANSIENT_MSG,
|
||||
exc.code or "transient_api_error",
|
||||
exc.already_yielded,
|
||||
handled_error_info = _HandledErrorInfo(
|
||||
error_msg=exc.error_msg or FRIENDLY_TRANSIENT_MSG,
|
||||
code=exc.code or "transient_api_error",
|
||||
retryable=exc.retryable,
|
||||
already_yielded=exc.already_yielded,
|
||||
)
|
||||
ended_with_stream_error = True
|
||||
break
|
||||
@@ -4071,8 +4103,8 @@ async def stream_chat_completion_sdk( # pyright: ignore[reportGeneralTypeIssues
|
||||
# for pyright to type-check.
|
||||
if ended_with_stream_error:
|
||||
if handled_error_info is not None:
|
||||
final_msg, _, _ = handled_error_info
|
||||
final_retryable = True
|
||||
final_msg = handled_error_info.error_msg
|
||||
final_retryable = handled_error_info.retryable
|
||||
elif attempts_exhausted:
|
||||
final_msg = (
|
||||
"Your conversation is too long. "
|
||||
@@ -4141,10 +4173,11 @@ async def stream_chat_completion_sdk( # pyright: ignore[reportGeneralTypeIssues
|
||||
# here when the inner handler chose not to (transient errors suppress
|
||||
# the early flash so the client only sees the final error after all
|
||||
# retries are exhausted).
|
||||
if handled_error_info is not None:
|
||||
handled_msg, handled_code, already_yielded = handled_error_info
|
||||
if not already_yielded:
|
||||
yield StreamError(errorText=handled_msg, code=handled_code)
|
||||
if handled_error_info is not None and not handled_error_info.already_yielded:
|
||||
yield StreamError(
|
||||
errorText=handled_error_info.error_msg,
|
||||
code=handled_error_info.code,
|
||||
)
|
||||
|
||||
# Copy token usage from retry state to outer-scope accumulators
|
||||
# so the finally block can persist them.
|
||||
|
||||
Reference in New Issue
Block a user