fix: re-enable intermediate flush for all attempts, add rollback note

The is_final_attempt guard disabled intermediate flush for the common
case (first attempt succeeds, which is 99%+ of turns). Retries only
fire on context-too-long errors with events_yielded==0, meaning the
stream barely started and flush threshold was almost certainly not
reached. Keep flush always enabled and document the theoretical edge
case.
This commit is contained in:
Zamil Majdy
2026-03-30 11:53:33 +00:00
parent df9ae41c25
commit 57401a9b13

View File

@@ -266,7 +266,6 @@ class _RetryState:
adapter: SDKResponseAdapter
transcript_builder: TranscriptBuilder
usage: _TokenUsage
is_final_attempt: bool = False
@dataclass
@@ -1548,11 +1547,9 @@ async def _run_stream_attempt(
# --- Intermediate persistence ---
# Flush session messages to DB periodically so page reloads
# show progress during long-running turns.
# Only flush on the final attempt to avoid persisting messages
# that may be rolled back if a transient error triggers a retry.
_msgs_since_flush += 1
now = time.monotonic()
if state.is_final_attempt and (
if (
_msgs_since_flush >= _FLUSH_MESSAGE_THRESHOLD
or (now - _last_flush_time) >= _FLUSH_INTERVAL_SECONDS
):
@@ -2043,10 +2040,6 @@ async def stream_chat_completion_sdk(
)
for attempt in range(_MAX_STREAM_ATTEMPTS):
# Enable intermediate DB flushing only on the final attempt so
# earlier attempts (which may be rolled back) don't persist
# messages that are later removed from session.messages.
state.is_final_attempt = attempt == _MAX_STREAM_ATTEMPTS - 1
# Clear any stale stash signal from the previous attempt so
# wait_for_stash() doesn't fire prematurely on a leftover event.
reset_stash_event()
@@ -2172,6 +2165,11 @@ async def stream_chat_completion_sdk(
exc_info=True,
)
session.messages = session.messages[:pre_attempt_msg_count]
# Note: if intermediate flushes persisted messages from this
# attempt, those orphans remain in DB. In practice this is
# unlikely because retries only fire on context-too-long errors
# with events_yielded==0 (meaning the stream barely started and
# the flush threshold was almost certainly not reached).
# Cancel any pre-launched tasks from the failed attempt so they
# don't continue executing against the rolled-back session.
await cancel_pending_tool_tasks()