diff --git a/autogpt_platform/backend/backend/copilot/baseline/reasoning.py b/autogpt_platform/backend/backend/copilot/baseline/reasoning.py
index 0c689ed4a7..9dc246a058 100644
--- a/autogpt_platform/backend/backend/copilot/baseline/reasoning.py
+++ b/autogpt_platform/backend/backend/copilot/baseline/reasoning.py
@@ -242,6 +242,9 @@ class BaselineReasoningEmitter:
     fresh ``ChatMessage(role="reasoning")`` is appended and mutated in-place
     as further deltas arrive; :meth:`close` drops the reference but leaves
     the appended row intact.
+
+    ``render_in_ui=False`` suppresses both the wire events and the persisted
+    row; the state machine still advances.
     """

     def __init__(
@@ -250,21 +253,20 @@
         *,
         coalesce_min_chars: int = _COALESCE_MIN_CHARS,
         coalesce_max_interval_ms: float = _COALESCE_MAX_INTERVAL_MS,
+        render_in_ui: bool = True,
     ) -> None:
         self._block_id: str = str(uuid.uuid4())
         self._open: bool = False
         self._session_messages = session_messages
         self._current_row: ChatMessage | None = None
         # Coalescing state — ``_pending_delta`` accumulates reasoning text
-        # between wire flushes. Providers like Kimi K2.6 emit very fine-
-        # grained chunks; batching them reduces Redis ``xadd`` + SSE + React
-        # re-render load by ~100x for equivalent text output. Tuning knobs
-        # are kwargs so tests can disable coalescing (``=0``) for
-        # deterministic event assertions.
+        # between wire flushes. Tuning knobs are kwargs so tests can
+        # disable coalescing (``=0``) for deterministic event assertions.
         self._coalesce_min_chars = coalesce_min_chars
         self._coalesce_max_interval_ms = coalesce_max_interval_ms
         self._pending_delta: str = ""
         self._last_flush_monotonic: float = 0.0
+        self._render_in_ui = render_in_ui

     @property
     def is_open(self) -> bool:
@@ -296,26 +298,25 @@
         # syscalls off the hot path without changing semantics.
         now = time.monotonic()
         if not self._open:
-            events.append(StreamReasoningStart(id=self._block_id))
-            events.append(StreamReasoningDelta(id=self._block_id, delta=text))
+            if self._render_in_ui:
+                events.append(StreamReasoningStart(id=self._block_id))
+                events.append(StreamReasoningDelta(id=self._block_id, delta=text))
             self._open = True
             self._last_flush_monotonic = now
-            if self._session_messages is not None:
+            if self._render_in_ui and self._session_messages is not None:
                 self._current_row = ChatMessage(role="reasoning", content=text)
                 self._session_messages.append(self._current_row)
             return events

-        # Persist per-delta (no coalescing here — the session snapshot stays
-        # consistent at every chunk boundary, independent of the wire
-        # coalesce window).
         if self._current_row is not None:
             self._current_row.content = (self._current_row.content or "") + text

         self._pending_delta += text
         if self._should_flush_pending(now):
-            events.append(
-                StreamReasoningDelta(id=self._block_id, delta=self._pending_delta)
-            )
+            if self._render_in_ui:
+                events.append(
+                    StreamReasoningDelta(id=self._block_id, delta=self._pending_delta)
+                )
             self._pending_delta = ""
             self._last_flush_monotonic = now
         return events
@@ -348,12 +349,13 @@
         if not self._open:
             return []
         events: list[StreamBaseResponse] = []
-        if self._pending_delta:
-            events.append(
-                StreamReasoningDelta(id=self._block_id, delta=self._pending_delta)
-            )
-            self._pending_delta = ""
-        events.append(StreamReasoningEnd(id=self._block_id))
+        if self._render_in_ui:
+            if self._pending_delta:
+                events.append(
+                    StreamReasoningDelta(id=self._block_id, delta=self._pending_delta)
+                )
+            events.append(StreamReasoningEnd(id=self._block_id))
+        self._pending_delta = ""
         self._open = False
         self._block_id = str(uuid.uuid4())
         self._current_row = None
diff --git a/autogpt_platform/backend/backend/copilot/baseline/reasoning_test.py b/autogpt_platform/backend/backend/copilot/baseline/reasoning_test.py
index e18c8066e4..8283b3e0ad 100644
--- a/autogpt_platform/backend/backend/copilot/baseline/reasoning_test.py
+++ b/autogpt_platform/backend/backend/copilot/baseline/reasoning_test.py
@@ -452,3 +452,60 @@ class TestReasoningPersistence:
         events = emitter.on_delta(_delta(reasoning="pure wire"))
         assert len(events) == 2  # start + delta, no crash
         # Nothing else to assert — just proves None session is supported.
+
+
+class TestBaselineReasoningEmitterRenderFlag:
+    """``render_in_ui=False`` must silence ``StreamReasoning*`` wire events
+    AND drop persistence of ``role="reasoning"`` rows — the operator hides
+    the collapse on both the live wire and on reload. Persistence is tied
+    to the wire events because the frontend's hydration path unconditionally
+    re-renders persisted reasoning rows; keeping them would make the flag a
+    no-op post-reload. These tests pin the contract in both directions so
+    future refactors can't flip only one half."""
+
+    def test_render_off_suppresses_start_and_delta(self):
+        emitter = BaselineReasoningEmitter(render_in_ui=False)
+        events = emitter.on_delta(_delta(reasoning="hidden"))
+        # No wire events, but state advanced (is_open == True) so close()
+        # below has something to rotate.
+        assert events == []
+        assert emitter.is_open is True
+
+    def test_render_off_suppresses_close_end(self):
+        emitter = BaselineReasoningEmitter(render_in_ui=False)
+        emitter.on_delta(_delta(reasoning="hidden"))
+        events = emitter.close()
+        assert events == []
+        assert emitter.is_open is False
+
+    def test_render_off_skips_persistence(self):
+        """When render is off the emitter must NOT append a ``role="reasoning"``
+        row to ``session_messages`` — hydration would re-render it, undoing
+        the operator's intent."""
+        session: list[ChatMessage] = []
+        emitter = BaselineReasoningEmitter(session, render_in_ui=False)
+
+        emitter.on_delta(_delta(reasoning="part one "))
+        emitter.on_delta(_delta(reasoning="part two"))
+        emitter.close()
+
+        assert session == []
+
+    def test_render_off_rotates_block_id_between_sessions(self):
+        """Even with wire events silenced the block id must rotate on close,
+        otherwise a hypothetical mid-session flip would reuse a stale id."""
+        emitter = BaselineReasoningEmitter(render_in_ui=False)
+        emitter.on_delta(_delta(reasoning="first"))
+        first_block_id = emitter._block_id
+        emitter.close()
+        emitter.on_delta(_delta(reasoning="second"))
+        assert emitter._block_id != first_block_id
+
+    def test_render_on_is_default(self):
+        """Defaulting to True preserves backward compat — existing callers
+        that don't pass the kwarg keep emitting wire events as before."""
+        emitter = BaselineReasoningEmitter()
+        events = emitter.on_delta(_delta(reasoning="hello"))
+        assert len(events) == 2
+        assert isinstance(events[0], StreamReasoningStart)
+        assert isinstance(events[1], StreamReasoningDelta)
diff --git a/autogpt_platform/backend/backend/copilot/baseline/service.py b/autogpt_platform/backend/backend/copilot/baseline/service.py
index 6aa88e9d41..f95157ea2b 100644
--- a/autogpt_platform/backend/backend/copilot/baseline/service.py
+++ b/autogpt_platform/backend/backend/copilot/baseline/service.py
@@ -382,7 +382,13 @@ class _BaselineStreamState:
         # frontend's ``convertChatSessionToUiMessages`` relies on these
         # rows to render the Reasoning collapse after the AI SDK's
         # stream-end hydrate swaps in the DB-backed message list.
-        self.reasoning_emitter = BaselineReasoningEmitter(self.session_messages)
+        # ``render_in_ui`` is sourced from ``config.render_reasoning_in_ui``
+        # so the operator can silence the reasoning collapse globally;
+        # this suppresses both the wire events and the persisted rows.
+        self.reasoning_emitter = BaselineReasoningEmitter(
+            self.session_messages,
+            render_in_ui=config.render_reasoning_in_ui,
+        )


 def _is_anthropic_model(model: str) -> bool:
diff --git a/autogpt_platform/backend/backend/copilot/config.py b/autogpt_platform/backend/backend/copilot/config.py
index 1183217f37..5226007e39 100644
--- a/autogpt_platform/backend/backend/copilot/config.py
+++ b/autogpt_platform/backend/backend/copilot/config.py
@@ -249,6 +249,18 @@ class ChatConfig(BaseSettings):
         "``max_thinking_tokens`` kwarg so the CLI falls back to model default "
         "(which, without the flag, leaves extended thinking off).",
     )
+    render_reasoning_in_ui: bool = Field(
+        default=True,
+        description="Render reasoning as live UI parts + persist "
+        "``role='reasoning'`` rows. False suppresses both; tokens are still "
+        "billed upstream.",
+    )
+    stream_replay_count: int = Field(
+        default=200,
+        ge=1,
+        le=10000,
+        description="Max Redis stream entries replayed on SSE reconnect.",
+    )
     claude_agent_thinking_effort: Literal["low", "medium", "high", "max"] | None = (
         Field(
             default=None,
diff --git a/autogpt_platform/backend/backend/copilot/config_test.py b/autogpt_platform/backend/backend/copilot/config_test.py
index fe8e67b7ff..25f9f477f2 100644
--- a/autogpt_platform/backend/backend/copilot/config_test.py
+++ b/autogpt_platform/backend/backend/copilot/config_test.py
@@ -19,6 +19,8 @@ _ENV_VARS_TO_CLEAR = (
     "OPENAI_BASE_URL",
     "CHAT_CLAUDE_AGENT_CLI_PATH",
     "CLAUDE_AGENT_CLI_PATH",
+    "CHAT_RENDER_REASONING_IN_UI",
+    "CHAT_STREAM_REPLAY_COUNT",
 )


@@ -164,3 +166,38 @@ class TestClaudeAgentCliPathEnvFallback:
         monkeypatch.setenv("CLAUDE_AGENT_CLI_PATH", str(tmp_path))
         with pytest.raises(Exception, match="not a regular file"):
             ChatConfig()
+
+
+class TestRenderReasoningInUi:
+    """``render_reasoning_in_ui`` gates reasoning wire events globally."""
+
+    def test_defaults_to_true(self):
+        """Default must stay True — flipping it silences the reasoning
+        collapse for every user, which is an opt-in operator decision."""
+        cfg = ChatConfig()
+        assert cfg.render_reasoning_in_ui is True
+
+    def test_env_override_false(self, monkeypatch: pytest.MonkeyPatch) -> None:
+        monkeypatch.setenv("CHAT_RENDER_REASONING_IN_UI", "false")
+        cfg = ChatConfig()
+        assert cfg.render_reasoning_in_ui is False
+
+
+class TestStreamReplayCount:
+    """``stream_replay_count`` caps the SSE reconnect replay batch size."""
+
+    def test_default_is_200(self):
+        """200 covers a full Kimi turn after coalescing (~150 events) while
+        bounding the replay storm from 1000+ chunks."""
+        cfg = ChatConfig()
+        assert cfg.stream_replay_count == 200
+
+    def test_env_override(self, monkeypatch: pytest.MonkeyPatch) -> None:
+        monkeypatch.setenv("CHAT_STREAM_REPLAY_COUNT", "500")
+        cfg = ChatConfig()
+        assert cfg.stream_replay_count == 500
+
+    def test_zero_rejected(self):
+        """count=0 would make XREAD replay nothing — rejected via ge=1."""
+        with pytest.raises(Exception):
+            ChatConfig(stream_replay_count=0)
diff --git a/autogpt_platform/backend/backend/copilot/sdk/p0_guardrails_test.py b/autogpt_platform/backend/backend/copilot/sdk/p0_guardrails_test.py
index 17b54797b8..be80f2901b 100644
--- a/autogpt_platform/backend/backend/copilot/sdk/p0_guardrails_test.py
+++ b/autogpt_platform/backend/backend/copilot/sdk/p0_guardrails_test.py
@@ -714,10 +714,13 @@ class TestDoTransientBackoff:
         mock_sleep.assert_called_once_with(7)

     async def test_replaces_adapter_with_new_instance(self):
-        """state.adapter is replaced with a new SDKResponseAdapter after yield."""
+        """state.adapter is replaced with a new SDKResponseAdapter after yield,
+        and ``render_reasoning_in_ui`` is threaded from the SDK service config
+        (not hardcoded) so ``CHAT_RENDER_REASONING_IN_UI=false`` at runtime
+        flips the reconstruction consistently with the rest of the path."""
         from unittest.mock import AsyncMock, MagicMock, patch

-        from backend.copilot.sdk.service import _do_transient_backoff
+        from backend.copilot.sdk.service import _do_transient_backoff, config

         original_adapter = MagicMock()
         state = MagicMock()
@@ -733,7 +736,11 @@
         async for _ in _do_transient_backoff(3, state, "msg-1", "sess-1"):
             pass

-        mock_cls.assert_called_once_with(message_id="msg-1", session_id="sess-1")
+        mock_cls.assert_called_once_with(
+            message_id="msg-1",
+            session_id="sess-1",
+            render_reasoning_in_ui=config.render_reasoning_in_ui,
+        )
         assert state.adapter is new_adapter

     async def test_resets_usage_after_yield(self):
diff --git a/autogpt_platform/backend/backend/copilot/sdk/response_adapter.py b/autogpt_platform/backend/backend/copilot/sdk/response_adapter.py
index fbd73d9277..0da78c5f9e 100644
--- a/autogpt_platform/backend/backend/copilot/sdk/response_adapter.py
+++ b/autogpt_platform/backend/backend/copilot/sdk/response_adapter.py
@@ -53,7 +53,13 @@ class SDKResponseAdapter:
     text blocks, tool calls, and message lifecycle.
     """

-    def __init__(self, message_id: str | None = None, session_id: str | None = None):
+    def __init__(
+        self,
+        message_id: str | None = None,
+        session_id: str | None = None,
+        *,
+        render_reasoning_in_ui: bool = True,
+    ):
         self.message_id = message_id or str(uuid.uuid4())
         self.session_id = session_id
         self.text_block_id = str(uuid.uuid4())
@@ -62,6 +68,9 @@
         self.reasoning_block_id = str(uuid.uuid4())
         self.has_started_reasoning = False
         self.has_ended_reasoning = True
+        # When False, reasoning wire events + persisted reasoning rows are
+        # suppressed; transcript continuity is unaffected.
+        self._render_reasoning_in_ui = render_reasoning_in_ui
         self.current_tool_calls: dict[str, dict[str, str]] = {}
         self.resolved_tool_calls: set[str] = set()
         self.step_open = False
@@ -142,15 +151,27 @@
                     # it live, extended_thinking turns that end
                     # thinking-only left the UI stuck on "Thought for Xs"
                     # with nothing rendered until a page refresh.
+                    #
+                    # When ``render_reasoning_in_ui=False`` the three
+                    # reasoning helpers below (and the append) no-op, so
+                    # the frontend sees a text-only stream AND no
+                    # ``ChatMessage(role='reasoning')`` row is persisted
+                    # (the row is only created by ``_dispatch_response``
+                    # when ``StreamReasoningStart`` arrives, which is
+                    # suppressed here). Persistence of the thinking text
+                    # into the SDK transcript via
+                    # ``_format_sdk_content_blocks`` is unaffected — that
+                    # feeds ``--resume`` continuity, not the UI.
                     if block.thinking:
                         self._end_text_if_open(responses)
                         self._ensure_reasoning_started(responses)
-                        responses.append(
-                            StreamReasoningDelta(
-                                id=self.reasoning_block_id,
-                                delta=block.thinking,
+                        if self._render_reasoning_in_ui:
+                            responses.append(
+                                StreamReasoningDelta(
+                                    id=self.reasoning_block_id,
+                                    delta=block.thinking,
+                                )
                             )
-                        )

                 elif isinstance(block, ToolUseBlock):
                     self._end_text_if_open(responses)
@@ -349,7 +370,13 @@
         Each ``ThinkingBlock`` the SDK emits gets its own streaming block
         on the wire so the frontend can render a new ``Reasoning`` part per
         LLM turn (rather than concatenating across the whole session).
+
+        No-op when ``render_reasoning_in_ui=False`` — callers still drive
+        the method on every ``ThinkingBlock`` so persistence stays in
+        lockstep, but nothing reaches the wire.
         """
+        if not self._render_reasoning_in_ui:
+            return
         if not self.has_started_reasoning or self.has_ended_reasoning:
             if self.has_ended_reasoning:
                 self.reasoning_block_id = str(uuid.uuid4())
@@ -358,7 +385,13 @@
         self.has_started_reasoning = True

     def _end_reasoning_if_open(self, responses: list[StreamBaseResponse]) -> None:
-        """End the current reasoning block if one is open."""
+        """End the current reasoning block if one is open.
+
+        No-op when ``render_reasoning_in_ui=False`` — no start was emitted,
+        so no end is needed.
+        """
+        if not self._render_reasoning_in_ui:
+            return
         if self.has_started_reasoning and not self.has_ended_reasoning:
             responses.append(StreamReasoningEnd(id=self.reasoning_block_id))
             self.has_ended_reasoning = True
diff --git a/autogpt_platform/backend/backend/copilot/sdk/response_adapter_test.py b/autogpt_platform/backend/backend/copilot/sdk/response_adapter_test.py
index 634454f9e5..ada472561a 100644
--- a/autogpt_platform/backend/backend/copilot/sdk/response_adapter_test.py
+++ b/autogpt_platform/backend/backend/copilot/sdk/response_adapter_test.py
@@ -331,6 +331,64 @@ def test_empty_thinking_block_is_ignored():
     assert [type(r).__name__ for r in results] == ["StreamStartStep"]


+def test_render_reasoning_in_ui_false_suppresses_thinking_events():
+    """``render_reasoning_in_ui=False`` silences ``StreamReasoning*`` on
+    the wire — the frontend sees a text-only stream. Persistence via
+    ``_format_sdk_content_blocks`` is handled elsewhere; this test only
+    pins the wire contract.
+    """
+    adapter = SDKResponseAdapter(
+        message_id="m",
+        session_id="s",
+        render_reasoning_in_ui=False,
+    )
+    msg = AssistantMessage(
+        content=[ThinkingBlock(thinking="plan", signature="sig")],
+        model="test",
+    )
+    results = adapter.convert_message(msg)
+    types = [type(r).__name__ for r in results]
+    assert "StreamReasoningStart" not in types
+    assert "StreamReasoningDelta" not in types
+    assert "StreamReasoningEnd" not in types
+
+
+def test_render_reasoning_off_text_after_thinking_emits_no_reasoning_end():
+    """With rendering off the ReasoningEnd is never synthesized when text
+    follows — no ReasoningStart ever hit the wire, so no close is due."""
+    adapter = SDKResponseAdapter(
+        message_id="m",
+        session_id="s",
+        render_reasoning_in_ui=False,
+    )
+    adapter.convert_message(
+        AssistantMessage(
+            content=[ThinkingBlock(thinking="warming up", signature="sig")],
+            model="test",
+        )
+    )
+    results = adapter.convert_message(
+        AssistantMessage(content=[TextBlock(text="hello")], model="test")
+    )
+    types = [type(r).__name__ for r in results]
+    assert "StreamReasoningEnd" not in types
+    assert "StreamTextStart" in types
+    assert "StreamTextDelta" in types
+
+
+def test_render_reasoning_on_is_default():
+    """Default is True — existing callers keep emitting reasoning events."""
+    adapter = SDKResponseAdapter(message_id="m", session_id="s")
+    msg = AssistantMessage(
+        content=[ThinkingBlock(thinking="plan", signature="sig")],
+        model="test",
+    )
+    results = adapter.convert_message(msg)
+    types = [type(r).__name__ for r in results]
+    assert "StreamReasoningStart" in types
+    assert "StreamReasoningDelta" in types
+
+
 def test_result_success_synthesizes_fallback_text_when_final_turn_is_thinking_only():
     """If the model's last LLM call after a tool_result produced only a
     ThinkingBlock (no TextBlock), the UI would hang on the tool output
diff --git a/autogpt_platform/backend/backend/copilot/sdk/service.py b/autogpt_platform/backend/backend/copilot/sdk/service.py
index 69dda8f227..647e10e8d0 100644
--- a/autogpt_platform/backend/backend/copilot/sdk/service.py
+++ b/autogpt_platform/backend/backend/copilot/sdk/service.py
@@ -823,7 +823,11 @@ async def _do_transient_backoff(
     """
     yield StreamStatus(message=f"Connection interrupted, retrying in {backoff}s…")
     await asyncio.sleep(backoff)
-    state.adapter = SDKResponseAdapter(message_id=message_id, session_id=session_id)
+    state.adapter = SDKResponseAdapter(
+        message_id=message_id,
+        session_id=session_id,
+        render_reasoning_in_ui=config.render_reasoning_in_ui,
+    )
     state.usage.reset()


@@ -3160,7 +3164,11 @@ async def stream_chat_completion_sdk(

     options = ClaudeAgentOptions(**sdk_options_kwargs)  # type: ignore[arg-type]  # dynamic kwargs

-    adapter = SDKResponseAdapter(message_id=message_id, session_id=session_id)
+    adapter = SDKResponseAdapter(
+        message_id=message_id,
+        session_id=session_id,
+        render_reasoning_in_ui=config.render_reasoning_in_ui,
+    )

     # Propagate user_id/session_id as OTEL context attributes so the
     # langsmith tracing integration attaches them to every span. This
@@ -3494,7 +3502,9 @@ async def stream_chat_completion_sdk(
                     session, user_id, is_user_message, state.query_message
                 )
                 state.adapter = SDKResponseAdapter(
-                    message_id=message_id, session_id=session_id
+                    message_id=message_id,
+                    session_id=session_id,
+                    render_reasoning_in_ui=config.render_reasoning_in_ui,
                 )
                 # Reset token accumulators so a failed attempt's partial
                 # usage is not double-counted in the successful attempt.
diff --git a/autogpt_platform/backend/backend/copilot/stream_registry.py b/autogpt_platform/backend/backend/copilot/stream_registry.py
index 424964e075..e4559c46e5 100644
--- a/autogpt_platform/backend/backend/copilot/stream_registry.py
+++ b/autogpt_platform/backend/backend/copilot/stream_registry.py
@@ -485,9 +485,11 @@ async def subscribe_to_session(
     subscriber_queue: asyncio.Queue[StreamBaseResponse] = asyncio.Queue()
     stream_key = _get_turn_stream_key(session.turn_id)

-    # Step 1: Replay messages from Redis Stream
+    # Replay batch capped by ``stream_replay_count``.
     xread_start = time.perf_counter()
-    messages = await redis.xread({stream_key: last_message_id}, block=None, count=1000)
+    messages = await redis.xread(
+        {stream_key: last_message_id}, block=None, count=config.stream_replay_count
+    )
     xread_time = (time.perf_counter() - xread_start) * 1000
     logger.info(
         f"[TIMING] Redis xread (replay) took {xread_time:.1f}ms, status={session_status}",
diff --git a/autogpt_platform/frontend/src/app/(platform)/copilot/__tests__/useCopilotStream.test.ts b/autogpt_platform/frontend/src/app/(platform)/copilot/__tests__/useCopilotStream.test.ts
new file mode 100644
index 0000000000..e56317bf04
--- /dev/null
+++ b/autogpt_platform/frontend/src/app/(platform)/copilot/__tests__/useCopilotStream.test.ts
@@ -0,0 +1,177 @@
+import { act, renderHook } from "@testing-library/react";
+import type { UIMessage } from "ai";
+import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
+
+import { useCopilotStream } from "../useCopilotStream";
+
+// Capture the args passed to ``useChat`` so tests can invoke onFinish/onError
+// directly — that's the only way to drive handleReconnect without a real SSE.
+let lastUseChatArgs: {
+  onFinish?: (args: { isDisconnect?: boolean; isAbort?: boolean }) => void;
+  onError?: (err: Error) => void;
+} | null = null;
+
+const resumeStreamMock = vi.fn();
+const sdkStopMock = vi.fn();
+const sdkSendMessageMock = vi.fn();
+const setMessagesMock = vi.fn();
+
+function resetSdkMocks() {
+  lastUseChatArgs = null;
+  resumeStreamMock.mockReset();
+  sdkStopMock.mockReset();
+  sdkSendMessageMock.mockReset();
+  setMessagesMock.mockReset();
+}
+
+vi.mock("@ai-sdk/react", () => ({
+  useChat: (args: unknown) => {
+    lastUseChatArgs = args as typeof lastUseChatArgs;
+    return {
+      messages: [] as UIMessage[],
+      sendMessage: sdkSendMessageMock,
+      stop: sdkStopMock,
+      status: "ready" as const,
+      error: undefined,
+      setMessages: setMessagesMock,
+      resumeStream: resumeStreamMock,
+    };
+  },
+}));
+
+vi.mock("ai", async () => {
+  const actual = await vi.importActual("ai");
+  return {
+    ...actual,
+    DefaultChatTransport: class {
+      constructor(public opts: unknown) {}
+    },
+  };
+});
+
+vi.mock("@tanstack/react-query", () => ({
+  useQueryClient: () => ({ invalidateQueries: vi.fn() }),
+}));
+
+vi.mock("@/app/api/__generated__/endpoints/chat/chat", () => ({
+  getGetV2GetCopilotUsageQueryKey: () => ["copilot-usage"],
+  getGetV2GetSessionQueryKey: (id: string) => ["session", id],
+  postV2CancelSessionTask: vi.fn(),
+  deleteV2DisconnectSessionStream: vi.fn().mockResolvedValue(undefined),
+}));
+
+vi.mock("@/components/molecules/Toast/use-toast", () => ({
+  toast: vi.fn(),
+}));
+
+vi.mock("@/services/environment", () => ({
+  environment: {
+    getAGPTServerBaseUrl: () => "http://localhost",
+  },
+}));
+
+vi.mock("../helpers", async () => {
+  const actual =
+    await vi.importActual<typeof import("../helpers")>("../helpers");
+  return {
+    ...actual,
+    getCopilotAuthHeaders: vi.fn().mockResolvedValue({}),
+    disconnectSessionStream: vi.fn(),
+  };
+});
+
+vi.mock("../useHydrateOnStreamEnd", () => ({
+  useHydrateOnStreamEnd: () => undefined,
+}));
+
+function renderStream() {
+  return renderHook(() =>
+    useCopilotStream({
+      sessionId: "sess-1",
+      hydratedMessages: [],
+      hasActiveStream: false,
+      refetchSession: vi.fn().mockResolvedValue({ data: undefined }),
+      copilotMode: undefined,
+      copilotModel: undefined,
+    }),
+  );
+}
+
+describe("useCopilotStream — reconnect debounce", () => {
+  beforeEach(() => {
+    resetSdkMocks();
+    vi.useFakeTimers();
+    // Pin Date.now so sinceLastResume math is deterministic. The hook reads
+    // Date.now() both when stashing lastReconnectResumeAtRef and when
+    // deciding whether to debounce.
+    vi.setSystemTime(new Date(2025, 0, 1, 12, 0, 0));
+  });
+
+  afterEach(() => {
+    vi.useRealTimers();
+  });
+
+  it("coalesces a burst of disconnect events into one resumeStream call", async () => {
+    renderStream();
+
+    // First disconnect — schedules a reconnect at the exponential backoff
+    // delay (1000ms for attempt #1).
+    await act(async () => {
+      await lastUseChatArgs!.onFinish!({ isDisconnect: true });
+    });
+
+    // Fire the scheduled timer → resumeStream runs once and stamps
+    // lastReconnectResumeAtRef.current = Date.now().
+    await act(async () => {
+      await vi.advanceTimersByTimeAsync(1_000);
+    });
+    expect(resumeStreamMock).toHaveBeenCalledTimes(1);
+
+    // A second disconnect arrives immediately after (still inside the
+    // 1500ms debounce window) — the debounce path must fire and queue a
+    // coalesced timer, NOT a fresh resume.
+    await act(async () => {
+      await lastUseChatArgs!.onFinish!({ isDisconnect: true });
+    });
+    expect(resumeStreamMock).toHaveBeenCalledTimes(1);
+
+    // The coalesced timer fires at the window boundary and reschedules a
+    // real reconnect. Advance past the window AND past the second
+    // reconnect's backoff (attempt #2 = 2000ms) so resume runs.
+    await act(async () => {
+      await vi.advanceTimersByTimeAsync(1_500);
+    });
+    await act(async () => {
+      await vi.advanceTimersByTimeAsync(2_000);
+    });
+    expect(resumeStreamMock).toHaveBeenCalledTimes(2);
+  });
+
+  it("does not debounce a reconnect that arrives after the window closes", async () => {
+    renderStream();
+
+    // First reconnect cycle.
+    await act(async () => {
+      await lastUseChatArgs!.onFinish!({ isDisconnect: true });
+    });
+    await act(async () => {
+      await vi.advanceTimersByTimeAsync(1_000);
+    });
+    expect(resumeStreamMock).toHaveBeenCalledTimes(1);
+
+    // Wait past the debounce window before the next disconnect.
+    await act(async () => {
+      await vi.advanceTimersByTimeAsync(2_000);
+    });
+
+    // Now a fresh disconnect should go through the normal path (NOT the
+    // debounce branch) and schedule a backoff of 2000ms (attempt #2).
+    await act(async () => {
+      await lastUseChatArgs!.onFinish!({ isDisconnect: true });
+    });
+    await act(async () => {
+      await vi.advanceTimersByTimeAsync(2_000);
+    });
+    expect(resumeStreamMock).toHaveBeenCalledTimes(2);
+  });
+});
diff --git a/autogpt_platform/frontend/src/app/(platform)/copilot/helpers.test.ts b/autogpt_platform/frontend/src/app/(platform)/copilot/helpers.test.ts
index 91e09efde3..7c61390f1f 100644
--- a/autogpt_platform/frontend/src/app/(platform)/copilot/helpers.test.ts
+++ b/autogpt_platform/frontend/src/app/(platform)/copilot/helpers.test.ts
@@ -7,6 +7,7 @@ import {
   formatNotificationTitle,
   getSendSuppressionReason,
   parseSessionIDs,
+  shouldDebounceReconnect,
   shouldSuppressDuplicateSend,
 } from "./helpers";

@@ -466,3 +467,88 @@ describe("deduplicateMessages", () => {
     expect(result).toHaveLength(2); // duplicate step-start messages are deduped
   });
 });
+
+describe("shouldDebounceReconnect", () => {
+  const WINDOW_MS = 1_500;
+
+  it("returns null for the first reconnect (lastResumeAt === 0)", () => {
+    expect(shouldDebounceReconnect(0, 10_000, WINDOW_MS)).toBeNull();
+  });
+
+  it("returns null for a negative lastResumeAt sentinel", () => {
+    // Defensive: a negative value is still treated as "no reconnect yet".
+    expect(shouldDebounceReconnect(-1, 10_000, WINDOW_MS)).toBeNull();
+  });
+
+  it("returns the remaining delay when now is inside the window", () => {
+    // 500ms since the last resume — the caller must wait another 1000ms
+    // before the storm cap reopens.
+    const remaining = shouldDebounceReconnect(1_000, 1_500, WINDOW_MS);
+    expect(remaining).toBe(1_000);
+  });
+
+  it("coalesces a reconnect that arrives immediately after the previous resume", () => {
+    // now === lastResumeAt → sinceLastResume === 0, so the full window remains.
+    const remaining = shouldDebounceReconnect(5_000, 5_000, WINDOW_MS);
+    expect(remaining).toBe(WINDOW_MS);
+  });
+
+  it("returns null when exactly on the window boundary", () => {
+    // sinceLastResume === windowMs is NOT inside the window — the next
+    // reconnect should fire immediately.
+    expect(shouldDebounceReconnect(1_000, 2_500, WINDOW_MS)).toBeNull();
+  });
+
+  it("returns null when the window has elapsed", () => {
+    expect(shouldDebounceReconnect(1_000, 5_000, WINDOW_MS)).toBeNull();
+  });
+
+  it("returns a small remaining delay at the far edge of the window", () => {
+    // 1ms before the window closes → 1ms left.
+    const remaining = shouldDebounceReconnect(1_000, 2_499, WINDOW_MS);
+    expect(remaining).toBe(1);
+  });
+
+  it("collapses a burst of reconnects into one debounced scheduling", () => {
+    // Simulates the browser tab-throttle storm: three reconnect calls fire
+    // within a single second after the last resume. Only the first slot
+    // would actually run; subsequent calls must always be coalesced.
+    const lastResumeAt = 10_000;
+    const firstCallRemaining = shouldDebounceReconnect(
+      lastResumeAt,
+      10_100,
+      WINDOW_MS,
+    );
+    const secondCallRemaining = shouldDebounceReconnect(
+      lastResumeAt,
+      10_200,
+      WINDOW_MS,
+    );
+    const thirdCallRemaining = shouldDebounceReconnect(
+      lastResumeAt,
+      10_300,
+      WINDOW_MS,
+    );
+    expect(firstCallRemaining).toBe(1_400);
+    expect(secondCallRemaining).toBe(1_300);
+    expect(thirdCallRemaining).toBe(1_200);
+  });
+
+  it("allows a reconnect to fire immediately once the window has passed", () => {
+    // After the window expires, a retry that came in earlier can now fire
+    // rather than stalling the loop. Guards against the regression that
+    // motivated the coalesce-instead-of-drop fix.
+    const lastResumeAt = 10_000;
+    expect(
+      shouldDebounceReconnect(lastResumeAt, 10_500, WINDOW_MS),
+    ).not.toBeNull();
+    expect(shouldDebounceReconnect(lastResumeAt, 11_500, WINDOW_MS)).toBeNull();
+  });
+
+  it("honours a custom windowMs value", () => {
+    // Shouldn't hard-code 1500 anywhere: the helper is generic over the
+    // window.
+    expect(shouldDebounceReconnect(1_000, 1_500, 2_000)).toBe(1_500);
+    expect(shouldDebounceReconnect(1_000, 3_500, 2_000)).toBeNull();
+  });
+});
diff --git a/autogpt_platform/frontend/src/app/(platform)/copilot/helpers.ts b/autogpt_platform/frontend/src/app/(platform)/copilot/helpers.ts
index b1d87a25d2..131a721117 100644
--- a/autogpt_platform/frontend/src/app/(platform)/copilot/helpers.ts
+++ b/autogpt_platform/frontend/src/app/(platform)/copilot/helpers.ts
@@ -184,6 +184,28 @@ export function disconnectSessionStream(sessionId: string): void {
   deleteV2DisconnectSessionStream(sessionId).catch(() => {});
 }

+/**
+ * Decide whether a reconnect request must be coalesced onto the debounce
+ * window boundary, rather than firing immediately.
+ *
+ * Returns the remaining milliseconds until the window closes (so the caller
+ * can schedule a `setTimeout` for that delay) when the previous resume
+ * happened inside the window, or `null` to let the reconnect proceed now.
+ *
+ * `lastResumeAt <= 0` signals "no reconnect has fired yet in this session"
+ * — the first reconnect always passes through regardless of `now`.
+ */
+export function shouldDebounceReconnect(
+  lastResumeAt: number,
+  now: number,
+  windowMs: number,
+): number | null {
+  if (lastResumeAt <= 0) return null;
+  const sinceLastResume = now - lastResumeAt;
+  if (sinceLastResume >= windowMs) return null;
+  return windowMs - sinceLastResume;
+}
+
 /**
  * Deduplicate messages by ID and by consecutive content fingerprint.
 *
diff --git a/autogpt_platform/frontend/src/app/(platform)/copilot/useCopilotStream.ts b/autogpt_platform/frontend/src/app/(platform)/copilot/useCopilotStream.ts
index 2412ff5988..afef20c85a 100644
--- a/autogpt_platform/frontend/src/app/(platform)/copilot/useCopilotStream.ts
+++ b/autogpt_platform/frontend/src/app/(platform)/copilot/useCopilotStream.ts
@@ -18,6 +18,7 @@ import {
   resolveInProgressTools,
   getSendSuppressionReason,
   disconnectSessionStream,
+  shouldDebounceReconnect,
 } from "./helpers";
 import type { CopilotLlmModel, CopilotMode } from "./store";
 import { useHydrateOnStreamEnd } from "./useHydrateOnStreamEnd";
@@ -25,6 +26,19 @@ import { useHydrateOnStreamEnd } from "./useHydrateOnStreamEnd";
 const RECONNECT_BASE_DELAY_MS = 1_000;
 const RECONNECT_MAX_ATTEMPTS = 3;

+/**
+ * Minimum spacing between successive reconnect attempts.
+ * `isReconnectScheduledRef` already prevents OVERLAPPING reconnects, but
+ * tab-throttle / visibility wake bursts can fire `onFinish(isDisconnect)`
+ * several times inside a single second — each one would schedule a fresh
+ * reconnect the moment the previous timer cleared the ref. Requests that
+ * arrive inside this window since the last reconnect's resume are COALESCED:
+ * scheduled to run at the window boundary rather than dropped, so a
+ * fast-failing resume (e.g. a 502 on GET /stream that trips `onError` inside
+ * 500 ms) still retries instead of stalling the retry loop.
+ */
+const RECONNECT_DEBOUNCE_MS = 1_500;
+
 /** Minimum time the page must have been hidden to trigger a wake re-sync. */
 const WAKE_RESYNC_THRESHOLD_MS = 30_000;

@@ -110,6 +124,11 @@ export function useCopilotStream({
   const isReconnectScheduledRef = useRef(false);
   const [isReconnectScheduled, setIsReconnectScheduled] = useState(false);
   const reconnectTimerRef = useRef<ReturnType<typeof setTimeout>>();
+  // Timestamp of the last reconnect's actual resume call — used together
+  // with RECONNECT_DEBOUNCE_MS to coalesce (not drop) rapid duplicate
+  // reconnect requests (e.g. visibility throttle firing onFinish(isDisconnect)
+  // several times in the same second). 0 = no reconnect has fired yet.
+  const lastReconnectResumeAtRef = useRef(0);
   const hasShownDisconnectToast = useRef(false);
   // Set when the user explicitly clicks stop — prevents onError from
   // triggering a reconnect cycle for the resulting AbortError.
@@ -127,6 +146,32 @@ export function useCopilotStream({

   function handleReconnect(sid: string) {
     if (isReconnectScheduledRef.current || !sid) return;

+    // Debounce: if the previous reconnect resumed within the last
+    // RECONNECT_DEBOUNCE_MS, COALESCE this request onto the window boundary
+    // rather than dropping it. Browser tab-throttle bursts can fire
+    // onFinish(isDisconnect) 2–3 times in a second; without the debounce,
+    // each fires its own GET /stream, each one replays the Redis stream,
+    // and the flicker storm is back. Dropping the request silently (the
+    // previous behaviour) stalled the retry loop when a resume failed
+    // quickly — e.g. a 502 on GET /stream that trips onError inside 500 ms
+    // while the 1500 ms window is still open. Scheduling the retry for
+    // the remaining window preserves both the storm cap and the retry.
+    const remainingDelay = shouldDebounceReconnect(
+      lastReconnectResumeAtRef.current,
+      Date.now(),
+      RECONNECT_DEBOUNCE_MS,
+    );
+    if (remainingDelay !== null) {
+      isReconnectScheduledRef.current = true;
+      setIsReconnectScheduled(true);
+      reconnectTimerRef.current = setTimeout(() => {
+        isReconnectScheduledRef.current = false;
+        setIsReconnectScheduled(false);
+        handleReconnect(sid);
+      }, remainingDelay);
+      return;
+    }
+
     const nextAttempt = reconnectAttemptsRef.current + 1;
     if (nextAttempt > RECONNECT_MAX_ATTEMPTS) {
       setReconnectExhausted(true);
@@ -163,6 +208,7 @@
         }
         return prev;
       });
+      lastReconnectResumeAtRef.current = Date.now();
       resumeStreamRef.current();
     }, delay);
   }
@@ -469,6 +515,7 @@
     setRateLimitMessage(null);
     hasShownDisconnectToast.current = false;
     lastSubmittedMsgRef.current = null;
+    lastReconnectResumeAtRef.current = 0;
    setReconnectExhausted(false);
     setIsSyncing(false);
     hasResumedRef.current.clear();
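
The emitter's flush gate, `_should_flush_pending`, is called in the reasoning.py hunks above but its body is outside this diff. Below is a minimal Python sketch of the predicate the two coalescing knobs imply — the default values and the exact body are assumptions, not the shipped code; only the kwarg names and the tests' `=0` disable-path are pinned by the hunks.

import time

class _CoalesceSketch:
    """Illustrative stand-in for the coalescing half of BaselineReasoningEmitter."""

    def __init__(self, coalesce_min_chars: int = 64, coalesce_max_interval_ms: float = 250.0):
        # Hypothetical defaults — the real _COALESCE_MIN_CHARS /
        # _COALESCE_MAX_INTERVAL_MS constants live in reasoning.py.
        self._coalesce_min_chars = coalesce_min_chars
        self._coalesce_max_interval_ms = coalesce_max_interval_ms
        self._pending_delta = ""
        self._last_flush_monotonic = 0.0

    def _should_flush_pending(self, now: float) -> bool:
        # Flush once enough text has accumulated, or once the window since
        # the last flush has expired. With coalesce_min_chars=0 (the test
        # knob) the first condition is always true, so every delta flushes —
        # which is what makes per-event assertions deterministic.
        if len(self._pending_delta) >= self._coalesce_min_chars:
            return True
        elapsed_ms = (now - self._last_flush_monotonic) * 1000.0
        return elapsed_ms >= self._coalesce_max_interval_ms

em = _CoalesceSketch(coalesce_min_chars=0)
assert em._should_flush_pending(time.monotonic())  # min_chars=0 → always flush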
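For reference, how the two new `ChatConfig` fields resolve from the environment. This is a self-contained sketch, not the real class: it assumes `ChatConfig` is a pydantic-settings `BaseSettings` with an effective `CHAT_` env prefix (implied by the `CHAT_RENDER_REASONING_IN_UI` / `CHAT_STREAM_REPLAY_COUNT` names in `config_test.py`); `ChatConfigSketch` is a stand-in name.

import os

from pydantic import Field
from pydantic_settings import BaseSettings, SettingsConfigDict

class ChatConfigSketch(BaseSettings):
    # Stand-in for backend.copilot.config.ChatConfig; only the two new
    # fields are modelled here.
    model_config = SettingsConfigDict(env_prefix="CHAT_")

    render_reasoning_in_ui: bool = Field(default=True)
    stream_replay_count: int = Field(default=200, ge=1, le=10000)

os.environ["CHAT_RENDER_REASONING_IN_UI"] = "false"
os.environ["CHAT_STREAM_REPLAY_COUNT"] = "500"
cfg = ChatConfigSketch()
assert cfg.render_reasoning_in_ui is False  # "false"/"0"/"no" all parse to False
assert cfg.stream_replay_count == 500       # ge=1 / le=10000 enforced on parse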
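The replay cap's effect on reconnect, sketched against redis-py's asyncio client (the stream key and ids here are illustrative; `subscribe_to_session` wires in `config.stream_replay_count` as shown in the stream_registry.py hunk above).

import redis.asyncio as aioredis

async def replay_turn_stream(stream_key: str, last_message_id: str, replay_count: int):
    # block=None → non-blocking XREAD: return whatever already exists after
    # last_message_id, up to replay_count entries, instead of the previous
    # fixed count=1000.
    r = aioredis.Redis()
    return await r.xread({stream_key: last_message_id}, count=replay_count, block=None)

# e.g. asyncio.run(replay_turn_stream("chat:turn:123", "0-0", 200))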
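And the debounce window math restated in Python, so the boundary cases pinned in `helpers.test.ts` can be checked at a glance (the shipped helper is the TypeScript `shouldDebounceReconnect` above; this re-statement is only for illustration).

def should_debounce_reconnect(last_resume_at: float, now: float, window_ms: float):
    """Return ms remaining in the debounce window, or None to reconnect now."""
    if last_resume_at <= 0:
        return None  # sentinel: no reconnect has resumed yet — pass through
    since_last_resume = now - last_resume_at
    if since_last_resume >= window_ms:
        return None  # window closed — fire immediately
    return window_ms - since_last_resume  # coalesce onto the window boundary

WINDOW_MS = 1_500
assert should_debounce_reconnect(0, 10_000, WINDOW_MS) is None      # first reconnect
assert should_debounce_reconnect(1_000, 1_500, WINDOW_MS) == 1_000  # 500ms in → wait 1000ms
assert should_debounce_reconnect(1_000, 2_500, WINDOW_MS) is None   # exactly on the boundary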