fix(backend/copilot): strip stale error markers from rolled-back partial + carry retryable through _HandledStreamError

Two fixes layered on the partial-restore path introduced by this PR:

1. _rollback_attempt_capturing_partial now drops trailing error markers
   (COPILOT_ERROR_PREFIX / COPILOT_RETRYABLE_ERROR_PREFIX) from the
   captured partial. _run_stream_attempt's idle-timeout and
   circuit-breaker paths append a marker via _append_error_marker BEFORE
   raising _HandledStreamError; without this filter the post-loop
   restore would replay the stale marker and then add a fresh one,
   leaving duplicate error bubbles and pushing any synthetic tool_result
   after an assistant(error) turn that has no matching tool_use.

2. Replace the (msg, code, already_yielded) 3-tuple carrying
   _HandledStreamError state out of the retry loop with a frozen
   _HandledErrorInfo dataclass that also carries `retryable`. The
   post-loop block now uses exc.retryable instead of hardcoding True,
   so a future _HandledStreamError(retryable=False, ...) won't silently
   write the wrong marker prefix.

3 new tests cover the rollback marker-stripping contract.
This commit is contained in:
majdyz
2026-04-25 08:53:48 +07:00
parent b1172e203a
commit 5406fe9b5b
2 changed files with 142 additions and 15 deletions

View File

