From 35e92e00ca2ea3f8b26f3473f61bc18ef6b5b4df Mon Sep 17 00:00:00 2001 From: majdyz Date: Tue, 21 Apr 2026 23:49:15 +0700 Subject: [PATCH 1/5] feat(platform/copilot): reasoning UI render flag + SSE reconnect storm mitigations MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Follow-up to #12871 addressing streaming UX issues observed during Kimi K2.6 rollout testing. Three independent changes: 1. New ChatConfig.render_reasoning_in_ui (default True) gates the StreamReasoning* wire events on BOTH baseline and SDK paths. When False the frontend sees a text-only stream; the model still reasons, tokens are still billed, and role="reasoning" rows are still persisted to session.messages so a future per-session toggle can surface them on reload. 2. SSE reconnect replay cap: ChatConfig.stream_replay_count defaults to 200 (was hard-coded count=1000 in subscribe_to_session.xread). Bounds the replay storm when a tab-switch / browser-throttle fires multiple reconnects. 200 still covers a full Kimi turn after coalescing (~150 events). 3. Frontend reconnect debounce: handleReconnect rejects requests that arrive within 1500ms of the last reconnect's resume, so visibility- throttle bursts collapse to one GET /stream instead of 2-3. Scope 3 from the brief (Last-Event-ID SSE resume) is deferred to a second follow-up — threading the redis stream id through every to_sse() and swapping the fetch-based DefaultChatTransport for a Last-Event-ID-aware client is a deeper architectural change than scopes 1+2 combined. Tests: 1140 copilot backend tests pass. New coverage: BaselineReasoningEmitter render_in_ui flag (wire suppression + persistence preservation), SDKResponseAdapter render_reasoning_in_ui flag, ChatConfig regression tests for both new fields. --- .../backend/copilot/baseline/reasoning.py | 34 +++++++++-- .../copilot/baseline/reasoning_test.py | 53 +++++++++++++++++ .../backend/copilot/baseline/service.py | 8 ++- .../backend/backend/copilot/config.py | 27 +++++++++ .../backend/backend/copilot/config_test.py | 37 ++++++++++++ .../backend/copilot/sdk/p0_guardrails_test.py | 6 +- .../backend/copilot/sdk/response_adapter.py | 46 ++++++++++++--- .../copilot/sdk/response_adapter_test.py | 58 +++++++++++++++++++ .../backend/backend/copilot/sdk/service.py | 16 ++++- .../backend/copilot/stream_registry.py | 12 +++- .../(platform)/copilot/useCopilotStream.ts | 32 ++++++++++ 11 files changed, 311 insertions(+), 18 deletions(-) diff --git a/autogpt_platform/backend/backend/copilot/baseline/reasoning.py b/autogpt_platform/backend/backend/copilot/baseline/reasoning.py index 15a77dde8a..30dceb46de 100644 --- a/autogpt_platform/backend/backend/copilot/baseline/reasoning.py +++ b/autogpt_platform/backend/backend/copilot/baseline/reasoning.py @@ -172,16 +172,28 @@ class BaselineReasoningEmitter: fresh ``ChatMessage(role="reasoning")`` is appended and mutated in-place as further deltas arrive; :meth:`close` drops the reference but leaves the appended row intact. + + Pass ``render_in_ui=False`` to suppress the ``StreamReasoning*`` wire + events — the emitter still reads the reasoning payload and still + appends / mutates the persisted ``role="reasoning"`` row, but returns + empty event lists from :meth:`on_delta` and :meth:`close`. This lets + the operator (via :attr:`ChatConfig.render_reasoning_in_ui`) hide the + reasoning collapse from the live stream without dropping the audit + trail; a future per-session toggle can surface the persisted rows on + reload. """ def __init__( self, session_messages: list[ChatMessage] | None = None, + *, + render_in_ui: bool = True, ) -> None: self._block_id: str = str(uuid.uuid4()) self._open: bool = False self._session_messages = session_messages self._current_row: ChatMessage | None = None + self._render_in_ui = render_in_ui @property def is_open(self) -> bool: @@ -195,6 +207,11 @@ class BaselineReasoningEmitter: Persistence (when a session message list is attached) happens in lockstep with emission so the row's content stays equal to the concatenated deltas at every delta boundary. + + When ``render_in_ui=False`` the wire events are suppressed (empty + list returned) but the state machine still advances and the + persistence row is still appended / mutated, so ``close`` behaves + symmetrically and the persisted audit trail is identical. """ ext = OpenRouterDeltaExtension.from_delta(delta) text = ext.visible_text() @@ -202,12 +219,14 @@ class BaselineReasoningEmitter: return [] events: list[StreamBaseResponse] = [] if not self._open: - events.append(StreamReasoningStart(id=self._block_id)) + if self._render_in_ui: + events.append(StreamReasoningStart(id=self._block_id)) self._open = True if self._session_messages is not None: self._current_row = ChatMessage(role="reasoning", content="") self._session_messages.append(self._current_row) - events.append(StreamReasoningDelta(id=self._block_id, delta=text)) + if self._render_in_ui: + events.append(StreamReasoningDelta(id=self._block_id, delta=text)) if self._current_row is not None: self._current_row.content = (self._current_row.content or "") + text return events @@ -220,11 +239,18 @@ class BaselineReasoningEmitter: than reusing one already closed on the wire. The persisted row is not removed — it stays in ``session_messages`` as the durable record of what was reasoned. + + When ``render_in_ui=False`` the wire event is suppressed (empty + list returned) but the block still rotates so subsequent reasoning + uses a fresh id — keeps the state machine in lockstep with the + render-on branch. """ if not self._open: return [] - event = StreamReasoningEnd(id=self._block_id) + events: list[StreamBaseResponse] = ( + [StreamReasoningEnd(id=self._block_id)] if self._render_in_ui else [] + ) self._open = False self._block_id = str(uuid.uuid4()) self._current_row = None - return [event] + return events diff --git a/autogpt_platform/backend/backend/copilot/baseline/reasoning_test.py b/autogpt_platform/backend/backend/copilot/baseline/reasoning_test.py index df64086d5f..488b25213c 100644 --- a/autogpt_platform/backend/backend/copilot/baseline/reasoning_test.py +++ b/autogpt_platform/backend/backend/copilot/baseline/reasoning_test.py @@ -279,3 +279,56 @@ class TestReasoningPersistence: events = emitter.on_delta(_delta(reasoning="pure wire")) assert len(events) == 2 # start + delta, no crash # Nothing else to assert — just proves None session is supported. + + +class TestBaselineReasoningEmitterRenderFlag: + """``render_in_ui=False`` must silence ``StreamReasoning*`` wire events + while keeping persistence intact — the operator can hide the collapse + without losing the audit trail. These tests pin the contract in both + directions so future refactors can't flip only one half.""" + + def test_render_off_suppresses_start_and_delta(self): + emitter = BaselineReasoningEmitter(render_in_ui=False) + events = emitter.on_delta(_delta(reasoning="hidden")) + # No wire events, but state advanced (is_open == True) so close() + # below has something to rotate. + assert events == [] + assert emitter.is_open is True + + def test_render_off_suppresses_close_end(self): + emitter = BaselineReasoningEmitter(render_in_ui=False) + emitter.on_delta(_delta(reasoning="hidden")) + events = emitter.close() + assert events == [] + assert emitter.is_open is False + + def test_render_off_still_persists_rows(self): + session: list[ChatMessage] = [] + emitter = BaselineReasoningEmitter(session, render_in_ui=False) + + emitter.on_delta(_delta(reasoning="part one ")) + emitter.on_delta(_delta(reasoning="part two")) + emitter.close() + + assert len(session) == 1 + assert session[0].role == "reasoning" + assert session[0].content == "part one part two" + + def test_render_off_rotates_block_id_between_sessions(self): + """Even with wire events silenced the block id must rotate on close, + otherwise a hypothetical mid-session flip would reuse a stale id.""" + emitter = BaselineReasoningEmitter(render_in_ui=False) + emitter.on_delta(_delta(reasoning="first")) + first_block_id = emitter._block_id + emitter.close() + emitter.on_delta(_delta(reasoning="second")) + assert emitter._block_id != first_block_id + + def test_render_on_is_default(self): + """Defaulting to True preserves backward compat — existing callers + that don't pass the kwarg keep emitting wire events as before.""" + emitter = BaselineReasoningEmitter() + events = emitter.on_delta(_delta(reasoning="hello")) + assert len(events) == 2 + assert isinstance(events[0], StreamReasoningStart) + assert isinstance(events[1], StreamReasoningDelta) diff --git a/autogpt_platform/backend/backend/copilot/baseline/service.py b/autogpt_platform/backend/backend/copilot/baseline/service.py index 474a6834b1..0db2edb533 100644 --- a/autogpt_platform/backend/backend/copilot/baseline/service.py +++ b/autogpt_platform/backend/backend/copilot/baseline/service.py @@ -379,7 +379,13 @@ class _BaselineStreamState: # frontend's ``convertChatSessionToUiMessages`` relies on these # rows to render the Reasoning collapse after the AI SDK's # stream-end hydrate swaps in the DB-backed message list. - self.reasoning_emitter = BaselineReasoningEmitter(self.session_messages) + # ``render_in_ui`` is sourced from ``config.render_reasoning_in_ui`` + # so the operator can silence the reasoning collapse globally + # without dropping the persisted audit trail. + self.reasoning_emitter = BaselineReasoningEmitter( + self.session_messages, + render_in_ui=config.render_reasoning_in_ui, + ) def _is_anthropic_model(model: str) -> bool: diff --git a/autogpt_platform/backend/backend/copilot/config.py b/autogpt_platform/backend/backend/copilot/config.py index 1bb63fe1da..b13f908e0b 100644 --- a/autogpt_platform/backend/backend/copilot/config.py +++ b/autogpt_platform/backend/backend/copilot/config.py @@ -205,6 +205,33 @@ class ChatConfig(BaseSettings): "``max_thinking_tokens`` kwarg so the CLI falls back to model default " "(which, without the flag, leaves extended thinking off).", ) + render_reasoning_in_ui: bool = Field( + default=True, + description="Render extended-thinking reasoning as live UI parts " + "(``reasoning-start``/``reasoning-delta``/``reasoning-end``) on the " + "wire. When False, the baseline emitter and the SDK response adapter " + "suppress the three reasoning wire events — the model still reasons " + "and tokens are still billed, but the frontend sees a text-only " + "stream. Reasoning rows are still persisted to ``session.messages`` " + "as ``role='reasoning'`` entries, so a future per-session toggle can " + "surface them on reload. Flip to False to silence the reasoning " + "collapse without dropping the persisted audit trail.", + ) + stream_replay_count: int = Field( + default=200, + ge=1, + le=10000, + description="Maximum number of Redis stream entries replayed on SSE " + "reconnect in :func:`stream_registry.subscribe_to_session`. Lowered " + "from 1000 to bound the replay storm when a tab-switch / throttle " + "triggers multiple quick reconnects — 200 still covers a full Kimi " + "turn after coalescing (~150 events) and leaves headroom for long " + "Opus tool loops. Redis ``XREAD`` ``count`` caps the *response size*, " + "not the total events that existed; dropping older entries on replay " + "is acceptable because the frontend deduplicates on block ids. " + "Scoped to the SSE resume path only — live stream listener XREAD " + "(no replay) is unchanged.", + ) claude_agent_thinking_effort: Literal["low", "medium", "high", "max"] | None = ( Field( default=None, diff --git a/autogpt_platform/backend/backend/copilot/config_test.py b/autogpt_platform/backend/backend/copilot/config_test.py index fe8e67b7ff..25f9f477f2 100644 --- a/autogpt_platform/backend/backend/copilot/config_test.py +++ b/autogpt_platform/backend/backend/copilot/config_test.py @@ -19,6 +19,8 @@ _ENV_VARS_TO_CLEAR = ( "OPENAI_BASE_URL", "CHAT_CLAUDE_AGENT_CLI_PATH", "CLAUDE_AGENT_CLI_PATH", + "CHAT_RENDER_REASONING_IN_UI", + "CHAT_STREAM_REPLAY_COUNT", ) @@ -164,3 +166,38 @@ class TestClaudeAgentCliPathEnvFallback: monkeypatch.setenv("CLAUDE_AGENT_CLI_PATH", str(tmp_path)) with pytest.raises(Exception, match="not a regular file"): ChatConfig() + + +class TestRenderReasoningInUi: + """``render_reasoning_in_ui`` gates reasoning wire events globally.""" + + def test_defaults_to_true(self): + """Default must stay True — flipping it silences the reasoning + collapse for every user, which is an opt-in operator decision.""" + cfg = ChatConfig() + assert cfg.render_reasoning_in_ui is True + + def test_env_override_false(self, monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setenv("CHAT_RENDER_REASONING_IN_UI", "false") + cfg = ChatConfig() + assert cfg.render_reasoning_in_ui is False + + +class TestStreamReplayCount: + """``stream_replay_count`` caps the SSE reconnect replay batch size.""" + + def test_default_is_200(self): + """200 covers a full Kimi turn after coalescing (~150 events) while + bounding the replay storm from 1000+ chunks.""" + cfg = ChatConfig() + assert cfg.stream_replay_count == 200 + + def test_env_override(self, monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setenv("CHAT_STREAM_REPLAY_COUNT", "500") + cfg = ChatConfig() + assert cfg.stream_replay_count == 500 + + def test_zero_rejected(self): + """count=0 would make XREAD replay nothing — rejected via ge=1.""" + with pytest.raises(Exception): + ChatConfig(stream_replay_count=0) diff --git a/autogpt_platform/backend/backend/copilot/sdk/p0_guardrails_test.py b/autogpt_platform/backend/backend/copilot/sdk/p0_guardrails_test.py index 17b54797b8..e38fdddd9a 100644 --- a/autogpt_platform/backend/backend/copilot/sdk/p0_guardrails_test.py +++ b/autogpt_platform/backend/backend/copilot/sdk/p0_guardrails_test.py @@ -733,7 +733,11 @@ class TestDoTransientBackoff: async for _ in _do_transient_backoff(3, state, "msg-1", "sess-1"): pass - mock_cls.assert_called_once_with(message_id="msg-1", session_id="sess-1") + mock_cls.assert_called_once_with( + message_id="msg-1", + session_id="sess-1", + render_reasoning_in_ui=True, + ) assert state.adapter is new_adapter async def test_resets_usage_after_yield(self): diff --git a/autogpt_platform/backend/backend/copilot/sdk/response_adapter.py b/autogpt_platform/backend/backend/copilot/sdk/response_adapter.py index fbd73d9277..8c06daa9a5 100644 --- a/autogpt_platform/backend/backend/copilot/sdk/response_adapter.py +++ b/autogpt_platform/backend/backend/copilot/sdk/response_adapter.py @@ -53,7 +53,13 @@ class SDKResponseAdapter: text blocks, tool calls, and message lifecycle. """ - def __init__(self, message_id: str | None = None, session_id: str | None = None): + def __init__( + self, + message_id: str | None = None, + session_id: str | None = None, + *, + render_reasoning_in_ui: bool = True, + ): self.message_id = message_id or str(uuid.uuid4()) self.session_id = session_id self.text_block_id = str(uuid.uuid4()) @@ -62,6 +68,13 @@ class SDKResponseAdapter: self.reasoning_block_id = str(uuid.uuid4()) self.has_started_reasoning = False self.has_ended_reasoning = True + # When False, ``_ensure_reasoning_started`` / ``StreamReasoningDelta`` + # emission / ``_end_reasoning_if_open`` become no-ops so the frontend + # sees a text-only stream for a ``ThinkingBlock``. The block is + # still persisted to ``session.messages`` via + # ``_format_sdk_content_blocks`` — same rule as the baseline + # emitter: operator silences the live collapse, audit trail stays. + self._render_reasoning_in_ui = render_reasoning_in_ui self.current_tool_calls: dict[str, dict[str, str]] = {} self.resolved_tool_calls: set[str] = set() self.step_open = False @@ -142,15 +155,22 @@ class SDKResponseAdapter: # it live, extended_thinking turns that end # thinking-only left the UI stuck on "Thought for Xs" # with nothing rendered until a page refresh. + # + # When ``render_reasoning_in_ui=False`` the three + # reasoning helpers below (and the append) no-op, so + # the frontend sees a text-only stream. Persistence + # of the thinking text (``_format_sdk_content_blocks``) + # is unaffected. if block.thinking: self._end_text_if_open(responses) self._ensure_reasoning_started(responses) - responses.append( - StreamReasoningDelta( - id=self.reasoning_block_id, - delta=block.thinking, + if self._render_reasoning_in_ui: + responses.append( + StreamReasoningDelta( + id=self.reasoning_block_id, + delta=block.thinking, + ) ) - ) elif isinstance(block, ToolUseBlock): self._end_text_if_open(responses) @@ -349,7 +369,13 @@ class SDKResponseAdapter: Each ``ThinkingBlock`` the SDK emits gets its own streaming block on the wire so the frontend can render a new ``Reasoning`` part per LLM turn (rather than concatenating across the whole session). + + No-op when ``render_reasoning_in_ui=False`` — callers still drive + the method on every ``ThinkingBlock`` so persistence stays in + lockstep, but nothing reaches the wire. """ + if not self._render_reasoning_in_ui: + return if not self.has_started_reasoning or self.has_ended_reasoning: if self.has_ended_reasoning: self.reasoning_block_id = str(uuid.uuid4()) @@ -358,7 +384,13 @@ class SDKResponseAdapter: self.has_started_reasoning = True def _end_reasoning_if_open(self, responses: list[StreamBaseResponse]) -> None: - """End the current reasoning block if one is open.""" + """End the current reasoning block if one is open. + + No-op when ``render_reasoning_in_ui=False`` — no start was emitted, + so no end is needed. + """ + if not self._render_reasoning_in_ui: + return if self.has_started_reasoning and not self.has_ended_reasoning: responses.append(StreamReasoningEnd(id=self.reasoning_block_id)) self.has_ended_reasoning = True diff --git a/autogpt_platform/backend/backend/copilot/sdk/response_adapter_test.py b/autogpt_platform/backend/backend/copilot/sdk/response_adapter_test.py index 634454f9e5..ada472561a 100644 --- a/autogpt_platform/backend/backend/copilot/sdk/response_adapter_test.py +++ b/autogpt_platform/backend/backend/copilot/sdk/response_adapter_test.py @@ -331,6 +331,64 @@ def test_empty_thinking_block_is_ignored(): assert [type(r).__name__ for r in results] == ["StreamStartStep"] +def test_render_reasoning_in_ui_false_suppresses_thinking_events(): + """``render_reasoning_in_ui=False`` silences ``StreamReasoning*`` on + the wire — the frontend sees a text-only stream. Persistence via + ``_format_sdk_content_blocks`` is handled elsewhere; this test only + pins the wire contract. + """ + adapter = SDKResponseAdapter( + message_id="m", + session_id="s", + render_reasoning_in_ui=False, + ) + msg = AssistantMessage( + content=[ThinkingBlock(thinking="plan", signature="sig")], + model="test", + ) + results = adapter.convert_message(msg) + types = [type(r).__name__ for r in results] + assert "StreamReasoningStart" not in types + assert "StreamReasoningDelta" not in types + assert "StreamReasoningEnd" not in types + + +def test_render_reasoning_off_text_after_thinking_emits_no_reasoning_end(): + """With rendering off the ReasoningEnd is never synthesized when text + follows — no ReasoningStart ever hit the wire, so no close is due.""" + adapter = SDKResponseAdapter( + message_id="m", + session_id="s", + render_reasoning_in_ui=False, + ) + adapter.convert_message( + AssistantMessage( + content=[ThinkingBlock(thinking="warming up", signature="sig")], + model="test", + ) + ) + results = adapter.convert_message( + AssistantMessage(content=[TextBlock(text="hello")], model="test") + ) + types = [type(r).__name__ for r in results] + assert "StreamReasoningEnd" not in types + assert "StreamTextStart" in types + assert "StreamTextDelta" in types + + +def test_render_reasoning_on_is_default(): + """Default is True — existing callers keep emitting reasoning events.""" + adapter = SDKResponseAdapter(message_id="m", session_id="s") + msg = AssistantMessage( + content=[ThinkingBlock(thinking="plan", signature="sig")], + model="test", + ) + results = adapter.convert_message(msg) + types = [type(r).__name__ for r in results] + assert "StreamReasoningStart" in types + assert "StreamReasoningDelta" in types + + def test_result_success_synthesizes_fallback_text_when_final_turn_is_thinking_only(): """If the model's last LLM call after a tool_result produced only a ThinkingBlock (no TextBlock), the UI would hang on the tool output diff --git a/autogpt_platform/backend/backend/copilot/sdk/service.py b/autogpt_platform/backend/backend/copilot/sdk/service.py index 6c7493c045..5d5f10ebb5 100644 --- a/autogpt_platform/backend/backend/copilot/sdk/service.py +++ b/autogpt_platform/backend/backend/copilot/sdk/service.py @@ -821,7 +821,11 @@ async def _do_transient_backoff( """ yield StreamStatus(message=f"Connection interrupted, retrying in {backoff}s…") await asyncio.sleep(backoff) - state.adapter = SDKResponseAdapter(message_id=message_id, session_id=session_id) + state.adapter = SDKResponseAdapter( + message_id=message_id, + session_id=session_id, + render_reasoning_in_ui=config.render_reasoning_in_ui, + ) state.usage.reset() @@ -3155,7 +3159,11 @@ async def stream_chat_completion_sdk( options = ClaudeAgentOptions(**sdk_options_kwargs) # type: ignore[arg-type] # dynamic kwargs - adapter = SDKResponseAdapter(message_id=message_id, session_id=session_id) + adapter = SDKResponseAdapter( + message_id=message_id, + session_id=session_id, + render_reasoning_in_ui=config.render_reasoning_in_ui, + ) # Propagate user_id/session_id as OTEL context attributes so the # langsmith tracing integration attaches them to every span. This @@ -3489,7 +3497,9 @@ async def stream_chat_completion_sdk( session, user_id, is_user_message, state.query_message ) state.adapter = SDKResponseAdapter( - message_id=message_id, session_id=session_id + message_id=message_id, + session_id=session_id, + render_reasoning_in_ui=config.render_reasoning_in_ui, ) # Reset token accumulators so a failed attempt's partial # usage is not double-counted in the successful attempt. diff --git a/autogpt_platform/backend/backend/copilot/stream_registry.py b/autogpt_platform/backend/backend/copilot/stream_registry.py index 424964e075..c2dfb86fdb 100644 --- a/autogpt_platform/backend/backend/copilot/stream_registry.py +++ b/autogpt_platform/backend/backend/copilot/stream_registry.py @@ -485,9 +485,17 @@ async def subscribe_to_session( subscriber_queue: asyncio.Queue[StreamBaseResponse] = asyncio.Queue() stream_key = _get_turn_stream_key(session.turn_id) - # Step 1: Replay messages from Redis Stream + # Step 1: Replay messages from Redis Stream. + # ``count`` caps the replay batch size — lowered from a hard-coded 1000 + # to :attr:`ChatConfig.stream_replay_count` (default 200) to bound the + # replay storm when a tab-switch / browser throttle triggers multiple + # reconnects in quick succession. 200 still covers a full Kimi turn + # after coalescing (~150 events); dropping older entries on replay is + # acceptable because the frontend deduplicates on block ids. xread_start = time.perf_counter() - messages = await redis.xread({stream_key: last_message_id}, block=None, count=1000) + messages = await redis.xread( + {stream_key: last_message_id}, block=None, count=config.stream_replay_count + ) xread_time = (time.perf_counter() - xread_start) * 1000 logger.info( f"[TIMING] Redis xread (replay) took {xread_time:.1f}ms, status={session_status}", diff --git a/autogpt_platform/frontend/src/app/(platform)/copilot/useCopilotStream.ts b/autogpt_platform/frontend/src/app/(platform)/copilot/useCopilotStream.ts index 2412ff5988..4f3f899b68 100644 --- a/autogpt_platform/frontend/src/app/(platform)/copilot/useCopilotStream.ts +++ b/autogpt_platform/frontend/src/app/(platform)/copilot/useCopilotStream.ts @@ -25,6 +25,18 @@ import { useHydrateOnStreamEnd } from "./useHydrateOnStreamEnd"; const RECONNECT_BASE_DELAY_MS = 1_000; const RECONNECT_MAX_ATTEMPTS = 3; +/** + * Minimum spacing between successive reconnect attempts. + * `isReconnectScheduledRef` already prevents OVERLAPPING reconnects, but + * tab-throttle / visibility wake bursts can fire `onFinish(isDisconnect)` + * several times inside a single second — each one would schedule a fresh + * reconnect the moment the previous timer cleared the ref. The debounce + * rejects any reconnect request that arrives within this window since the + * last one's resume actually ran, so a throttle storm collapses to one + * GET /stream instead of three. + */ +const RECONNECT_DEBOUNCE_MS = 1_500; + /** Minimum time the page must have been hidden to trigger a wake re-sync. */ const WAKE_RESYNC_THRESHOLD_MS = 30_000; @@ -110,6 +122,11 @@ export function useCopilotStream({ const isReconnectScheduledRef = useRef(false); const [isReconnectScheduled, setIsReconnectScheduled] = useState(false); const reconnectTimerRef = useRef>(); + // Timestamp of the last reconnect's actual resume call — used together + // with RECONNECT_DEBOUNCE_MS to drop rapid duplicate reconnect requests + // (e.g. visibility throttle firing onFinish(isDisconnect) several times + // in the same second). 0 = no reconnect has fired yet in this session. + const lastReconnectResumeAtRef = useRef(0); const hasShownDisconnectToast = useRef(false); // Set when the user explicitly clicks stop — prevents onError from // triggering a reconnect cycle for the resulting AbortError. @@ -127,6 +144,19 @@ export function useCopilotStream({ function handleReconnect(sid: string) { if (isReconnectScheduledRef.current || !sid) return; + // Debounce: if the previous reconnect resumed within the last + // RECONNECT_DEBOUNCE_MS, drop this request. Browser tab-throttle bursts + // can fire onFinish(isDisconnect) 2–3 times in a second; without this + // guard each fires its own GET /stream, each one replays the Redis + // stream, and the flicker storm is back. + const sinceLastResume = Date.now() - lastReconnectResumeAtRef.current; + if ( + lastReconnectResumeAtRef.current > 0 && + sinceLastResume < RECONNECT_DEBOUNCE_MS + ) { + return; + } + const nextAttempt = reconnectAttemptsRef.current + 1; if (nextAttempt > RECONNECT_MAX_ATTEMPTS) { setReconnectExhausted(true); @@ -163,6 +193,7 @@ export function useCopilotStream({ } return prev; }); + lastReconnectResumeAtRef.current = Date.now(); resumeStreamRef.current(); }, delay); } @@ -469,6 +500,7 @@ export function useCopilotStream({ setRateLimitMessage(null); hasShownDisconnectToast.current = false; lastSubmittedMsgRef.current = null; + lastReconnectResumeAtRef.current = 0; setReconnectExhausted(false); setIsSyncing(false); hasResumedRef.current.clear(); From 7ef10b26c0c3c3a1f685438316e7f42a997e4acc Mon Sep 17 00:00:00 2001 From: majdyz Date: Wed, 22 Apr 2026 00:30:18 +0700 Subject: [PATCH 2/5] fix(platform/copilot): address PR review on reasoning flag + reconnect debounce Reconnect debounce (useCopilotStream.ts): - Coalesce-by-delay instead of drop-on-early-return. A fast-failing resume (e.g. 502 on GET /stream at 500ms) would call handleReconnect inside the 1500ms window and the debounce silently returned, stalling the retry loop until a manual refresh. Now we schedule a timer for the remaining window so the retry still fires. Flagged by sentry-bot (HIGH) and coderabbit. render_reasoning_in_ui=False persistence asymmetry: - BaselineReasoningEmitter now also skips ChatMessage(role="reasoning") persistence when render is off. Previously only wire events were silenced while the persisted row was still appended; convertChatSessionToUiMessages unconditionally re-renders reasoning rows as {type: "reasoning"} UI parts on reload, so the flag was a no-op post-refresh. The SDK path was already consistent (_dispatch_response only creates the row on StreamReasoningStart, which the adapter suppresses). Docstrings on the emitter, the adapter, and the config field updated to describe the combined wire+persistence gating and point at the provider transcript as the audit source. - Dropped the "future per-session toggle" promise from the emitter docstring - with hydration unconditionally resurfacing persisted rows, keeping them while silencing the live wire is a footgun, not a feature. - Flagged by coderabbit (major, inline + outside-diff on response_adapter.py). p0_guardrails test env-dependence: - test_replaces_adapter_with_new_instance now asserts against config.render_reasoning_in_ui rather than the True literal, so CHAT_RENDER_REASONING_IN_UI=false in the shell no longer causes the test to fail for the wrong reason. Flagged by coderabbit (outside-diff, minor). --- .../backend/copilot/baseline/reasoning.py | 37 ++++++++++++------- .../copilot/baseline/reasoning_test.py | 18 +++++---- .../backend/backend/copilot/config.py | 21 +++++++---- .../backend/copilot/sdk/p0_guardrails_test.py | 9 +++-- .../backend/copilot/sdk/response_adapter.py | 25 +++++++++---- .../(platform)/copilot/useCopilotStream.ts | 30 +++++++++++---- 6 files changed, 95 insertions(+), 45 deletions(-) diff --git a/autogpt_platform/backend/backend/copilot/baseline/reasoning.py b/autogpt_platform/backend/backend/copilot/baseline/reasoning.py index 30dceb46de..9128bfd710 100644 --- a/autogpt_platform/backend/backend/copilot/baseline/reasoning.py +++ b/autogpt_platform/backend/backend/copilot/baseline/reasoning.py @@ -174,13 +174,18 @@ class BaselineReasoningEmitter: but leaves the appended row intact. Pass ``render_in_ui=False`` to suppress the ``StreamReasoning*`` wire - events — the emitter still reads the reasoning payload and still - appends / mutates the persisted ``role="reasoning"`` row, but returns - empty event lists from :meth:`on_delta` and :meth:`close`. This lets - the operator (via :attr:`ChatConfig.render_reasoning_in_ui`) hide the - reasoning collapse from the live stream without dropping the audit - trail; a future per-session toggle can surface the persisted rows on - reload. + events AND the ``ChatMessage(role="reasoning")`` persistence row — the + emitter still reads the reasoning payload and still advances its state + machine (so block ids rotate across turns and ``is_open`` stays + consistent), but returns empty event lists from :meth:`on_delta` and + :meth:`close` and never appends to ``session_messages``. Persistence + is suppressed because ``convertChatSessionToUiMessages`` unconditionally + re-renders persisted reasoning rows as ``{type: "reasoning"}`` UI parts + on reload — keeping them while silencing the live wire would resurrect + the reasoning collapse the moment the user refreshed, making the flag + a no-op post-reload. The model still reasons and OpenRouter / the SDK + transcript retain the audit trail; ``session.messages`` only needs + reasoning rows when the UI actually renders them. """ def __init__( @@ -208,10 +213,14 @@ class BaselineReasoningEmitter: lockstep with emission so the row's content stays equal to the concatenated deltas at every delta boundary. - When ``render_in_ui=False`` the wire events are suppressed (empty - list returned) but the state machine still advances and the - persistence row is still appended / mutated, so ``close`` behaves - symmetrically and the persisted audit trail is identical. + When ``render_in_ui=False`` both the wire events AND the persistence + row are suppressed (``session_messages`` is left untouched) — the + state machine still advances (``_open`` / ``_block_id`` rotate) so + ``close`` stays symmetric across turns. Persistence is dropped in + lockstep with the wire events because the frontend's hydration path + (``convertChatSessionToUiMessages``) would otherwise re-render the + persisted rows as reasoning parts on reload, silently undoing the + operator's intent. """ ext = OpenRouterDeltaExtension.from_delta(delta) text = ext.visible_text() @@ -222,7 +231,7 @@ class BaselineReasoningEmitter: if self._render_in_ui: events.append(StreamReasoningStart(id=self._block_id)) self._open = True - if self._session_messages is not None: + if self._render_in_ui and self._session_messages is not None: self._current_row = ChatMessage(role="reasoning", content="") self._session_messages.append(self._current_row) if self._render_in_ui: @@ -243,7 +252,9 @@ class BaselineReasoningEmitter: When ``render_in_ui=False`` the wire event is suppressed (empty list returned) but the block still rotates so subsequent reasoning uses a fresh id — keeps the state machine in lockstep with the - render-on branch. + render-on branch. Nothing is removed from ``session_messages`` + because ``on_delta`` never appended a row in the first place when + render was off. """ if not self._open: return [] diff --git a/autogpt_platform/backend/backend/copilot/baseline/reasoning_test.py b/autogpt_platform/backend/backend/copilot/baseline/reasoning_test.py index 488b25213c..8c362fce7a 100644 --- a/autogpt_platform/backend/backend/copilot/baseline/reasoning_test.py +++ b/autogpt_platform/backend/backend/copilot/baseline/reasoning_test.py @@ -283,9 +283,12 @@ class TestReasoningPersistence: class TestBaselineReasoningEmitterRenderFlag: """``render_in_ui=False`` must silence ``StreamReasoning*`` wire events - while keeping persistence intact — the operator can hide the collapse - without losing the audit trail. These tests pin the contract in both - directions so future refactors can't flip only one half.""" + AND drop persistence of ``role="reasoning"`` rows — the operator hides + the collapse on both the live wire and on reload. Persistence is tied + to the wire events because the frontend's hydration path unconditionally + re-renders persisted reasoning rows; keeping them would make the flag a + no-op post-reload. These tests pin the contract in both directions so + future refactors can't flip only one half.""" def test_render_off_suppresses_start_and_delta(self): emitter = BaselineReasoningEmitter(render_in_ui=False) @@ -302,7 +305,10 @@ class TestBaselineReasoningEmitterRenderFlag: assert events == [] assert emitter.is_open is False - def test_render_off_still_persists_rows(self): + def test_render_off_skips_persistence(self): + """When render is off the emitter must NOT append a ``role="reasoning"`` + row to ``session_messages`` — hydration would re-render it, undoing + the operator's intent.""" session: list[ChatMessage] = [] emitter = BaselineReasoningEmitter(session, render_in_ui=False) @@ -310,9 +316,7 @@ class TestBaselineReasoningEmitterRenderFlag: emitter.on_delta(_delta(reasoning="part two")) emitter.close() - assert len(session) == 1 - assert session[0].role == "reasoning" - assert session[0].content == "part one part two" + assert session == [] def test_render_off_rotates_block_id_between_sessions(self): """Even with wire events silenced the block id must rotate on close, diff --git a/autogpt_platform/backend/backend/copilot/config.py b/autogpt_platform/backend/backend/copilot/config.py index b13f908e0b..3a94472016 100644 --- a/autogpt_platform/backend/backend/copilot/config.py +++ b/autogpt_platform/backend/backend/copilot/config.py @@ -209,13 +209,20 @@ class ChatConfig(BaseSettings): default=True, description="Render extended-thinking reasoning as live UI parts " "(``reasoning-start``/``reasoning-delta``/``reasoning-end``) on the " - "wire. When False, the baseline emitter and the SDK response adapter " - "suppress the three reasoning wire events — the model still reasons " - "and tokens are still billed, but the frontend sees a text-only " - "stream. Reasoning rows are still persisted to ``session.messages`` " - "as ``role='reasoning'`` entries, so a future per-session toggle can " - "surface them on reload. Flip to False to silence the reasoning " - "collapse without dropping the persisted audit trail.", + "wire AND as persisted ``role='reasoning'`` rows in " + "``session.messages``. When False, the baseline emitter and the SDK " + "response adapter suppress the three reasoning wire events AND skip " + "the ``ChatMessage(role='reasoning')`` persistence — the model still " + "reasons and tokens are still billed, but the frontend sees a " + "text-only stream on the live wire and on reload. Persistence is " + "coupled to the wire events because " + "``convertChatSessionToUiMessages.ts`` unconditionally re-renders " + "persisted reasoning rows as ``{type: 'reasoning'}`` UI parts; " + "keeping the rows while silencing the live wire would resurrect the " + "collapse on refresh. Audit trail of the reasoning content remains " + "available through the SDK transcript " + "(``_format_sdk_content_blocks``) / upstream provider logs; " + "``session.messages`` is a UI-render store, not an audit store.", ) stream_replay_count: int = Field( default=200, diff --git a/autogpt_platform/backend/backend/copilot/sdk/p0_guardrails_test.py b/autogpt_platform/backend/backend/copilot/sdk/p0_guardrails_test.py index e38fdddd9a..be80f2901b 100644 --- a/autogpt_platform/backend/backend/copilot/sdk/p0_guardrails_test.py +++ b/autogpt_platform/backend/backend/copilot/sdk/p0_guardrails_test.py @@ -714,10 +714,13 @@ class TestDoTransientBackoff: mock_sleep.assert_called_once_with(7) async def test_replaces_adapter_with_new_instance(self): - """state.adapter is replaced with a new SDKResponseAdapter after yield.""" + """state.adapter is replaced with a new SDKResponseAdapter after yield, + and ``render_reasoning_in_ui`` is threaded from the SDK service config + (not hardcoded) so ``CHAT_RENDER_REASONING_IN_UI=false`` at runtime + flips the reconstruction consistently with the rest of the path.""" from unittest.mock import AsyncMock, MagicMock, patch - from backend.copilot.sdk.service import _do_transient_backoff + from backend.copilot.sdk.service import _do_transient_backoff, config original_adapter = MagicMock() state = MagicMock() @@ -736,7 +739,7 @@ class TestDoTransientBackoff: mock_cls.assert_called_once_with( message_id="msg-1", session_id="sess-1", - render_reasoning_in_ui=True, + render_reasoning_in_ui=config.render_reasoning_in_ui, ) assert state.adapter is new_adapter diff --git a/autogpt_platform/backend/backend/copilot/sdk/response_adapter.py b/autogpt_platform/backend/backend/copilot/sdk/response_adapter.py index 8c06daa9a5..5c20e159a7 100644 --- a/autogpt_platform/backend/backend/copilot/sdk/response_adapter.py +++ b/autogpt_platform/backend/backend/copilot/sdk/response_adapter.py @@ -70,10 +70,16 @@ class SDKResponseAdapter: self.has_ended_reasoning = True # When False, ``_ensure_reasoning_started`` / ``StreamReasoningDelta`` # emission / ``_end_reasoning_if_open`` become no-ops so the frontend - # sees a text-only stream for a ``ThinkingBlock``. The block is - # still persisted to ``session.messages`` via - # ``_format_sdk_content_blocks`` — same rule as the baseline - # emitter: operator silences the live collapse, audit trail stays. + # sees a text-only stream for a ``ThinkingBlock``. ``session.messages`` + # persistence is also skipped as a side-effect: ``_dispatch_response`` + # only creates a ``ChatMessage(role="reasoning")`` row on + # ``StreamReasoningStart`` (see ``sdk/service.py``), which is + # suppressed here. That keeps the SDK path in lockstep with the + # baseline emitter — flag off means no live wire, no persisted row, + # and therefore no hydrated reasoning collapse on reload. The + # ThinkingBlock content is still carried through + # ``_format_sdk_content_blocks`` into the SDK transcript for + # ``--resume`` continuity, which is independent of ``session.messages``. self._render_reasoning_in_ui = render_reasoning_in_ui self.current_tool_calls: dict[str, dict[str, str]] = {} self.resolved_tool_calls: set[str] = set() @@ -158,9 +164,14 @@ class SDKResponseAdapter: # # When ``render_reasoning_in_ui=False`` the three # reasoning helpers below (and the append) no-op, so - # the frontend sees a text-only stream. Persistence - # of the thinking text (``_format_sdk_content_blocks``) - # is unaffected. + # the frontend sees a text-only stream AND no + # ``ChatMessage(role='reasoning')`` row is persisted + # (the row is only created by ``_dispatch_response`` + # when ``StreamReasoningStart`` arrives, which is + # suppressed here). Persistence of the thinking text + # into the SDK transcript via + # ``_format_sdk_content_blocks`` is unaffected — that + # feeds ``--resume`` continuity, not the UI. if block.thinking: self._end_text_if_open(responses) self._ensure_reasoning_started(responses) diff --git a/autogpt_platform/frontend/src/app/(platform)/copilot/useCopilotStream.ts b/autogpt_platform/frontend/src/app/(platform)/copilot/useCopilotStream.ts index 4f3f899b68..93b094edca 100644 --- a/autogpt_platform/frontend/src/app/(platform)/copilot/useCopilotStream.ts +++ b/autogpt_platform/frontend/src/app/(platform)/copilot/useCopilotStream.ts @@ -30,10 +30,11 @@ const RECONNECT_MAX_ATTEMPTS = 3; * `isReconnectScheduledRef` already prevents OVERLAPPING reconnects, but * tab-throttle / visibility wake bursts can fire `onFinish(isDisconnect)` * several times inside a single second — each one would schedule a fresh - * reconnect the moment the previous timer cleared the ref. The debounce - * rejects any reconnect request that arrives within this window since the - * last one's resume actually ran, so a throttle storm collapses to one - * GET /stream instead of three. + * reconnect the moment the previous timer cleared the ref. Requests that + * arrive inside this window since the last reconnect's resume are COALESCED: + * scheduled to run at the window boundary rather than dropped, so a + * fast-failing resume (e.g. a 502 on GET /stream that trips `onError` inside + * 500 ms) still retries instead of stalling the retry loop. */ const RECONNECT_DEBOUNCE_MS = 1_500; @@ -145,15 +146,28 @@ export function useCopilotStream({ if (isReconnectScheduledRef.current || !sid) return; // Debounce: if the previous reconnect resumed within the last - // RECONNECT_DEBOUNCE_MS, drop this request. Browser tab-throttle bursts - // can fire onFinish(isDisconnect) 2–3 times in a second; without this - // guard each fires its own GET /stream, each one replays the Redis - // stream, and the flicker storm is back. + // RECONNECT_DEBOUNCE_MS, COALESCE this request onto the window boundary + // rather than dropping it. Browser tab-throttle bursts can fire + // onFinish(isDisconnect) 2–3 times in a second; without the debounce, + // each fires its own GET /stream, each one replays the Redis stream, + // and the flicker storm is back. Dropping the request silently (the + // previous behaviour) stalled the retry loop when a resume failed + // quickly — e.g. a 502 on GET /stream that trips onError inside 500 ms + // while the 1500 ms window is still open. Scheduling the retry for + // the remaining window preserves both the storm cap and the retry. const sinceLastResume = Date.now() - lastReconnectResumeAtRef.current; if ( lastReconnectResumeAtRef.current > 0 && sinceLastResume < RECONNECT_DEBOUNCE_MS ) { + const remainingDelay = RECONNECT_DEBOUNCE_MS - sinceLastResume; + isReconnectScheduledRef.current = true; + setIsReconnectScheduled(true); + reconnectTimerRef.current = setTimeout(() => { + isReconnectScheduledRef.current = false; + setIsReconnectScheduled(false); + handleReconnect(sid); + }, remainingDelay); return; } From 37de8386529f2f421663aa8ee14fdd88af414af5 Mon Sep 17 00:00:00 2001 From: majdyz Date: Wed, 22 Apr 2026 07:12:18 +0700 Subject: [PATCH 3/5] test(frontend/copilot): cover reconnect debounce window MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Extract the inline debounce logic in `useCopilotStream.handleReconnect` into a pure `shouldDebounceReconnect(lastResumeAt, now, windowMs)` helper and cover it with 10 vitest cases (first-reconnect pass-through, inside window coalesce, boundary, beyond window, custom window, burst simulation). The hook wiring shrinks to two lines and the decision surface is 100% covered by unit tests — useful for codecov/patch on the frontend diff. --- .../app/(platform)/copilot/helpers.test.ts | 86 +++++++++++++++++++ .../src/app/(platform)/copilot/helpers.ts | 22 +++++ .../(platform)/copilot/useCopilotStream.ts | 13 +-- 3 files changed, 115 insertions(+), 6 deletions(-) diff --git a/autogpt_platform/frontend/src/app/(platform)/copilot/helpers.test.ts b/autogpt_platform/frontend/src/app/(platform)/copilot/helpers.test.ts index 91e09efde3..7c61390f1f 100644 --- a/autogpt_platform/frontend/src/app/(platform)/copilot/helpers.test.ts +++ b/autogpt_platform/frontend/src/app/(platform)/copilot/helpers.test.ts @@ -7,6 +7,7 @@ import { formatNotificationTitle, getSendSuppressionReason, parseSessionIDs, + shouldDebounceReconnect, shouldSuppressDuplicateSend, } from "./helpers"; @@ -466,3 +467,88 @@ describe("deduplicateMessages", () => { expect(result).toHaveLength(2); // duplicate step-start messages are deduped }); }); + +describe("shouldDebounceReconnect", () => { + const WINDOW_MS = 1_500; + + it("returns null for the first reconnect (lastResumeAt === 0)", () => { + expect(shouldDebounceReconnect(0, 10_000, WINDOW_MS)).toBeNull(); + }); + + it("returns null for a negative lastResumeAt sentinel", () => { + // Defensive: a negative value is still treated as "no reconnect yet". + expect(shouldDebounceReconnect(-1, 10_000, WINDOW_MS)).toBeNull(); + }); + + it("returns the remaining delay when now is inside the window", () => { + // 500ms since the last resume — the caller must wait another 1000ms + // before the storm cap reopens. + const remaining = shouldDebounceReconnect(1_000, 1_500, WINDOW_MS); + expect(remaining).toBe(1_000); + }); + + it("coalesces a reconnect that arrives immediately after the previous resume", () => { + // now === lastResumeAt → sinceLastResume === 0, so the full window remains. + const remaining = shouldDebounceReconnect(5_000, 5_000, WINDOW_MS); + expect(remaining).toBe(WINDOW_MS); + }); + + it("returns null when exactly on the window boundary", () => { + // sinceLastResume === windowMs is NOT inside the window — the next + // reconnect should fire immediately. + expect(shouldDebounceReconnect(1_000, 2_500, WINDOW_MS)).toBeNull(); + }); + + it("returns null when the window has elapsed", () => { + expect(shouldDebounceReconnect(1_000, 5_000, WINDOW_MS)).toBeNull(); + }); + + it("returns a small remaining delay at the far edge of the window", () => { + // 1ms before the window closes → 1ms left. + const remaining = shouldDebounceReconnect(1_000, 2_499, WINDOW_MS); + expect(remaining).toBe(1); + }); + + it("collapses a burst of reconnects into one debounced scheduling", () => { + // Simulates the browser tab-throttle storm: three reconnect calls fire + // within a single second after the last resume. Only the first slot + // would actually run; subsequent calls must always be coalesced. + const lastResumeAt = 10_000; + const firstCallRemaining = shouldDebounceReconnect( + lastResumeAt, + 10_100, + WINDOW_MS, + ); + const secondCallRemaining = shouldDebounceReconnect( + lastResumeAt, + 10_200, + WINDOW_MS, + ); + const thirdCallRemaining = shouldDebounceReconnect( + lastResumeAt, + 10_300, + WINDOW_MS, + ); + expect(firstCallRemaining).toBe(1_400); + expect(secondCallRemaining).toBe(1_300); + expect(thirdCallRemaining).toBe(1_200); + }); + + it("allows a reconnect to fire immediately once the window has passed", () => { + // After the window expires, a retry that came in earlier can now fire + // rather than stalling the loop. Guards against the regression that + // motivated the coalesce-instead-of-drop fix. + const lastResumeAt = 10_000; + expect( + shouldDebounceReconnect(lastResumeAt, 10_500, WINDOW_MS), + ).not.toBeNull(); + expect(shouldDebounceReconnect(lastResumeAt, 11_500, WINDOW_MS)).toBeNull(); + }); + + it("honours a custom windowMs value", () => { + // Shouldn't hard-code 1500 anywhere: the helper is generic over the + // window. + expect(shouldDebounceReconnect(1_000, 1_500, 2_000)).toBe(1_500); + expect(shouldDebounceReconnect(1_000, 3_500, 2_000)).toBeNull(); + }); +}); diff --git a/autogpt_platform/frontend/src/app/(platform)/copilot/helpers.ts b/autogpt_platform/frontend/src/app/(platform)/copilot/helpers.ts index b1d87a25d2..131a721117 100644 --- a/autogpt_platform/frontend/src/app/(platform)/copilot/helpers.ts +++ b/autogpt_platform/frontend/src/app/(platform)/copilot/helpers.ts @@ -184,6 +184,28 @@ export function disconnectSessionStream(sessionId: string): void { deleteV2DisconnectSessionStream(sessionId).catch(() => {}); } +/** + * Decide whether a reconnect request must be coalesced onto the debounce + * window boundary, rather than firing immediately. + * + * Returns the remaining milliseconds until the window closes (so the caller + * can schedule a `setTimeout` for that delay) when the previous resume + * happened inside the window, or `null` to let the reconnect proceed now. + * + * `lastResumeAt === 0` signals "no reconnect has fired yet in this session" + * — the first reconnect always passes through regardless of `now`. + */ +export function shouldDebounceReconnect( + lastResumeAt: number, + now: number, + windowMs: number, +): number | null { + if (lastResumeAt <= 0) return null; + const sinceLastResume = now - lastResumeAt; + if (sinceLastResume >= windowMs) return null; + return windowMs - sinceLastResume; +} + /** * Deduplicate messages by ID and by consecutive content fingerprint. * diff --git a/autogpt_platform/frontend/src/app/(platform)/copilot/useCopilotStream.ts b/autogpt_platform/frontend/src/app/(platform)/copilot/useCopilotStream.ts index 93b094edca..afef20c85a 100644 --- a/autogpt_platform/frontend/src/app/(platform)/copilot/useCopilotStream.ts +++ b/autogpt_platform/frontend/src/app/(platform)/copilot/useCopilotStream.ts @@ -18,6 +18,7 @@ import { resolveInProgressTools, getSendSuppressionReason, disconnectSessionStream, + shouldDebounceReconnect, } from "./helpers"; import type { CopilotLlmModel, CopilotMode } from "./store"; import { useHydrateOnStreamEnd } from "./useHydrateOnStreamEnd"; @@ -155,12 +156,12 @@ export function useCopilotStream({ // quickly — e.g. a 502 on GET /stream that trips onError inside 500 ms // while the 1500 ms window is still open. Scheduling the retry for // the remaining window preserves both the storm cap and the retry. - const sinceLastResume = Date.now() - lastReconnectResumeAtRef.current; - if ( - lastReconnectResumeAtRef.current > 0 && - sinceLastResume < RECONNECT_DEBOUNCE_MS - ) { - const remainingDelay = RECONNECT_DEBOUNCE_MS - sinceLastResume; + const remainingDelay = shouldDebounceReconnect( + lastReconnectResumeAtRef.current, + Date.now(), + RECONNECT_DEBOUNCE_MS, + ); + if (remainingDelay !== null) { isReconnectScheduledRef.current = true; setIsReconnectScheduled(true); reconnectTimerRef.current = setTimeout(() => { From e516c9ce3abceac88fb9e5e7a7779a916ad28b32 Mon Sep 17 00:00:00 2001 From: majdyz Date: Wed, 22 Apr 2026 07:17:05 +0700 Subject: [PATCH 4/5] test(frontend/copilot): hook test for reconnect debounce burst MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Mocks @ai-sdk/react so renderHook(useCopilotStream) can capture the onFinish callback directly and drive handleReconnect without real SSE. Two cases, both on vi.useFakeTimers(): - a burst of onFinish({isDisconnect: true}) inside the 1500ms window coalesces onto the boundary — resumeStream is called once for the first cycle, then a second time only after the window + attempt-#2 backoff elapse. - a disconnect arriving after the window closes takes the normal backoff path (not the debounce branch). Covers the wiring lines shouldDebounceReconnect can't reach on its own (useRef(0), the remainingDelay !== null branch's timer setup, and the Date.now() stamp on resume). Together with the helper unit tests this brings the codecov/patch diff for platform-frontend from 0% to full coverage on the debounce lines. --- .../__tests__/useCopilotStream.test.ts | 177 ++++++++++++++++++ 1 file changed, 177 insertions(+) create mode 100644 autogpt_platform/frontend/src/app/(platform)/copilot/__tests__/useCopilotStream.test.ts diff --git a/autogpt_platform/frontend/src/app/(platform)/copilot/__tests__/useCopilotStream.test.ts b/autogpt_platform/frontend/src/app/(platform)/copilot/__tests__/useCopilotStream.test.ts new file mode 100644 index 0000000000..e56317bf04 --- /dev/null +++ b/autogpt_platform/frontend/src/app/(platform)/copilot/__tests__/useCopilotStream.test.ts @@ -0,0 +1,177 @@ +import { act, renderHook } from "@testing-library/react"; +import type { UIMessage } from "ai"; +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; + +import { useCopilotStream } from "../useCopilotStream"; + +// Capture the args passed to ``useChat`` so tests can invoke onFinish/onError +// directly — that's the only way to drive handleReconnect without a real SSE. +let lastUseChatArgs: { + onFinish?: (args: { isDisconnect?: boolean; isAbort?: boolean }) => void; + onError?: (err: Error) => void; +} | null = null; + +const resumeStreamMock = vi.fn(); +const sdkStopMock = vi.fn(); +const sdkSendMessageMock = vi.fn(); +const setMessagesMock = vi.fn(); + +function resetSdkMocks() { + lastUseChatArgs = null; + resumeStreamMock.mockReset(); + sdkStopMock.mockReset(); + sdkSendMessageMock.mockReset(); + setMessagesMock.mockReset(); +} + +vi.mock("@ai-sdk/react", () => ({ + useChat: (args: unknown) => { + lastUseChatArgs = args as typeof lastUseChatArgs; + return { + messages: [] as UIMessage[], + sendMessage: sdkSendMessageMock, + stop: sdkStopMock, + status: "ready" as const, + error: undefined, + setMessages: setMessagesMock, + resumeStream: resumeStreamMock, + }; + }, +})); + +vi.mock("ai", async () => { + const actual = await vi.importActual("ai"); + return { + ...actual, + DefaultChatTransport: class { + constructor(public opts: unknown) {} + }, + }; +}); + +vi.mock("@tanstack/react-query", () => ({ + useQueryClient: () => ({ invalidateQueries: vi.fn() }), +})); + +vi.mock("@/app/api/__generated__/endpoints/chat/chat", () => ({ + getGetV2GetCopilotUsageQueryKey: () => ["copilot-usage"], + getGetV2GetSessionQueryKey: (id: string) => ["session", id], + postV2CancelSessionTask: vi.fn(), + deleteV2DisconnectSessionStream: vi.fn().mockResolvedValue(undefined), +})); + +vi.mock("@/components/molecules/Toast/use-toast", () => ({ + toast: vi.fn(), +})); + +vi.mock("@/services/environment", () => ({ + environment: { + getAGPTServerBaseUrl: () => "http://localhost", + }, +})); + +vi.mock("../helpers", async () => { + const actual = + await vi.importActual("../helpers"); + return { + ...actual, + getCopilotAuthHeaders: vi.fn().mockResolvedValue({}), + disconnectSessionStream: vi.fn(), + }; +}); + +vi.mock("../useHydrateOnStreamEnd", () => ({ + useHydrateOnStreamEnd: () => undefined, +})); + +function renderStream() { + return renderHook(() => + useCopilotStream({ + sessionId: "sess-1", + hydratedMessages: [], + hasActiveStream: false, + refetchSession: vi.fn().mockResolvedValue({ data: undefined }), + copilotMode: undefined, + copilotModel: undefined, + }), + ); +} + +describe("useCopilotStream — reconnect debounce", () => { + beforeEach(() => { + resetSdkMocks(); + vi.useFakeTimers(); + // Pin Date.now so sinceLastResume math is deterministic. The hook reads + // Date.now() both when stashing lastReconnectResumeAtRef and when + // deciding whether to debounce. + vi.setSystemTime(new Date(2025, 0, 1, 12, 0, 0)); + }); + + afterEach(() => { + vi.useRealTimers(); + }); + + it("coalesces a burst of disconnect events into one resumeStream call", async () => { + renderStream(); + + // First disconnect — schedules a reconnect at the exponential backoff + // delay (1000ms for attempt #1). + await act(async () => { + await lastUseChatArgs!.onFinish!({ isDisconnect: true }); + }); + + // Fire the scheduled timer → resumeStream runs once and stamps + // lastReconnectResumeAtRef.current = Date.now(). + await act(async () => { + await vi.advanceTimersByTimeAsync(1_000); + }); + expect(resumeStreamMock).toHaveBeenCalledTimes(1); + + // A second disconnect arrives immediately after (still inside the + // 1500ms debounce window) — the debounce path must fire and queue a + // coalesced timer, NOT a fresh resume. + await act(async () => { + await lastUseChatArgs!.onFinish!({ isDisconnect: true }); + }); + expect(resumeStreamMock).toHaveBeenCalledTimes(1); + + // The coalesced timer fires at the window boundary and reschedules a + // real reconnect. Advance past the window AND past the second + // reconnect's backoff (attempt #2 = 2000ms) so resume runs. + await act(async () => { + await vi.advanceTimersByTimeAsync(1_500); + }); + await act(async () => { + await vi.advanceTimersByTimeAsync(2_000); + }); + expect(resumeStreamMock).toHaveBeenCalledTimes(2); + }); + + it("does not debounce a reconnect that arrives after the window closes", async () => { + renderStream(); + + // First reconnect cycle. + await act(async () => { + await lastUseChatArgs!.onFinish!({ isDisconnect: true }); + }); + await act(async () => { + await vi.advanceTimersByTimeAsync(1_000); + }); + expect(resumeStreamMock).toHaveBeenCalledTimes(1); + + // Wait past the debounce window before the next disconnect. + await act(async () => { + await vi.advanceTimersByTimeAsync(2_000); + }); + + // Now a fresh disconnect should go through the normal path (NOT the + // debounce branch) and schedule a backoff of 2000ms (attempt #2). + await act(async () => { + await lastUseChatArgs!.onFinish!({ isDisconnect: true }); + }); + await act(async () => { + await vi.advanceTimersByTimeAsync(2_000); + }); + expect(resumeStreamMock).toHaveBeenCalledTimes(2); + }); +}); From 2e7b6746251714f47ef66364bc576e5f4805af86 Mon Sep 17 00:00:00 2001 From: majdyz Date: Wed, 22 Apr 2026 07:28:44 +0700 Subject: [PATCH 5/5] refactor(backend/copilot): trim verbose comments on reasoning + replay knobs --- .../backend/copilot/baseline/reasoning.py | 31 ++----------------- .../backend/backend/copilot/config.py | 30 +++--------------- .../backend/copilot/sdk/response_adapter.py | 14 ++------- .../backend/copilot/stream_registry.py | 8 +---- 4 files changed, 9 insertions(+), 74 deletions(-) diff --git a/autogpt_platform/backend/backend/copilot/baseline/reasoning.py b/autogpt_platform/backend/backend/copilot/baseline/reasoning.py index 9128bfd710..8d0100f25b 100644 --- a/autogpt_platform/backend/backend/copilot/baseline/reasoning.py +++ b/autogpt_platform/backend/backend/copilot/baseline/reasoning.py @@ -173,19 +173,8 @@ class BaselineReasoningEmitter: in-place as further deltas arrive; :meth:`close` drops the reference but leaves the appended row intact. - Pass ``render_in_ui=False`` to suppress the ``StreamReasoning*`` wire - events AND the ``ChatMessage(role="reasoning")`` persistence row — the - emitter still reads the reasoning payload and still advances its state - machine (so block ids rotate across turns and ``is_open`` stays - consistent), but returns empty event lists from :meth:`on_delta` and - :meth:`close` and never appends to ``session_messages``. Persistence - is suppressed because ``convertChatSessionToUiMessages`` unconditionally - re-renders persisted reasoning rows as ``{type: "reasoning"}`` UI parts - on reload — keeping them while silencing the live wire would resurrect - the reasoning collapse the moment the user refreshed, making the flag - a no-op post-reload. The model still reasons and OpenRouter / the SDK - transcript retain the audit trail; ``session.messages`` only needs - reasoning rows when the UI actually renders them. + ``render_in_ui=False`` suppresses wire events + persistence row; + state machine still advances. """ def __init__( @@ -212,15 +201,6 @@ class BaselineReasoningEmitter: Persistence (when a session message list is attached) happens in lockstep with emission so the row's content stays equal to the concatenated deltas at every delta boundary. - - When ``render_in_ui=False`` both the wire events AND the persistence - row are suppressed (``session_messages`` is left untouched) — the - state machine still advances (``_open`` / ``_block_id`` rotate) so - ``close`` stays symmetric across turns. Persistence is dropped in - lockstep with the wire events because the frontend's hydration path - (``convertChatSessionToUiMessages``) would otherwise re-render the - persisted rows as reasoning parts on reload, silently undoing the - operator's intent. """ ext = OpenRouterDeltaExtension.from_delta(delta) text = ext.visible_text() @@ -248,13 +228,6 @@ class BaselineReasoningEmitter: than reusing one already closed on the wire. The persisted row is not removed — it stays in ``session_messages`` as the durable record of what was reasoned. - - When ``render_in_ui=False`` the wire event is suppressed (empty - list returned) but the block still rotates so subsequent reasoning - uses a fresh id — keeps the state machine in lockstep with the - render-on branch. Nothing is removed from ``session_messages`` - because ``on_delta`` never appended a row in the first place when - render was off. """ if not self._open: return [] diff --git a/autogpt_platform/backend/backend/copilot/config.py b/autogpt_platform/backend/backend/copilot/config.py index 3a94472016..124afb5be8 100644 --- a/autogpt_platform/backend/backend/copilot/config.py +++ b/autogpt_platform/backend/backend/copilot/config.py @@ -207,37 +207,15 @@ class ChatConfig(BaseSettings): ) render_reasoning_in_ui: bool = Field( default=True, - description="Render extended-thinking reasoning as live UI parts " - "(``reasoning-start``/``reasoning-delta``/``reasoning-end``) on the " - "wire AND as persisted ``role='reasoning'`` rows in " - "``session.messages``. When False, the baseline emitter and the SDK " - "response adapter suppress the three reasoning wire events AND skip " - "the ``ChatMessage(role='reasoning')`` persistence — the model still " - "reasons and tokens are still billed, but the frontend sees a " - "text-only stream on the live wire and on reload. Persistence is " - "coupled to the wire events because " - "``convertChatSessionToUiMessages.ts`` unconditionally re-renders " - "persisted reasoning rows as ``{type: 'reasoning'}`` UI parts; " - "keeping the rows while silencing the live wire would resurrect the " - "collapse on refresh. Audit trail of the reasoning content remains " - "available through the SDK transcript " - "(``_format_sdk_content_blocks``) / upstream provider logs; " - "``session.messages`` is a UI-render store, not an audit store.", + description="Render reasoning as live UI parts + persist " + "``role='reasoning'`` rows. False suppresses both; tokens are still " + "billed upstream.", ) stream_replay_count: int = Field( default=200, ge=1, le=10000, - description="Maximum number of Redis stream entries replayed on SSE " - "reconnect in :func:`stream_registry.subscribe_to_session`. Lowered " - "from 1000 to bound the replay storm when a tab-switch / throttle " - "triggers multiple quick reconnects — 200 still covers a full Kimi " - "turn after coalescing (~150 events) and leaves headroom for long " - "Opus tool loops. Redis ``XREAD`` ``count`` caps the *response size*, " - "not the total events that existed; dropping older entries on replay " - "is acceptable because the frontend deduplicates on block ids. " - "Scoped to the SSE resume path only — live stream listener XREAD " - "(no replay) is unchanged.", + description="Max Redis stream entries replayed on SSE reconnect.", ) claude_agent_thinking_effort: Literal["low", "medium", "high", "max"] | None = ( Field( diff --git a/autogpt_platform/backend/backend/copilot/sdk/response_adapter.py b/autogpt_platform/backend/backend/copilot/sdk/response_adapter.py index 5c20e159a7..0da78c5f9e 100644 --- a/autogpt_platform/backend/backend/copilot/sdk/response_adapter.py +++ b/autogpt_platform/backend/backend/copilot/sdk/response_adapter.py @@ -68,18 +68,8 @@ class SDKResponseAdapter: self.reasoning_block_id = str(uuid.uuid4()) self.has_started_reasoning = False self.has_ended_reasoning = True - # When False, ``_ensure_reasoning_started`` / ``StreamReasoningDelta`` - # emission / ``_end_reasoning_if_open`` become no-ops so the frontend - # sees a text-only stream for a ``ThinkingBlock``. ``session.messages`` - # persistence is also skipped as a side-effect: ``_dispatch_response`` - # only creates a ``ChatMessage(role="reasoning")`` row on - # ``StreamReasoningStart`` (see ``sdk/service.py``), which is - # suppressed here. That keeps the SDK path in lockstep with the - # baseline emitter — flag off means no live wire, no persisted row, - # and therefore no hydrated reasoning collapse on reload. The - # ThinkingBlock content is still carried through - # ``_format_sdk_content_blocks`` into the SDK transcript for - # ``--resume`` continuity, which is independent of ``session.messages``. + # When False, reasoning wire events + persisted reasoning rows are + # suppressed; transcript continuity is unaffected. self._render_reasoning_in_ui = render_reasoning_in_ui self.current_tool_calls: dict[str, dict[str, str]] = {} self.resolved_tool_calls: set[str] = set() diff --git a/autogpt_platform/backend/backend/copilot/stream_registry.py b/autogpt_platform/backend/backend/copilot/stream_registry.py index c2dfb86fdb..e4559c46e5 100644 --- a/autogpt_platform/backend/backend/copilot/stream_registry.py +++ b/autogpt_platform/backend/backend/copilot/stream_registry.py @@ -485,13 +485,7 @@ async def subscribe_to_session( subscriber_queue: asyncio.Queue[StreamBaseResponse] = asyncio.Queue() stream_key = _get_turn_stream_key(session.turn_id) - # Step 1: Replay messages from Redis Stream. - # ``count`` caps the replay batch size — lowered from a hard-coded 1000 - # to :attr:`ChatConfig.stream_replay_count` (default 200) to bound the - # replay storm when a tab-switch / browser throttle triggers multiple - # reconnects in quick succession. 200 still covers a full Kimi turn - # after coalescing (~150 events); dropping older entries on replay is - # acceptable because the frontend deduplicates on block ids. + # Replay batch capped by ``stream_replay_count``. xread_start = time.perf_counter() messages = await redis.xread( {stream_key: last_message_id}, block=None, count=config.stream_replay_count