simplify(backend/copilot): remove error classification, always retry

Remove RetryStrategy enum, _classify_error(), and _TRANSCRIPT_ERROR_PATTERNS.
All streaming errors now follow the same retry path:
  1. Original query (with transcript)
  2. Compacted transcript (LLM summarization)
  3. No transcript (DB-message rebuild)

No need to pattern-match error messages — compaction is cheap to attempt
and harmless if it fails (falls through to DB rebuild anyway).
This commit is contained in:
Zamil Majdy
2026-03-14 19:39:33 +07:00
parent 629ecc9436
commit b599858dea
3 changed files with 44 additions and 275 deletions

View File

@@ -1,4 +1,4 @@
"""Tests for prompt-too-long retry logic and transcript compaction helpers."""
"""Tests for retry logic and transcript compaction helpers."""
from __future__ import annotations
@@ -9,7 +9,6 @@ import pytest
from backend.util import json
from .service import RetryStrategy, _classify_error
from .transcript import (
_flatten_assistant_content,
_flatten_tool_result_content,
@@ -19,91 +18,6 @@ from .transcript import (
validate_transcript,
)
# ---------------------------------------------------------------------------
# _classify_error
# ---------------------------------------------------------------------------
class TestClassifyError:
"""Tests for _classify_error — maps errors to RetryStrategy values."""
@pytest.mark.parametrize(
"error_msg",
[
"prompt is too long: 250000 tokens > 200000 maximum",
"Error: prompt is too long",
"context_length_exceeded",
"request too large",
"Connection timeout",
"Authentication failed",
"Rate limit exceeded",
"Internal server error",
"SDK process exited with code 1",
"",
],
)
def test_general_errors_return_compact_then_fallback(self, error_msg: str):
"""Most errors (including prompt-too-long) → COMPACT_THEN_FALLBACK."""
assert (
_classify_error(Exception(error_msg)) == RetryStrategy.COMPACT_THEN_FALLBACK
)
@pytest.mark.parametrize(
"error_msg",
[
"invalid json in transcript",
"json decode error at position 42",
"JSONDecodeError: Expecting value",
"failed to read resume file",
"session file not found",
"malformed jsonl entry",
],
)
def test_transcript_errors_return_fallback_only(self, error_msg: str):
"""Transcript/JSON parse errors → FALLBACK_ONLY."""
assert _classify_error(Exception(error_msg)) == RetryStrategy.FALLBACK_ONLY
def test_walks_cause_chain(self):
"""Walks __cause__ to find transcript errors in wrapped exceptions."""
inner = Exception("invalid json in transcript")
outer = RuntimeError("SDK process failed")
outer.__cause__ = inner
assert _classify_error(outer) == RetryStrategy.FALLBACK_ONLY
def test_walks_context_chain(self):
"""Walks __context__ for implicit exception chaining."""
inner = Exception("json decode error")
outer = RuntimeError("during handling")
outer.__context__ = inner
assert _classify_error(outer) == RetryStrategy.FALLBACK_ONLY
def test_no_infinite_loop_on_circular_chain(self):
"""Circular exception chains terminate without hanging."""
a = Exception("error a")
b = Exception("error b")
a.__cause__ = b
b.__cause__ = a
assert _classify_error(a) == RetryStrategy.COMPACT_THEN_FALLBACK
def test_deep_chain(self):
"""Deeply nested exception chain is walked."""
bottom = Exception("malformed jsonl")
current = bottom
for i in range(10):
wrapper = RuntimeError(f"layer {i}")
wrapper.__cause__ = current
current = wrapper
assert _classify_error(current) == RetryStrategy.FALLBACK_ONLY
def test_case_insensitive(self):
"""Pattern matching is case-insensitive."""
assert _classify_error(Exception("INVALID JSON")) == RetryStrategy.FALLBACK_ONLY
assert (
_classify_error(Exception("Resume File not found"))
== RetryStrategy.FALLBACK_ONLY
)
# ---------------------------------------------------------------------------
# _flatten_assistant_content
# ---------------------------------------------------------------------------

View File

@@ -4,6 +4,11 @@ These tests exercise the retry decision logic end-to-end by simulating
the state transitions that happen in ``stream_chat_completion_sdk`` when
the SDK raises streaming errors.
On any error the retry loop tries, in order:
1. Original query (with transcript)
2. Compacted transcript (LLM summarization)
3. No transcript (DB-message rebuild)
Scenario matrix:
1. Normal flow — no error, no retry
2. Error → compact succeeds → retry succeeds
@@ -11,9 +16,8 @@ Scenario matrix:
4. Error → no transcript → DB fallback succeeds
5. Error × 2 → attempt 3 DB fallback succeeds
6. All 3 attempts exhausted → StreamError(all_attempts_exhausted)
7. Transcript parse error → skip compact, DB fallback directly
8. Compaction returns identical content → treated as compact failure → DB fallback
9. transcript_caused_error → finally skips upload
7. Compaction returns identical content → treated as compact failure → DB fallback
8. transcript_caused_error → finally skips upload
"""
from __future__ import annotations
@@ -25,7 +29,7 @@ import pytest
from backend.util import json
from .service import RetryStrategy, _classify_error
from .service import _MAX_QUERY_ATTEMPTS
from .transcript import (
_flatten_assistant_content,
_flatten_tool_result_content,
@@ -100,20 +104,9 @@ def _mock_compress_result(
class TestScenarioNormalFlow:
"""When no error occurs, no retry logic fires."""
def test_general_errors_get_compact_then_fallback(self):
"""General SDK errors should get COMPACT_THEN_FALLBACK strategy."""
general_errors = [
"Connection refused",
"SDK process exited with code 1",
"Authentication failed",
"Rate limit exceeded",
"Internal server error",
"prompt is too long",
]
for msg in general_errors:
assert (
_classify_error(Exception(msg)) == RetryStrategy.COMPACT_THEN_FALLBACK
), msg
def test_max_query_attempts_is_three(self):
"""Verify the constant is 3 (compact + DB fallback + exhaustion)."""
assert _MAX_QUERY_ATTEMPTS == 3
# ---------------------------------------------------------------------------
@@ -326,40 +319,18 @@ class TestScenarioAllAttemptsExhausted:
def test_exhaustion_state_variables(self):
"""Verify the state after exhausting all retry attempts."""
_MAX_QUERY_ATTEMPTS = 3
_retry_strategy: str | None = None
_stream_error: Exception | None = None
transcript_caused_error = False
for _query_attempt in range(_MAX_QUERY_ATTEMPTS):
_retry_strategy = RetryStrategy.COMPACT_THEN_FALLBACK
_stream_error = Exception("some error")
# After loop: check exhaustion
assert _retry_strategy is not None
assert _stream_error is not None
transcript_caused_error = True
assert transcript_caused_error is True
# ---------------------------------------------------------------------------
# Scenario 7: Transcript parse error → skip compact, DB fallback directly
# ---------------------------------------------------------------------------
class TestScenarioTranscriptParseError:
"""Transcript/JSON parse errors skip compaction and go straight to
DB fallback (FALLBACK_ONLY strategy)."""
def test_transcript_errors_get_fallback_only(self):
"""Verify transcript parse errors return FALLBACK_ONLY."""
transcript_errors = [
Exception("invalid json in line 5"),
RuntimeError("json decode error"),
ValueError("malformed jsonl entry"),
Exception("failed to read resume file"),
]
for err in transcript_errors:
assert _classify_error(err) == RetryStrategy.FALLBACK_ONLY, str(err)
# ---------------------------------------------------------------------------
# Scenario 8: Compaction returns identical content
# ---------------------------------------------------------------------------
@@ -476,7 +447,6 @@ class TestRetryStateMachine:
attempt_results: list[str],
transcript_content: str = "some_content",
compact_result: str | None = "compacted_content",
error_strategy: str = RetryStrategy.COMPACT_THEN_FALLBACK,
) -> dict:
"""Simulate the retry loop and return final state.
@@ -486,10 +456,8 @@ class TestRetryStateMachine:
"error" = streaming error
transcript_content: Initial transcript content ("" = none)
compact_result: Result of compact_transcript (None = failure)
error_strategy: RetryStrategy for errors
"""
_MAX_QUERY_ATTEMPTS = 3
_retry_strategy: str | None = None
_stream_error: Exception | None = None
transcript_caused_error = False
use_resume = bool(transcript_content)
stream_completed = False
@@ -498,17 +466,12 @@ class TestRetryStateMachine:
for _query_attempt in range(min(_MAX_QUERY_ATTEMPTS, len(attempt_results))):
if _query_attempt > 0:
_last_strategy = _retry_strategy
_retry_strategy = None
_stream_error = None
stream_completed = False
# Plan B or Plan C?
# Plan B only tried once; after that, always Plan C.
if (
_last_strategy == RetryStrategy.COMPACT_THEN_FALLBACK
and transcript_content
and not _compaction_attempted
):
# First retry: try compacting the transcript.
# Subsequent retries: drop transcript, rebuild from DB.
if transcript_content and not _compaction_attempted:
_compaction_attempted = True
if compact_result and compact_result != transcript_content:
use_resume = True
@@ -523,19 +486,19 @@ class TestRetryStateMachine:
result = attempt_results[_query_attempt]
if result == "error":
_retry_strategy = error_strategy
_stream_error = Exception("simulated error")
continue # skip post-stream
# Stream succeeded
stream_completed = True
break
if _retry_strategy is not None:
if _stream_error is not None:
transcript_caused_error = True
return {
"attempts_made": attempts_made,
"retry_strategy": _retry_strategy,
"stream_error": _stream_error,
"transcript_caused_error": transcript_caused_error,
"stream_completed": stream_completed,
"use_resume": use_resume,
@@ -545,7 +508,7 @@ class TestRetryStateMachine:
"""Scenario 1: Success on first attempt."""
state = self._simulate_retry_loop(["success"])
assert state["attempts_made"] == 1
assert state["retry_strategy"] is None
assert state["stream_error"] is None
assert state["transcript_caused_error"] is False
assert state["stream_completed"] is True
assert state["use_resume"] is True
@@ -558,7 +521,7 @@ class TestRetryStateMachine:
compact_result="compacted",
)
assert state["attempts_made"] == 2
assert state["retry_strategy"] is None
assert state["stream_error"] is None
assert state["transcript_caused_error"] is False
assert state["stream_completed"] is True
assert state["use_resume"] is True # compacted transcript used
@@ -571,7 +534,7 @@ class TestRetryStateMachine:
compact_result=None, # compact fails
)
assert state["attempts_made"] == 2
assert state["retry_strategy"] is None
assert state["stream_error"] is None
assert state["transcript_caused_error"] is True # DB fallback
assert state["stream_completed"] is True
assert state["use_resume"] is False
@@ -583,7 +546,7 @@ class TestRetryStateMachine:
transcript_content="", # no transcript
)
assert state["attempts_made"] == 2
assert state["retry_strategy"] is None
assert state["stream_error"] is None
assert state["transcript_caused_error"] is True
assert state["stream_completed"] is True
assert state["use_resume"] is False
@@ -596,7 +559,7 @@ class TestRetryStateMachine:
compact_result="compacted",
)
assert state["attempts_made"] == 3
assert state["retry_strategy"] is None
assert state["stream_error"] is None
assert state["transcript_caused_error"] is True
assert state["stream_completed"] is True
assert state["use_resume"] is False # dropped for attempt 3
@@ -609,7 +572,7 @@ class TestRetryStateMachine:
compact_result="compacted",
)
assert state["attempts_made"] == 3
assert state["retry_strategy"] is not None
assert state["stream_error"] is not None
assert state["transcript_caused_error"] is True
assert state["stream_completed"] is False
@@ -631,22 +594,10 @@ class TestRetryStateMachine:
transcript_content="",
)
assert state["attempts_made"] == 3
assert state["retry_strategy"] is not None
assert state["stream_error"] is not None
assert state["transcript_caused_error"] is True
assert state["stream_completed"] is False
def test_fallback_only_skips_compact(self):
"""FALLBACK_ONLY strategy skips Plan B, goes straight to Plan C."""
state = self._simulate_retry_loop(
["error", "success"],
transcript_content="original",
compact_result="compacted", # would succeed, but should be skipped
error_strategy=RetryStrategy.FALLBACK_ONLY,
)
assert state["attempts_made"] == 2
assert state["transcript_caused_error"] is True
assert state["use_resume"] is False # Plan C, not Plan B
# ---------------------------------------------------------------------------
# Edge cases
@@ -656,37 +607,6 @@ class TestRetryStateMachine:
class TestRetryEdgeCases:
"""Edge cases for the retry logic components."""
def test_classify_error_with_nested_exception(self):
"""Chained exception with transcript error in __cause__ is detected."""
inner = Exception("invalid json in transcript")
outer = RuntimeError("SDK error")
outer.__cause__ = inner
assert _classify_error(outer) == RetryStrategy.FALLBACK_ONLY
assert _classify_error(inner) == RetryStrategy.FALLBACK_ONLY
def test_classify_error_with_context_exception(self):
"""Chained exception via __context__ is detected."""
inner = Exception("json decode error")
outer = RuntimeError("wrapper")
outer.__context__ = inner
assert _classify_error(outer) == RetryStrategy.FALLBACK_ONLY
def test_classify_error_no_infinite_loop(self):
"""Circular exception chain doesn't cause infinite loop."""
a = Exception("error a")
b = Exception("error b")
a.__cause__ = b
b.__cause__ = a # circular
assert _classify_error(a) == RetryStrategy.COMPACT_THEN_FALLBACK
def test_classify_error_case_insensitive(self):
"""Pattern matching must be case-insensitive."""
assert _classify_error(Exception("INVALID JSON")) == RetryStrategy.FALLBACK_ONLY
assert (
_classify_error(Exception("RESUME FILE missing"))
== RetryStrategy.FALLBACK_ONLY
)
@pytest.mark.asyncio
async def test_compact_transcript_with_single_message(self):
"""Single message transcript cannot be compacted."""

View File

@@ -89,56 +89,7 @@ logger = logging.getLogger(__name__)
config = ChatConfig()
class RetryStrategy:
"""Determines what to try next after a streaming error.
COMPACT_THEN_FALLBACK — compact the transcript first; if that also
fails, drop it entirely and rebuild from DB messages.
FALLBACK_ONLY — skip compaction (e.g. the transcript file itself is
broken), go straight to DB-message rebuild.
"""
COMPACT_THEN_FALLBACK = "compact_then_fallback"
FALLBACK_ONLY = "fallback_only"
# Patterns checked against the full exception chain (lowercased).
_TRANSCRIPT_ERROR_PATTERNS = (
"invalid json",
"json decode",
"jsondecodeerror",
"resume file",
"session file",
"malformed jsonl",
)
def _classify_error(err: BaseException) -> str:
"""Classify a streaming error into a :class:`RetryStrategy`.
Walks the exception chain (``__cause__`` / ``__context__``) so that
wrapped errors are detected too.
* Transcript/JSON parse errors → ``FALLBACK_ONLY`` (compaction won't
help if the file itself is broken).
* Everything else (prompt-too-long, transient 500s, timeouts, …) →
``COMPACT_THEN_FALLBACK``.
"""
seen: set[int] = set()
current: BaseException | None = err
err_parts: list[str] = []
while current is not None and id(current) not in seen:
seen.add(id(current))
err_parts.append(str(current).lower())
current = current.__cause__ or current.__context__
combined = " ".join(err_parts)
if any(p in combined for p in _TRANSCRIPT_ERROR_PATTERNS):
return RetryStrategy.FALLBACK_ONLY
return RetryStrategy.COMPACT_THEN_FALLBACK
_MAX_QUERY_ATTEMPTS = 3
async def _retry_with_compacted_transcript(
@@ -1038,32 +989,24 @@ async def stream_chat_completion_sdk(
if attachments.hint:
query_message = f"{query_message}\n\n{attachments.hint}"
_MAX_QUERY_ATTEMPTS = 3
_retry_strategy: str | None = None # set on error, None = no error
_last_stream_error: Exception | None = None
_stream_error: Exception | None = None
_compaction_attempted = False
for _query_attempt in range(_MAX_QUERY_ATTEMPTS):
if _query_attempt > 0:
_last_retry_strategy = _retry_strategy
_retry_strategy = None
_last_stream_error = None
_stream_error = None
stream_completed = False
logger.info(
"%s Retry attempt %d/%d (strategy=%s)",
"%s Retry attempt %d/%d",
log_prefix,
_query_attempt + 1,
_MAX_QUERY_ATTEMPTS,
_last_retry_strategy,
)
# Try compaction first (once); after that, drop transcript.
if (
_last_retry_strategy == RetryStrategy.COMPACT_THEN_FALLBACK
and transcript_content
and not _compaction_attempted
):
# First retry: try compacting the transcript.
# Subsequent retries: drop transcript, rebuild from DB.
if transcript_content and not _compaction_attempted:
_compaction_attempted = True
tb, use_resume, resume_file, success = (
await _retry_with_compacted_transcript(
@@ -1201,14 +1144,12 @@ async def stream_chat_completion_sdk(
)
break
except Exception as stream_err:
_retry_strategy = _classify_error(stream_err)
_last_stream_error = stream_err
_stream_error = stream_err
logger.warning(
"%s Stream error (attempt %d/%d, " "strategy=%s): %s",
"%s Stream error (attempt %d/%d): %s",
log_prefix,
_query_attempt + 1,
_MAX_QUERY_ATTEMPTS,
_retry_strategy,
stream_err,
exc_info=True,
)
@@ -1454,7 +1395,7 @@ async def stream_chat_completion_sdk(
# On error, skip post-stream processing — the retry loop
# will compact / fallback / exhaust attempts. Roll back any
# partial messages appended during the failed attempt.
if _retry_strategy is not None:
if _stream_error is not None:
session.messages = session.messages[:_pre_attempt_msg_count]
continue # next retry attempt
@@ -1528,23 +1469,17 @@ async def stream_chat_completion_sdk(
break
# All retry attempts exhausted — surface error to the user.
if _retry_strategy is not None:
if _stream_error is not None:
transcript_caused_error = True
ended_with_stream_error = True
logger.error(
"%s All %d query attempts exhausted (strategy=%s): %s",
"%s All %d query attempts exhausted: %s",
log_prefix,
_MAX_QUERY_ATTEMPTS,
_retry_strategy,
_last_stream_error,
_stream_error,
)
yield StreamError(
errorText=(
"The conversation is too long for the model. "
"Please start a new session."
if _retry_strategy == RetryStrategy.COMPACT_THEN_FALLBACK
else f"SDK stream error: {_last_stream_error}"
),
errorText=f"SDK stream error: {_stream_error}",
code="all_attempts_exhausted",
)