@@ -25,6 +25,7 @@ from backend.copilot.response_model import StreamToolOutputAvailable
from .service import (
_flush_orphan_tool_uses_to_session,
_restore_partial_with_error_marker,
_rollback_attempt_capturing_partial,
)
@@ -163,6 +164,99 @@ class TestFlushOrphanToolUses:
adapter._flush_unresolved_tool_calls.assert_not_called()
class TestRollbackCapturingPartial:
"""Direct tests of `_rollback_attempt_capturing_partial`.
The retry loop relies on this helper not to leak error markers that
`_run_stream_attempt` already appended to `session.messages` — otherwise
the post-loop restore replays a stale marker before adding its own,
leaving duplicate error bubbles.
"""
def _builder_with_snap(self):
builder = MagicMock()
builder.restore = MagicMock()
return builder
def test_returns_partial_when_no_marker_present(self):
session = _make_session(
[
ChatMessage(role="user", content="hi"),
ChatMessage(role="assistant", content="part-1"),
]
)
builder = self._builder_with_snap()
captured = _rollback_attempt_capturing_partial(
session, builder, transcript_snap=object(), pre_attempt_msg_count=1
)
assert [m.content for m in captured] == ["part-1"]
assert session.messages == [ChatMessage(role="user", content="hi")]
def test_strips_trailing_error_marker(self):
# _run_stream_attempt appended a marker via _append_error_marker
# (e.g. idle timeout, circuit breaker) before raising
# _HandledStreamError. The rollback must NOT carry it forward, or
# the post-loop restore will replay the stale marker + add its own.
marker = (
f"{COPILOT_RETRYABLE_ERROR_PREFIX} The session has been idle "
"for too long. Please try again."
)
session = _make_session(
[
ChatMessage(role="user", content="hi"),
ChatMessage(role="assistant", content="part-1"),
ChatMessage(role="assistant", content=marker),
]
)
captured = _rollback_attempt_capturing_partial(
session,
self._builder_with_snap(),
transcript_snap=object(),
pre_attempt_msg_count=1,
)
assert [m.content for m in captured] == ["part-1"]
def test_strips_consecutive_error_markers(self):
# Defensive: if more than one marker landed back-to-back (legacy
# path or future regression), strip them all.
session = _make_session(
[
ChatMessage(role="user", content="hi"),
ChatMessage(role="assistant", content="part-1"),
ChatMessage(role="assistant", content=f"{COPILOT_ERROR_PREFIX} a"),
ChatMessage(
role="assistant",
content=f"{COPILOT_RETRYABLE_ERROR_PREFIX} b",
),
]
)
captured = _rollback_attempt_capturing_partial(
session,
self._builder_with_snap(),
transcript_snap=object(),
pre_attempt_msg_count=1,
)
assert [m.content for m in captured] == ["part-1"]
def test_does_not_strip_non_marker_assistant(self):
# Regular assistant text starting with similar-but-not-prefix
# content must be preserved — only the canonical error markers
# should be filtered.
session = _make_session(
[
ChatMessage(role="user", content="hi"),
ChatMessage(role="assistant", content="Important note"),
]
)
captured = _rollback_attempt_capturing_partial(
session,
self._builder_with_snap(),
transcript_snap=object(),
pre_attempt_msg_count=1,
)
assert [m.content for m in captured] == ["Important note"]
class TestRetryRollbackContract:
"""Property-style: a rolled-back attempt must be recoverable on final exit.

View File

@@ -624,13 +624,30 @@ def _rollback_attempt_capturing_partial(
during the failed attempt. The caller passes it to
``_restore_partial_with_error_marker`` on final-failure exit and discards
it on a successful retry.
Trailing error markers appended inside ``_run_stream_attempt`` (idle
timeout, circuit breaker) are stripped: re-attaching them would let the
post-loop restore replay a stale marker before adding its own, leaving
duplicate error bubbles and pushing any synthetic ``tool_result`` after
an assistant(error) turn that has no matching ``tool_use``.
"""
captured = list(session.messages[pre_attempt_msg_count:])
while captured and _is_error_marker(captured[-1]):
captured.pop()
session.messages = session.messages[:pre_attempt_msg_count]
transcript_builder.restore(transcript_snap) # type: ignore[arg-type]
return captured
def _is_error_marker(msg: ChatMessage) -> bool:
"""True if *msg* is an error marker emitted by ``_append_error_marker``."""
if msg.role != "assistant" or not msg.content:
return False
return msg.content.startswith(COPILOT_ERROR_PREFIX) or msg.content.startswith(
COPILOT_RETRYABLE_ERROR_PREFIX
)
def _setup_langfuse_otel() -> None:
"""Configure OTEL tracing for the Claude Agent SDK → Langfuse.
@@ -2079,6 +2096,21 @@ class _HandledStreamError(Exception):
self.already_yielded = already_yielded
@dataclass(frozen=True)
class _HandledErrorInfo:
"""Carries a `_HandledStreamError`'s decisions out of the retry loop.
Set inside the `except _HandledStreamError` branch and consumed by the
post-loop block, which restores the partial and (if the inner handler
didn't already do it) yields the client-facing StreamError.
"""
error_msg: str
code: str
retryable: bool
already_yielded: bool
@dataclass
class _EmptyToolBreakResult:
"""Result of checking for empty tool calls in a single AssistantMessage."""
@@ -3253,11 +3285,10 @@ async def stream_chat_completion_sdk( # pyright: ignore[reportGeneralTypeIssues
# failed attempt. On final-failure exit we re-attach these so the user sees
# what the assistant produced before the error rather than an empty chat.
last_attempt_partial: list[ChatMessage] = []
# Populated by the _HandledStreamError branch with (display_msg, code,
# already_yielded). Consumed once after the retry loop to re-attach the
# partial and, if the inner handler hadn't already emitted one, yield a
# single StreamError to the client.
handled_error_info: tuple[str, str, bool] | None = None
# Populated by the _HandledStreamError branch. Consumed once after the
# retry loop to re-attach the partial and, if the inner handler hadn't
# already emitted one, yield a single StreamError to the client.
handled_error_info: _HandledErrorInfo | None = None
# Defaults ensure the finally block can always reference these safely even when
# an early return (e.g. sdk_cwd error) skips their normal assignment below.
sdk_model: str | None = None
@@ -3976,10 +4007,11 @@ async def stream_chat_completion_sdk( # pyright: ignore[reportGeneralTypeIssues
# attempt that no longer match session.messages. Skip upload
# so a future --resume doesn't replay rolled-back content.
skip_transcript_upload = True
handled_error_info = (
exc.error_msg or FRIENDLY_TRANSIENT_MSG,
exc.code or "transient_api_error",
exc.already_yielded,
handled_error_info = _HandledErrorInfo(
error_msg=exc.error_msg or FRIENDLY_TRANSIENT_MSG,
code=exc.code or "transient_api_error",
retryable=exc.retryable,
already_yielded=exc.already_yielded,
)
ended_with_stream_error = True
break
@@ -4071,8 +4103,8 @@ async def stream_chat_completion_sdk( # pyright: ignore[reportGeneralTypeIssues
# for pyright to type-check.
if ended_with_stream_error:
if handled_error_info is not None:
final_msg, _, _ = handled_error_info
final_retryable = True
final_msg = handled_error_info.error_msg
final_retryable = handled_error_info.retryable
elif attempts_exhausted:
final_msg = (
"Your conversation is too long. "
@@ -4141,10 +4173,11 @@ async def stream_chat_completion_sdk( # pyright: ignore[reportGeneralTypeIssues
# here when the inner handler chose not to (transient errors suppress
# the early flash so the client only sees the final error after all
# retries are exhausted).
if handled_error_info is not None:
handled_msg, handled_code, already_yielded = handled_error_info
if not already_yielded:
yield StreamError(errorText=handled_msg, code=handled_code)
if handled_error_info is not None and not handled_error_info.already_yielded:
yield StreamError(
errorText=handled_error_info.error_msg,
code=handled_error_info.code,
)
# Copy token usage from retry state to outer-scope accumulators
# so the finally block can persist them.