fix(backend): guard intermediate DB flush with is_final_attempt flag

Add is_final_attempt field to _RetryState so intermediate DB flushes
only run on attempt 0 (optimistic — most turns succeed first try) and
the final retry attempt. Middle retry attempts may be rolled back, and
messages already flushed to DB would persist as orphans since the
in-memory rollback (session.messages truncation) has no corresponding
DB delete.
This commit is contained in:
Zamil Majdy
2026-04-01 06:44:02 +02:00
parent 2fa5c37413
commit 79bc0aed91

View File

@@ -266,6 +266,7 @@ class _RetryState:
adapter: SDKResponseAdapter
transcript_builder: TranscriptBuilder
usage: _TokenUsage
is_final_attempt: bool = True
@dataclass
@@ -1493,9 +1494,14 @@ async def _run_stream_attempt(
# --- Intermediate persistence ---
# Flush session messages to DB periodically so page reloads
# show progress during long-running turns.
# Guarded by is_final_attempt: earlier retry attempts may be
# rolled back in memory (session.messages truncated), but
# messages already flushed to DB would persist as orphans.
# is_final_attempt is True on attempt 0 (optimistic — most
# turns succeed on the first try) and on the last retry.
_msgs_since_flush += 1
now = time.monotonic()
if (
if state.is_final_attempt and (
_msgs_since_flush >= _FLUSH_MESSAGE_THRESHOLD
or (now - _last_flush_time) >= _FLUSH_INTERVAL_SECONDS
):
@@ -1986,6 +1992,11 @@ async def stream_chat_completion_sdk(
)
for attempt in range(_MAX_STREAM_ATTEMPTS):
# Enable intermediate DB flushes on attempt 0 (optimistic: most
# turns succeed on the first try) and the last attempt. Middle
# retry attempts may be rolled back, and flushed messages would
# persist as DB orphans — so flushes are disabled for those.
state.is_final_attempt = attempt == 0 or attempt == _MAX_STREAM_ATTEMPTS - 1
# Clear any stale stash signal from the previous attempt so
# wait_for_stash() doesn't fire prematurely on a leftover event.
reset_stash_event()