class TestRollbackCapturingPartial:
    """Direct tests of `_rollback_attempt_capturing_partial`.

    The retry loop relies on this helper not to leak error markers that
    `_run_stream_attempt` already appended to `session.messages` — otherwise
    the post-loop restore replays a stale marker before adding its own,
    leaving duplicate error bubbles.

    Every test goes through `_rollback`, which also pins the helper's two
    side effects: `session.messages` is truncated back to the pre-attempt
    length and the transcript builder is restored from the snapshot.
    """

    def _builder_with_snap(self):
        # MagicMock auto-creates ``restore`` on first access, so the call can
        # be asserted afterwards without wiring it up explicitly.
        return MagicMock()

    def _rollback(self, attempt_messages):
        """Roll back one user turn followed by *attempt_messages*.

        Returns the contents of the captured partial after asserting the
        side effects every rollback must have, whatever was captured.
        """
        session = _make_session(
            [ChatMessage(role="user", content="hi"), *attempt_messages]
        )
        builder = self._builder_with_snap()
        snap = object()
        captured = _rollback_attempt_capturing_partial(
            session, builder, transcript_snap=snap, pre_attempt_msg_count=1
        )
        # Only the pre-attempt history may survive in the session ...
        assert session.messages == [ChatMessage(role="user", content="hi")]
        # ... and the transcript must be rewound to the very same snapshot.
        builder.restore.assert_called_once_with(snap)
        return [m.content for m in captured]

    def test_returns_partial_when_no_marker_present(self):
        captured = self._rollback(
            [ChatMessage(role="assistant", content="part-1")]
        )
        assert captured == ["part-1"]

    def test_strips_trailing_error_marker(self):
        # _run_stream_attempt appended a marker via _append_error_marker
        # (e.g. idle timeout, circuit breaker) before raising
        # _HandledStreamError. The rollback must NOT carry it forward, or
        # the post-loop restore will replay the stale marker + add its own.
        marker = (
            f"{COPILOT_RETRYABLE_ERROR_PREFIX} The session has been idle "
            "for too long. Please try again."
        )
        captured = self._rollback(
            [
                ChatMessage(role="assistant", content="part-1"),
                ChatMessage(role="assistant", content=marker),
            ]
        )
        assert captured == ["part-1"]

    def test_strips_consecutive_error_markers(self):
        # Defensive: if more than one marker landed back-to-back (legacy
        # path or future regression), strip them all.
        captured = self._rollback(
            [
                ChatMessage(role="assistant", content="part-1"),
                ChatMessage(role="assistant", content=f"{COPILOT_ERROR_PREFIX} a"),
                ChatMessage(
                    role="assistant",
                    content=f"{COPILOT_RETRYABLE_ERROR_PREFIX} b",
                ),
            ]
        )
        assert captured == ["part-1"]

    def test_returns_empty_when_attempt_only_produced_marker(self):
        # Boundary: the attempt failed before producing any real content, so
        # stripping the marker leaves nothing to restore.
        captured = self._rollback(
            [ChatMessage(role="assistant", content=f"{COPILOT_ERROR_PREFIX} a")]
        )
        assert captured == []

    def test_keeps_marker_that_is_not_trailing(self):
        # Only *trailing* markers are stale leftovers of the failed attempt;
        # a marker followed by real assistant output is left untouched.
        marker = f"{COPILOT_ERROR_PREFIX} a"
        captured = self._rollback(
            [
                ChatMessage(role="assistant", content=marker),
                ChatMessage(role="assistant", content="part-2"),
            ]
        )
        assert captured == [marker, "part-2"]

    def test_does_not_strip_non_marker_assistant(self):
        # Regular assistant text must be preserved — only content that
        # *starts with* a canonical error prefix is filtered, so a message
        # that merely mentions the prefix further in has to survive too.
        mention = f"Note: {COPILOT_ERROR_PREFIX} appeared earlier"
        captured = self._rollback(
            [
                ChatMessage(role="assistant", content="Important note"),
                ChatMessage(role="assistant", content=mention),
            ]
        )
        assert captured == ["Important note", mention]
