From 35e92e00ca2ea3f8b26f3473f61bc18ef6b5b4df Mon Sep 17 00:00:00 2001 From: majdyz Date: Tue, 21 Apr 2026 23:49:15 +0700 Subject: [PATCH] feat(platform/copilot): reasoning UI render flag + SSE reconnect storm mitigations MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Follow-up to #12871 addressing streaming UX issues observed during Kimi K2.6 rollout testing. Three independent changes: 1. New ChatConfig.render_reasoning_in_ui (default True) gates the StreamReasoning* wire events on BOTH baseline and SDK paths. When False the frontend sees a text-only stream; the model still reasons, tokens are still billed, and role="reasoning" rows are still persisted to session.messages so a future per-session toggle can surface them on reload. 2. SSE reconnect replay cap: ChatConfig.stream_replay_count defaults to 200 (was hard-coded count=1000 in subscribe_to_session.xread). Bounds the replay storm when a tab-switch / browser-throttle fires multiple reconnects. 200 still covers a full Kimi turn after coalescing (~150 events). 3. Frontend reconnect debounce: handleReconnect rejects requests that arrive within 1500ms of the last reconnect's resume, so visibility- throttle bursts collapse to one GET /stream instead of 2-3. Scope 3 from the brief (Last-Event-ID SSE resume) is deferred to a second follow-up — threading the redis stream id through every to_sse() and swapping the fetch-based DefaultChatTransport for a Last-Event-ID-aware client is a deeper architectural change than scopes 1+2 combined. Tests: 1140 copilot backend tests pass. New coverage: BaselineReasoningEmitter render_in_ui flag (wire suppression + persistence preservation), SDKResponseAdapter render_reasoning_in_ui flag, ChatConfig regression tests for both new fields. --- .../backend/copilot/baseline/reasoning.py | 34 +++++++++-- .../copilot/baseline/reasoning_test.py | 53 +++++++++++++++++ .../backend/copilot/baseline/service.py | 8 ++- .../backend/backend/copilot/config.py | 27 +++++++++ .../backend/backend/copilot/config_test.py | 37 ++++++++++++ .../backend/copilot/sdk/p0_guardrails_test.py | 6 +- .../backend/copilot/sdk/response_adapter.py | 46 ++++++++++++--- .../copilot/sdk/response_adapter_test.py | 58 +++++++++++++++++++ .../backend/backend/copilot/sdk/service.py | 16 ++++- .../backend/copilot/stream_registry.py | 12 +++- .../(platform)/copilot/useCopilotStream.ts | 32 ++++++++++ 11 files changed, 311 insertions(+), 18 deletions(-) diff --git a/autogpt_platform/backend/backend/copilot/baseline/reasoning.py b/autogpt_platform/backend/backend/copilot/baseline/reasoning.py index 15a77dde8a..30dceb46de 100644 --- a/autogpt_platform/backend/backend/copilot/baseline/reasoning.py +++ b/autogpt_platform/backend/backend/copilot/baseline/reasoning.py @@ -172,16 +172,28 @@ class BaselineReasoningEmitter: fresh ``ChatMessage(role="reasoning")`` is appended and mutated in-place as further deltas arrive; :meth:`close` drops the reference but leaves the appended row intact. + + Pass ``render_in_ui=False`` to suppress the ``StreamReasoning*`` wire + events — the emitter still reads the reasoning payload and still + appends / mutates the persisted ``role="reasoning"`` row, but returns + empty event lists from :meth:`on_delta` and :meth:`close`. This lets + the operator (via :attr:`ChatConfig.render_reasoning_in_ui`) hide the + reasoning collapse from the live stream without dropping the audit + trail; a future per-session toggle can surface the persisted rows on + reload. 
""" def __init__( self, session_messages: list[ChatMessage] | None = None, + *, + render_in_ui: bool = True, ) -> None: self._block_id: str = str(uuid.uuid4()) self._open: bool = False self._session_messages = session_messages self._current_row: ChatMessage | None = None + self._render_in_ui = render_in_ui @property def is_open(self) -> bool: @@ -195,6 +207,11 @@ class BaselineReasoningEmitter: Persistence (when a session message list is attached) happens in lockstep with emission so the row's content stays equal to the concatenated deltas at every delta boundary. + + When ``render_in_ui=False`` the wire events are suppressed (empty + list returned) but the state machine still advances and the + persistence row is still appended / mutated, so ``close`` behaves + symmetrically and the persisted audit trail is identical. """ ext = OpenRouterDeltaExtension.from_delta(delta) text = ext.visible_text() @@ -202,12 +219,14 @@ class BaselineReasoningEmitter: return [] events: list[StreamBaseResponse] = [] if not self._open: - events.append(StreamReasoningStart(id=self._block_id)) + if self._render_in_ui: + events.append(StreamReasoningStart(id=self._block_id)) self._open = True if self._session_messages is not None: self._current_row = ChatMessage(role="reasoning", content="") self._session_messages.append(self._current_row) - events.append(StreamReasoningDelta(id=self._block_id, delta=text)) + if self._render_in_ui: + events.append(StreamReasoningDelta(id=self._block_id, delta=text)) if self._current_row is not None: self._current_row.content = (self._current_row.content or "") + text return events @@ -220,11 +239,18 @@ class BaselineReasoningEmitter: than reusing one already closed on the wire. The persisted row is not removed — it stays in ``session_messages`` as the durable record of what was reasoned. + + When ``render_in_ui=False`` the wire event is suppressed (empty + list returned) but the block still rotates so subsequent reasoning + uses a fresh id — keeps the state machine in lockstep with the + render-on branch. """ if not self._open: return [] - event = StreamReasoningEnd(id=self._block_id) + events: list[StreamBaseResponse] = ( + [StreamReasoningEnd(id=self._block_id)] if self._render_in_ui else [] + ) self._open = False self._block_id = str(uuid.uuid4()) self._current_row = None - return [event] + return events diff --git a/autogpt_platform/backend/backend/copilot/baseline/reasoning_test.py b/autogpt_platform/backend/backend/copilot/baseline/reasoning_test.py index df64086d5f..488b25213c 100644 --- a/autogpt_platform/backend/backend/copilot/baseline/reasoning_test.py +++ b/autogpt_platform/backend/backend/copilot/baseline/reasoning_test.py @@ -279,3 +279,56 @@ class TestReasoningPersistence: events = emitter.on_delta(_delta(reasoning="pure wire")) assert len(events) == 2 # start + delta, no crash # Nothing else to assert — just proves None session is supported. + + +class TestBaselineReasoningEmitterRenderFlag: + """``render_in_ui=False`` must silence ``StreamReasoning*`` wire events + while keeping persistence intact — the operator can hide the collapse + without losing the audit trail. These tests pin the contract in both + directions so future refactors can't flip only one half.""" + + def test_render_off_suppresses_start_and_delta(self): + emitter = BaselineReasoningEmitter(render_in_ui=False) + events = emitter.on_delta(_delta(reasoning="hidden")) + # No wire events, but state advanced (is_open == True) so close() + # below has something to rotate. 
+ assert events == [] + assert emitter.is_open is True + + def test_render_off_suppresses_close_end(self): + emitter = BaselineReasoningEmitter(render_in_ui=False) + emitter.on_delta(_delta(reasoning="hidden")) + events = emitter.close() + assert events == [] + assert emitter.is_open is False + + def test_render_off_still_persists_rows(self): + session: list[ChatMessage] = [] + emitter = BaselineReasoningEmitter(session, render_in_ui=False) + + emitter.on_delta(_delta(reasoning="part one ")) + emitter.on_delta(_delta(reasoning="part two")) + emitter.close() + + assert len(session) == 1 + assert session[0].role == "reasoning" + assert session[0].content == "part one part two" + + def test_render_off_rotates_block_id_between_sessions(self): + """Even with wire events silenced the block id must rotate on close, + otherwise a hypothetical mid-session flip would reuse a stale id.""" + emitter = BaselineReasoningEmitter(render_in_ui=False) + emitter.on_delta(_delta(reasoning="first")) + first_block_id = emitter._block_id + emitter.close() + emitter.on_delta(_delta(reasoning="second")) + assert emitter._block_id != first_block_id + + def test_render_on_is_default(self): + """Defaulting to True preserves backward compat — existing callers + that don't pass the kwarg keep emitting wire events as before.""" + emitter = BaselineReasoningEmitter() + events = emitter.on_delta(_delta(reasoning="hello")) + assert len(events) == 2 + assert isinstance(events[0], StreamReasoningStart) + assert isinstance(events[1], StreamReasoningDelta) diff --git a/autogpt_platform/backend/backend/copilot/baseline/service.py b/autogpt_platform/backend/backend/copilot/baseline/service.py index 474a6834b1..0db2edb533 100644 --- a/autogpt_platform/backend/backend/copilot/baseline/service.py +++ b/autogpt_platform/backend/backend/copilot/baseline/service.py @@ -379,7 +379,13 @@ class _BaselineStreamState: # frontend's ``convertChatSessionToUiMessages`` relies on these # rows to render the Reasoning collapse after the AI SDK's # stream-end hydrate swaps in the DB-backed message list. - self.reasoning_emitter = BaselineReasoningEmitter(self.session_messages) + # ``render_in_ui`` is sourced from ``config.render_reasoning_in_ui`` + # so the operator can silence the reasoning collapse globally + # without dropping the persisted audit trail. + self.reasoning_emitter = BaselineReasoningEmitter( + self.session_messages, + render_in_ui=config.render_reasoning_in_ui, + ) def _is_anthropic_model(model: str) -> bool: diff --git a/autogpt_platform/backend/backend/copilot/config.py b/autogpt_platform/backend/backend/copilot/config.py index 1bb63fe1da..b13f908e0b 100644 --- a/autogpt_platform/backend/backend/copilot/config.py +++ b/autogpt_platform/backend/backend/copilot/config.py @@ -205,6 +205,33 @@ class ChatConfig(BaseSettings): "``max_thinking_tokens`` kwarg so the CLI falls back to model default " "(which, without the flag, leaves extended thinking off).", ) + render_reasoning_in_ui: bool = Field( + default=True, + description="Render extended-thinking reasoning as live UI parts " + "(``reasoning-start``/``reasoning-delta``/``reasoning-end``) on the " + "wire. When False, the baseline emitter and the SDK response adapter " + "suppress the three reasoning wire events — the model still reasons " + "and tokens are still billed, but the frontend sees a text-only " + "stream. Reasoning rows are still persisted to ``session.messages`` " + "as ``role='reasoning'`` entries, so a future per-session toggle can " + "surface them on reload. 
Flip to False to silence the reasoning "
+        "collapse without dropping the persisted audit trail.",
+    )
+    stream_replay_count: int = Field(
+        default=200,
+        ge=1,
+        le=10000,
+        description="Maximum number of Redis stream entries replayed on SSE "
+        "reconnect in :func:`stream_registry.subscribe_to_session`. Lowered "
+        "from 1000 to bound the replay storm when a tab-switch / throttle "
+        "triggers multiple quick reconnects — 200 still covers a full Kimi "
+        "turn after coalescing (~150 events) and leaves headroom for long "
+        "Opus tool loops. Redis ``XREAD`` ``count`` caps one replay batch, "
+        "not the total events that exist; entries return oldest-first, so "
+        "the cap only trims an over-long tail; the frontend dedupes on ids. "
+        "Scoped to the SSE resume path only — live stream listener XREAD "
+        "(no replay) is unchanged.",
+    )
     claude_agent_thinking_effort: Literal["low", "medium", "high", "max"] | None = (
         Field(
             default=None,
diff --git a/autogpt_platform/backend/backend/copilot/config_test.py b/autogpt_platform/backend/backend/copilot/config_test.py
index fe8e67b7ff..25f9f477f2 100644
--- a/autogpt_platform/backend/backend/copilot/config_test.py
+++ b/autogpt_platform/backend/backend/copilot/config_test.py
@@ -19,6 +19,8 @@ _ENV_VARS_TO_CLEAR = (
     "OPENAI_BASE_URL",
     "CHAT_CLAUDE_AGENT_CLI_PATH",
     "CLAUDE_AGENT_CLI_PATH",
+    "CHAT_RENDER_REASONING_IN_UI",
+    "CHAT_STREAM_REPLAY_COUNT",
 )
 
 
@@ -164,3 +166,38 @@ class TestClaudeAgentCliPathEnvFallback:
         monkeypatch.setenv("CLAUDE_AGENT_CLI_PATH", str(tmp_path))
         with pytest.raises(Exception, match="not a regular file"):
             ChatConfig()
+
+
+class TestRenderReasoningInUi:
+    """``render_reasoning_in_ui`` gates reasoning wire events globally."""
+
+    def test_defaults_to_true(self):
+        """Default must stay True — flipping it silences the reasoning
+        collapse for every user, which is an opt-in operator decision."""
+        cfg = ChatConfig()
+        assert cfg.render_reasoning_in_ui is True
+
+    def test_env_override_false(self, monkeypatch: pytest.MonkeyPatch) -> None:
+        monkeypatch.setenv("CHAT_RENDER_REASONING_IN_UI", "false")
+        cfg = ChatConfig()
+        assert cfg.render_reasoning_in_ui is False
+
+
+class TestStreamReplayCount:
+    """``stream_replay_count`` caps the SSE reconnect replay batch size."""
+
+    def test_default_is_200(self):
+        """200 covers a full Kimi turn after coalescing (~150 events) while
+        bounding the replay storm from 1000+ chunks."""
+        cfg = ChatConfig()
+        assert cfg.stream_replay_count == 200
+
+    def test_env_override(self, monkeypatch: pytest.MonkeyPatch) -> None:
+        monkeypatch.setenv("CHAT_STREAM_REPLAY_COUNT", "500")
+        cfg = ChatConfig()
+        assert cfg.stream_replay_count == 500
+
+    def test_zero_rejected(self):
+        """count=0 would make XREAD replay nothing — rejected via ge=1."""
+        with pytest.raises(Exception):
+            ChatConfig(stream_replay_count=0)
diff --git a/autogpt_platform/backend/backend/copilot/sdk/p0_guardrails_test.py b/autogpt_platform/backend/backend/copilot/sdk/p0_guardrails_test.py
index 17b54797b8..e38fdddd9a 100644
--- a/autogpt_platform/backend/backend/copilot/sdk/p0_guardrails_test.py
+++ b/autogpt_platform/backend/backend/copilot/sdk/p0_guardrails_test.py
@@ -733,7 +733,11 @@ class TestDoTransientBackoff:
         async for _ in _do_transient_backoff(3, state, "msg-1", "sess-1"):
             pass
 
-        mock_cls.assert_called_once_with(message_id="msg-1", session_id="sess-1")
+        mock_cls.assert_called_once_with(
+            message_id="msg-1",
+            session_id="sess-1",
+            render_reasoning_in_ui=True,
+        )
         assert state.adapter is new_adapter
 
     async def 
test_resets_usage_after_yield(self): diff --git a/autogpt_platform/backend/backend/copilot/sdk/response_adapter.py b/autogpt_platform/backend/backend/copilot/sdk/response_adapter.py index fbd73d9277..8c06daa9a5 100644 --- a/autogpt_platform/backend/backend/copilot/sdk/response_adapter.py +++ b/autogpt_platform/backend/backend/copilot/sdk/response_adapter.py @@ -53,7 +53,13 @@ class SDKResponseAdapter: text blocks, tool calls, and message lifecycle. """ - def __init__(self, message_id: str | None = None, session_id: str | None = None): + def __init__( + self, + message_id: str | None = None, + session_id: str | None = None, + *, + render_reasoning_in_ui: bool = True, + ): self.message_id = message_id or str(uuid.uuid4()) self.session_id = session_id self.text_block_id = str(uuid.uuid4()) @@ -62,6 +68,13 @@ class SDKResponseAdapter: self.reasoning_block_id = str(uuid.uuid4()) self.has_started_reasoning = False self.has_ended_reasoning = True + # When False, ``_ensure_reasoning_started`` / ``StreamReasoningDelta`` + # emission / ``_end_reasoning_if_open`` become no-ops so the frontend + # sees a text-only stream for a ``ThinkingBlock``. The block is + # still persisted to ``session.messages`` via + # ``_format_sdk_content_blocks`` — same rule as the baseline + # emitter: operator silences the live collapse, audit trail stays. + self._render_reasoning_in_ui = render_reasoning_in_ui self.current_tool_calls: dict[str, dict[str, str]] = {} self.resolved_tool_calls: set[str] = set() self.step_open = False @@ -142,15 +155,22 @@ class SDKResponseAdapter: # it live, extended_thinking turns that end # thinking-only left the UI stuck on "Thought for Xs" # with nothing rendered until a page refresh. + # + # When ``render_reasoning_in_ui=False`` the three + # reasoning helpers below (and the append) no-op, so + # the frontend sees a text-only stream. Persistence + # of the thinking text (``_format_sdk_content_blocks``) + # is unaffected. if block.thinking: self._end_text_if_open(responses) self._ensure_reasoning_started(responses) - responses.append( - StreamReasoningDelta( - id=self.reasoning_block_id, - delta=block.thinking, + if self._render_reasoning_in_ui: + responses.append( + StreamReasoningDelta( + id=self.reasoning_block_id, + delta=block.thinking, + ) ) - ) elif isinstance(block, ToolUseBlock): self._end_text_if_open(responses) @@ -349,7 +369,13 @@ class SDKResponseAdapter: Each ``ThinkingBlock`` the SDK emits gets its own streaming block on the wire so the frontend can render a new ``Reasoning`` part per LLM turn (rather than concatenating across the whole session). + + No-op when ``render_reasoning_in_ui=False`` — callers still drive + the method on every ``ThinkingBlock`` so persistence stays in + lockstep, but nothing reaches the wire. """ + if not self._render_reasoning_in_ui: + return if not self.has_started_reasoning or self.has_ended_reasoning: if self.has_ended_reasoning: self.reasoning_block_id = str(uuid.uuid4()) @@ -358,7 +384,13 @@ class SDKResponseAdapter: self.has_started_reasoning = True def _end_reasoning_if_open(self, responses: list[StreamBaseResponse]) -> None: - """End the current reasoning block if one is open.""" + """End the current reasoning block if one is open. + + No-op when ``render_reasoning_in_ui=False`` — no start was emitted, + so no end is needed. 
+ """ + if not self._render_reasoning_in_ui: + return if self.has_started_reasoning and not self.has_ended_reasoning: responses.append(StreamReasoningEnd(id=self.reasoning_block_id)) self.has_ended_reasoning = True diff --git a/autogpt_platform/backend/backend/copilot/sdk/response_adapter_test.py b/autogpt_platform/backend/backend/copilot/sdk/response_adapter_test.py index 634454f9e5..ada472561a 100644 --- a/autogpt_platform/backend/backend/copilot/sdk/response_adapter_test.py +++ b/autogpt_platform/backend/backend/copilot/sdk/response_adapter_test.py @@ -331,6 +331,64 @@ def test_empty_thinking_block_is_ignored(): assert [type(r).__name__ for r in results] == ["StreamStartStep"] +def test_render_reasoning_in_ui_false_suppresses_thinking_events(): + """``render_reasoning_in_ui=False`` silences ``StreamReasoning*`` on + the wire — the frontend sees a text-only stream. Persistence via + ``_format_sdk_content_blocks`` is handled elsewhere; this test only + pins the wire contract. + """ + adapter = SDKResponseAdapter( + message_id="m", + session_id="s", + render_reasoning_in_ui=False, + ) + msg = AssistantMessage( + content=[ThinkingBlock(thinking="plan", signature="sig")], + model="test", + ) + results = adapter.convert_message(msg) + types = [type(r).__name__ for r in results] + assert "StreamReasoningStart" not in types + assert "StreamReasoningDelta" not in types + assert "StreamReasoningEnd" not in types + + +def test_render_reasoning_off_text_after_thinking_emits_no_reasoning_end(): + """With rendering off the ReasoningEnd is never synthesized when text + follows — no ReasoningStart ever hit the wire, so no close is due.""" + adapter = SDKResponseAdapter( + message_id="m", + session_id="s", + render_reasoning_in_ui=False, + ) + adapter.convert_message( + AssistantMessage( + content=[ThinkingBlock(thinking="warming up", signature="sig")], + model="test", + ) + ) + results = adapter.convert_message( + AssistantMessage(content=[TextBlock(text="hello")], model="test") + ) + types = [type(r).__name__ for r in results] + assert "StreamReasoningEnd" not in types + assert "StreamTextStart" in types + assert "StreamTextDelta" in types + + +def test_render_reasoning_on_is_default(): + """Default is True — existing callers keep emitting reasoning events.""" + adapter = SDKResponseAdapter(message_id="m", session_id="s") + msg = AssistantMessage( + content=[ThinkingBlock(thinking="plan", signature="sig")], + model="test", + ) + results = adapter.convert_message(msg) + types = [type(r).__name__ for r in results] + assert "StreamReasoningStart" in types + assert "StreamReasoningDelta" in types + + def test_result_success_synthesizes_fallback_text_when_final_turn_is_thinking_only(): """If the model's last LLM call after a tool_result produced only a ThinkingBlock (no TextBlock), the UI would hang on the tool output diff --git a/autogpt_platform/backend/backend/copilot/sdk/service.py b/autogpt_platform/backend/backend/copilot/sdk/service.py index 6c7493c045..5d5f10ebb5 100644 --- a/autogpt_platform/backend/backend/copilot/sdk/service.py +++ b/autogpt_platform/backend/backend/copilot/sdk/service.py @@ -821,7 +821,11 @@ async def _do_transient_backoff( """ yield StreamStatus(message=f"Connection interrupted, retrying in {backoff}s…") await asyncio.sleep(backoff) - state.adapter = SDKResponseAdapter(message_id=message_id, session_id=session_id) + state.adapter = SDKResponseAdapter( + message_id=message_id, + session_id=session_id, + render_reasoning_in_ui=config.render_reasoning_in_ui, + ) 
     state.usage.reset()
 
 
@@ -3155,7 +3159,11 @@ async def stream_chat_completion_sdk(
     options = ClaudeAgentOptions(**sdk_options_kwargs)  # type: ignore[arg-type]  # dynamic kwargs
 
-    adapter = SDKResponseAdapter(message_id=message_id, session_id=session_id)
+    adapter = SDKResponseAdapter(
+        message_id=message_id,
+        session_id=session_id,
+        render_reasoning_in_ui=config.render_reasoning_in_ui,
+    )
 
     # Propagate user_id/session_id as OTEL context attributes so the
     # langsmith tracing integration attaches them to every span. This
@@ -3489,7 +3497,9 @@ async def stream_chat_completion_sdk(
                     session, user_id, is_user_message, state.query_message
                 )
                 state.adapter = SDKResponseAdapter(
-                    message_id=message_id, session_id=session_id
+                    message_id=message_id,
+                    session_id=session_id,
+                    render_reasoning_in_ui=config.render_reasoning_in_ui,
                 )
                 # Reset token accumulators so a failed attempt's partial
                 # usage is not double-counted in the successful attempt.
diff --git a/autogpt_platform/backend/backend/copilot/stream_registry.py b/autogpt_platform/backend/backend/copilot/stream_registry.py
index 424964e075..c2dfb86fdb 100644
--- a/autogpt_platform/backend/backend/copilot/stream_registry.py
+++ b/autogpt_platform/backend/backend/copilot/stream_registry.py
@@ -485,9 +485,17 @@ async def subscribe_to_session(
     subscriber_queue: asyncio.Queue[StreamBaseResponse] = asyncio.Queue()
     stream_key = _get_turn_stream_key(session.turn_id)
 
-    # Step 1: Replay messages from Redis Stream
+    # Step 1: Replay messages from Redis Stream.
+    # ``count`` caps the replay batch size — lowered from a hard-coded 1000
+    # to :attr:`ChatConfig.stream_replay_count` (default 200) to bound the
+    # replay storm when a tab-switch / browser throttle triggers multiple
+    # reconnects in quick succession. 200 still covers a full Kimi turn
+    # after coalescing (~150 events); XREAD returns oldest entries first, so
+    # the cap only trims an over-long tail; the frontend dedupes on block ids.
     xread_start = time.perf_counter()
-    messages = await redis.xread({stream_key: last_message_id}, block=None, count=1000)
+    messages = await redis.xread(
+        {stream_key: last_message_id}, block=None, count=config.stream_replay_count
+    )
     xread_time = (time.perf_counter() - xread_start) * 1000
     logger.info(
         f"[TIMING] Redis xread (replay) took {xread_time:.1f}ms, status={session_status}",
diff --git a/autogpt_platform/frontend/src/app/(platform)/copilot/useCopilotStream.ts b/autogpt_platform/frontend/src/app/(platform)/copilot/useCopilotStream.ts
index 2412ff5988..4f3f899b68 100644
--- a/autogpt_platform/frontend/src/app/(platform)/copilot/useCopilotStream.ts
+++ b/autogpt_platform/frontend/src/app/(platform)/copilot/useCopilotStream.ts
@@ -25,6 +25,18 @@ import { useHydrateOnStreamEnd } from "./useHydrateOnStreamEnd";
 const RECONNECT_BASE_DELAY_MS = 1_000;
 const RECONNECT_MAX_ATTEMPTS = 3;
 
+/**
+ * Minimum spacing between successive reconnect attempts.
+ * `isReconnectScheduledRef` already prevents OVERLAPPING reconnects, but
+ * tab-throttle / visibility wake bursts can fire `onFinish(isDisconnect)`
+ * several times inside a single second — each one would schedule a fresh
+ * reconnect the moment the previous timer cleared the ref. The debounce
+ * rejects any reconnect request that arrives within this window since the
+ * last one's resume actually ran, so a throttle storm collapses to one
+ * GET /stream instead of three.
+ */
+const RECONNECT_DEBOUNCE_MS = 1_500;
+
 /** Minimum time the page must have been hidden to trigger a wake re-sync. 
*/
 const WAKE_RESYNC_THRESHOLD_MS = 30_000;
 
@@ -110,6 +122,11 @@ export function useCopilotStream({
   const isReconnectScheduledRef = useRef(false);
   const [isReconnectScheduled, setIsReconnectScheduled] = useState(false);
   const reconnectTimerRef = useRef<ReturnType<typeof setTimeout>>();
+  // Timestamp of the last reconnect's actual resume call — used together
+  // with RECONNECT_DEBOUNCE_MS to drop rapid duplicate reconnect requests
+  // (e.g. visibility throttle firing onFinish(isDisconnect) several times
+  // in the same second). 0 = no reconnect has fired yet in this session.
+  const lastReconnectResumeAtRef = useRef(0);
   const hasShownDisconnectToast = useRef(false);
   // Set when the user explicitly clicks stop — prevents onError from
   // triggering a reconnect cycle for the resulting AbortError.
@@ -127,6 +144,19 @@ export function useCopilotStream({
   function handleReconnect(sid: string) {
     if (isReconnectScheduledRef.current || !sid) return;
 
+    // Debounce: if the previous reconnect resumed within the last
+    // RECONNECT_DEBOUNCE_MS, drop this request. Browser tab-throttle bursts
+    // can fire onFinish(isDisconnect) 2–3 times in a second; without this
+    // guard each fires its own GET /stream, each one replays the Redis
+    // stream, and the flicker storm is back.
+    const sinceLastResume = Date.now() - lastReconnectResumeAtRef.current;
+    if (
+      lastReconnectResumeAtRef.current > 0 &&
+      sinceLastResume < RECONNECT_DEBOUNCE_MS
+    ) {
+      return;
+    }
+
     const nextAttempt = reconnectAttemptsRef.current + 1;
     if (nextAttempt > RECONNECT_MAX_ATTEMPTS) {
       setReconnectExhausted(true);
@@ -163,6 +193,7 @@ export function useCopilotStream({
         }
         return prev;
       });
+      lastReconnectResumeAtRef.current = Date.now();
       resumeStreamRef.current();
     }, delay);
   }
@@ -469,6 +500,7 @@ export function useCopilotStream({
     setRateLimitMessage(null);
     hasShownDisconnectToast.current = false;
     lastSubmittedMsgRef.current = null;
+    lastReconnectResumeAtRef.current = 0;
     setReconnectExhausted(false);
     setIsSyncing(false);
     hasResumedRef.current.clear();
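
Usage sketch (illustrative, not part of the diff): both new knobs are plain
ChatConfig fields, so per the config tests they are driven by CHAT_-prefixed
environment variables read at construction time. A minimal sketch; the import
path comes from the diff header, and it assumes the platform backend package
is importable:

    # Hedged sketch — env var names are taken verbatim from config_test.py.
    import os

    os.environ["CHAT_RENDER_REASONING_IN_UI"] = "false"  # text-only stream
    os.environ["CHAT_STREAM_REPLAY_COUNT"] = "500"       # wider replay window

    from backend.copilot.config import ChatConfig

    cfg = ChatConfig()
    assert cfg.render_reasoning_in_ui is False
    assert cfg.stream_replay_count == 500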
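
Replay-cap sketch (illustrative): XREAD returns entries in ascending id order
starting just after the resume cursor, so the count cap bounds each
reconnect's batch from the oldest end and omits the newest tail from that
batch. A standalone model with a plain list standing in for the Redis stream
(no Redis client involved; ids simplified to integers):

    # Hedged model of the capped replay in subscribe_to_session — not the
    # real registry code, just the XREAD ordering it relies on.
    events = [f"event-{i}" for i in range(350)]  # entries after the cursor

    REPLAY_COUNT = 200  # ChatConfig.stream_replay_count default
    replayed = events[:REPLAY_COUNT]

    assert replayed[0] == "event-0"      # replay starts at the cursor...
    assert replayed[-1] == "event-199"   # ...and the cap trims the newest tail

    # A second reconnect from the same cursor replays the same 200-entry
    # prefix; the frontend's block-id dedup makes that overlap harmless.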
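
Debounce sketch (illustrative): the handleReconnect guard is plain timestamp
arithmetic, modeled here in Python to match the examples above. One
simplification: the real hook stamps lastReconnectResumeAtRef when the
delayed resume actually runs, not at acceptance time.

    RECONNECT_DEBOUNCE_MS = 1_500

    def should_reconnect(now_ms: int, last_resume_ms: int) -> bool:
        # Mirror of the guard: drop only if some reconnect has already
        # resumed (sentinel 0 means none has) inside the debounce window.
        since = now_ms - last_resume_ms
        return not (last_resume_ms > 0 and since < RECONNECT_DEBOUNCE_MS)

    last_resume = 0
    accepted = []
    for t in (1_000, 1_200, 1_400, 3_500):  # a throttle burst, then a later drop
        if should_reconnect(t, last_resume):
            accepted.append(t)
            last_resume = t  # simplified: stamped at acceptance, not at resume

    assert accepted == [1_000, 3_500]  # burst duplicates collapse to one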