def _is_error_marker(msg: ChatMessage) -> bool:
    """Return True if *msg* is an error marker emitted by ``_append_error_marker``.

    A marker is an assistant message whose content starts with either
    ``COPILOT_ERROR_PREFIX`` or ``COPILOT_RETRYABLE_ERROR_PREFIX``.
    Messages from any other role, and messages with empty or missing
    content, are never markers.
    """
    if msg.role != "assistant" or not msg.content:
        return False
    # str.startswith accepts a tuple of candidates, so both prefixes are
    # checked in a single call instead of an ``or`` chain.
    return msg.content.startswith(
        (COPILOT_ERROR_PREFIX, COPILOT_RETRYABLE_ERROR_PREFIX)
    )
+ """ + + error_msg: str + code: str + retryable: bool + already_yielded: bool + + @dataclass class _EmptyToolBreakResult: """Result of checking for empty tool calls in a single AssistantMessage.""" @@ -3253,11 +3285,10 @@ async def stream_chat_completion_sdk( # pyright: ignore[reportGeneralTypeIssues # failed attempt. On final-failure exit we re-attach these so the user sees # what the assistant produced before the error rather than an empty chat. last_attempt_partial: list[ChatMessage] = [] - # Populated by the _HandledStreamError branch with (display_msg, code, - # already_yielded). Consumed once after the retry loop to re-attach the - # partial and, if the inner handler hadn't already emitted one, yield a - # single StreamError to the client. - handled_error_info: tuple[str, str, bool] | None = None + # Populated by the _HandledStreamError branch. Consumed once after the + # retry loop to re-attach the partial and, if the inner handler hadn't + # already emitted one, yield a single StreamError to the client. + handled_error_info: _HandledErrorInfo | None = None # Defaults ensure the finally block can always reference these safely even when # an early return (e.g. sdk_cwd error) skips their normal assignment below. sdk_model: str | None = None @@ -3976,10 +4007,11 @@ async def stream_chat_completion_sdk( # pyright: ignore[reportGeneralTypeIssues # attempt that no longer match session.messages. Skip upload # so a future --resume doesn't replay rolled-back content. 
skip_transcript_upload = True - handled_error_info = ( - exc.error_msg or FRIENDLY_TRANSIENT_MSG, - exc.code or "transient_api_error", - exc.already_yielded, + handled_error_info = _HandledErrorInfo( + error_msg=exc.error_msg or FRIENDLY_TRANSIENT_MSG, + code=exc.code or "transient_api_error", + retryable=exc.retryable, + already_yielded=exc.already_yielded, ) ended_with_stream_error = True break @@ -4071,8 +4103,8 @@ async def stream_chat_completion_sdk( # pyright: ignore[reportGeneralTypeIssues # for pyright to type-check. if ended_with_stream_error: if handled_error_info is not None: - final_msg, _, _ = handled_error_info - final_retryable = True + final_msg = handled_error_info.error_msg + final_retryable = handled_error_info.retryable elif attempts_exhausted: final_msg = ( "Your conversation is too long. " @@ -4141,10 +4173,11 @@ async def stream_chat_completion_sdk( # pyright: ignore[reportGeneralTypeIssues # here when the inner handler chose not to (transient errors suppress # the early flash so the client only sees the final error after all # retries are exhausted). - if handled_error_info is not None: - handled_msg, handled_code, already_yielded = handled_error_info - if not already_yielded: - yield StreamError(errorText=handled_msg, code=handled_code) + if handled_error_info is not None and not handled_error_info.already_yielded: + yield StreamError( + errorText=handled_error_info.error_msg, + code=handled_error_info.code, + ) # Copy token usage from retry state to outer-scope accumulators # so the finally block can persist them.