merge: combine #12871 (Kimi + coalescing + web_search) with #12873 (render flag + replay cap)

majdyz committed 2026-04-22 07:31:58 +07:00
14 changed files with 592 additions and 36 deletions

View File

@@ -242,6 +242,9 @@ class BaselineReasoningEmitter:
fresh ``ChatMessage(role="reasoning")`` is appended and mutated
in-place as further deltas arrive; :meth:`close` drops the reference
but leaves the appended row intact.
``render_in_ui=False`` suppresses wire events + persistence row;
state machine still advances.
"""
def __init__(
@@ -250,21 +253,20 @@ class BaselineReasoningEmitter:
*,
coalesce_min_chars: int = _COALESCE_MIN_CHARS,
coalesce_max_interval_ms: float = _COALESCE_MAX_INTERVAL_MS,
render_in_ui: bool = True,
) -> None:
self._block_id: str = str(uuid.uuid4())
self._open: bool = False
self._session_messages = session_messages
self._current_row: ChatMessage | None = None
# Coalescing state — ``_pending_delta`` accumulates reasoning text
-# between wire flushes. Providers like Kimi K2.6 emit very fine-
-# grained chunks; batching them reduces Redis ``xadd`` + SSE + React
-# re-render load by ~100x for equivalent text output. Tuning knobs
-# are kwargs so tests can disable coalescing (``=0``) for
-# deterministic event assertions.
+# between wire flushes. Tuning knobs are kwargs so tests can
+# disable coalescing (``=0``) for deterministic event assertions.
self._coalesce_min_chars = coalesce_min_chars
self._coalesce_max_interval_ms = coalesce_max_interval_ms
self._pending_delta: str = ""
self._last_flush_monotonic: float = 0.0
self._render_in_ui = render_in_ui
@property
def is_open(self) -> bool:
@@ -296,26 +298,25 @@ class BaselineReasoningEmitter:
# syscalls off the hot path without changing semantics.
now = time.monotonic()
if not self._open:
-events.append(StreamReasoningStart(id=self._block_id))
-events.append(StreamReasoningDelta(id=self._block_id, delta=text))
+if self._render_in_ui:
+    events.append(StreamReasoningStart(id=self._block_id))
+    events.append(StreamReasoningDelta(id=self._block_id, delta=text))
self._open = True
self._last_flush_monotonic = now
-if self._session_messages is not None:
+if self._render_in_ui and self._session_messages is not None:
self._current_row = ChatMessage(role="reasoning", content=text)
self._session_messages.append(self._current_row)
return events
# Persist per-delta (no coalescing here — the session snapshot stays
# consistent at every chunk boundary, independent of the wire
# coalesce window).
if self._current_row is not None:
self._current_row.content = (self._current_row.content or "") + text
self._pending_delta += text
if self._should_flush_pending(now):
-events.append(
-    StreamReasoningDelta(id=self._block_id, delta=self._pending_delta)
-)
+if self._render_in_ui:
+    events.append(
+        StreamReasoningDelta(id=self._block_id, delta=self._pending_delta)
+    )
self._pending_delta = ""
self._last_flush_monotonic = now
return events
@@ -348,12 +349,13 @@ class BaselineReasoningEmitter:
if not self._open:
return []
events: list[StreamBaseResponse] = []
-if self._pending_delta:
-    events.append(
-        StreamReasoningDelta(id=self._block_id, delta=self._pending_delta)
-    )
-    self._pending_delta = ""
-events.append(StreamReasoningEnd(id=self._block_id))
+if self._render_in_ui:
+    if self._pending_delta:
+        events.append(
+            StreamReasoningDelta(id=self._block_id, delta=self._pending_delta)
+        )
+    events.append(StreamReasoningEnd(id=self._block_id))
+self._pending_delta = ""
self._open = False
self._block_id = str(uuid.uuid4())
self._current_row = None
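``_should_flush_pending`` itself falls outside the hunks above. A minimal sketch of the predicate implied by its call sites; the field names come from the diff, but the body is an assumption, not the shipped implementation:

def _should_flush_pending(self, now: float) -> bool:
    # Sketch: flush once enough text has accumulated, or once the max
    # interval since the last wire flush has elapsed, whichever comes
    # first. With coalesce_min_chars=0 (the test knob) every delta flushes.
    if len(self._pending_delta) >= self._coalesce_min_chars:
        return True
    elapsed_ms = (now - self._last_flush_monotonic) * 1000.0
    return elapsed_ms >= self._coalesce_max_interval_ms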

View File

@@ -452,3 +452,60 @@ class TestReasoningPersistence:
events = emitter.on_delta(_delta(reasoning="pure wire"))
assert len(events) == 2 # start + delta, no crash
# Nothing else to assert — just proves None session is supported.
class TestBaselineReasoningEmitterRenderFlag:
"""``render_in_ui=False`` must silence ``StreamReasoning*`` wire events
AND drop persistence of ``role="reasoning"`` rows — the operator hides
the collapse on both the live wire and on reload. Persistence is tied
to the wire events because the frontend's hydration path unconditionally
re-renders persisted reasoning rows; keeping them would make the flag a
no-op post-reload. These tests pin the contract in both directions so
future refactors can't flip only one half."""
def test_render_off_suppresses_start_and_delta(self):
emitter = BaselineReasoningEmitter(render_in_ui=False)
events = emitter.on_delta(_delta(reasoning="hidden"))
# No wire events, but state advanced (is_open == True) so close()
# below has something to rotate.
assert events == []
assert emitter.is_open is True
def test_render_off_suppresses_close_end(self):
emitter = BaselineReasoningEmitter(render_in_ui=False)
emitter.on_delta(_delta(reasoning="hidden"))
events = emitter.close()
assert events == []
assert emitter.is_open is False
def test_render_off_skips_persistence(self):
"""When render is off the emitter must NOT append a ``role="reasoning"``
row to ``session_messages`` — hydration would re-render it, undoing
the operator's intent."""
session: list[ChatMessage] = []
emitter = BaselineReasoningEmitter(session, render_in_ui=False)
emitter.on_delta(_delta(reasoning="part one "))
emitter.on_delta(_delta(reasoning="part two"))
emitter.close()
assert session == []
def test_render_off_rotates_block_id_between_sessions(self):
"""Even with wire events silenced the block id must rotate on close,
otherwise a hypothetical mid-session flip would reuse a stale id."""
emitter = BaselineReasoningEmitter(render_in_ui=False)
emitter.on_delta(_delta(reasoning="first"))
first_block_id = emitter._block_id
emitter.close()
emitter.on_delta(_delta(reasoning="second"))
assert emitter._block_id != first_block_id
def test_render_on_is_default(self):
"""Defaulting to True preserves backward compat — existing callers
that don't pass the kwarg keep emitting wire events as before."""
emitter = BaselineReasoningEmitter()
events = emitter.on_delta(_delta(reasoning="hello"))
assert len(events) == 2
assert isinstance(events[0], StreamReasoningStart)
assert isinstance(events[1], StreamReasoningDelta)

View File

@@ -382,7 +382,13 @@ class _BaselineStreamState:
# frontend's ``convertChatSessionToUiMessages`` relies on these
# rows to render the Reasoning collapse after the AI SDK's
# stream-end hydrate swaps in the DB-backed message list.
-self.reasoning_emitter = BaselineReasoningEmitter(self.session_messages)
+# ``render_in_ui`` is sourced from ``config.render_reasoning_in_ui``
+# so the operator can silence the reasoning collapse globally.
+# Persisted ``role="reasoning"`` rows are suppressed along with the
+# wire events, so the collapse stays hidden after reload too.
+self.reasoning_emitter = BaselineReasoningEmitter(
+    self.session_messages,
+    render_in_ui=config.render_reasoning_in_ui,
+)
def _is_anthropic_model(model: str) -> bool:

View File

@@ -249,6 +249,18 @@ class ChatConfig(BaseSettings):
"``max_thinking_tokens`` kwarg so the CLI falls back to model default "
"(which, without the flag, leaves extended thinking off).",
)
render_reasoning_in_ui: bool = Field(
default=True,
description="Render reasoning as live UI parts + persist "
"``role='reasoning'`` rows. False suppresses both; tokens are still "
"billed upstream.",
)
stream_replay_count: int = Field(
default=200,
ge=1,
le=10000,
description="Max Redis stream entries replayed on SSE reconnect.",
)
claude_agent_thinking_effort: Literal["low", "medium", "high", "max"] | None = (
Field(
default=None,
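Operationally, both knobs ride the existing ``CHAT_``-prefixed env mapping (the prefix is confirmed by the env vars the config tests below clear and set). A minimal sketch of flipping them at deploy time, assuming nothing beyond that prefix:

import os

# Hide the reasoning collapse and shrink the reconnect replay batch.
os.environ["CHAT_RENDER_REASONING_IN_UI"] = "false"
os.environ["CHAT_STREAM_REPLAY_COUNT"] = "100"

cfg = ChatConfig()
assert cfg.render_reasoning_in_ui is False
assert cfg.stream_replay_count == 100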

View File

@@ -19,6 +19,8 @@ _ENV_VARS_TO_CLEAR = (
"OPENAI_BASE_URL",
"CHAT_CLAUDE_AGENT_CLI_PATH",
"CLAUDE_AGENT_CLI_PATH",
"CHAT_RENDER_REASONING_IN_UI",
"CHAT_STREAM_REPLAY_COUNT",
)
@@ -164,3 +166,38 @@ class TestClaudeAgentCliPathEnvFallback:
monkeypatch.setenv("CLAUDE_AGENT_CLI_PATH", str(tmp_path))
with pytest.raises(Exception, match="not a regular file"):
ChatConfig()
class TestRenderReasoningInUi:
"""``render_reasoning_in_ui`` gates reasoning wire events globally."""
def test_defaults_to_true(self):
"""Default must stay True — flipping it silences the reasoning
collapse for every user, which is an opt-in operator decision."""
cfg = ChatConfig()
assert cfg.render_reasoning_in_ui is True
def test_env_override_false(self, monkeypatch: pytest.MonkeyPatch) -> None:
monkeypatch.setenv("CHAT_RENDER_REASONING_IN_UI", "false")
cfg = ChatConfig()
assert cfg.render_reasoning_in_ui is False
class TestStreamReplayCount:
"""``stream_replay_count`` caps the SSE reconnect replay batch size."""
def test_default_is_200(self):
"""200 covers a full Kimi turn after coalescing (~150 events) while
bounding the replay storm from 1000+ chunks."""
cfg = ChatConfig()
assert cfg.stream_replay_count == 200
def test_env_override(self, monkeypatch: pytest.MonkeyPatch) -> None:
monkeypatch.setenv("CHAT_STREAM_REPLAY_COUNT", "500")
cfg = ChatConfig()
assert cfg.stream_replay_count == 500
def test_zero_rejected(self):
"""count=0 would make XREAD replay nothing — rejected via ge=1."""
with pytest.raises(Exception):
ChatConfig(stream_replay_count=0)

View File

@@ -714,10 +714,13 @@ class TestDoTransientBackoff:
mock_sleep.assert_called_once_with(7)
async def test_replaces_adapter_with_new_instance(self):
"""state.adapter is replaced with a new SDKResponseAdapter after yield."""
"""state.adapter is replaced with a new SDKResponseAdapter after yield,
and ``render_reasoning_in_ui`` is threaded from the SDK service config
(not hardcoded) so ``CHAT_RENDER_REASONING_IN_UI=false`` at runtime
flips the reconstruction consistently with the rest of the path."""
from unittest.mock import AsyncMock, MagicMock, patch
-from backend.copilot.sdk.service import _do_transient_backoff
+from backend.copilot.sdk.service import _do_transient_backoff, config
original_adapter = MagicMock()
state = MagicMock()
@@ -733,7 +736,11 @@ class TestDoTransientBackoff:
async for _ in _do_transient_backoff(3, state, "msg-1", "sess-1"):
pass
-mock_cls.assert_called_once_with(message_id="msg-1", session_id="sess-1")
+mock_cls.assert_called_once_with(
+    message_id="msg-1",
+    session_id="sess-1",
+    render_reasoning_in_ui=config.render_reasoning_in_ui,
+)
assert state.adapter is new_adapter
async def test_resets_usage_after_yield(self):

View File

@@ -53,7 +53,13 @@ class SDKResponseAdapter:
text blocks, tool calls, and message lifecycle.
"""
-def __init__(self, message_id: str | None = None, session_id: str | None = None):
+def __init__(
+    self,
+    message_id: str | None = None,
+    session_id: str | None = None,
+    *,
+    render_reasoning_in_ui: bool = True,
+):
self.message_id = message_id or str(uuid.uuid4())
self.session_id = session_id
self.text_block_id = str(uuid.uuid4())
@@ -62,6 +68,9 @@ class SDKResponseAdapter:
self.reasoning_block_id = str(uuid.uuid4())
self.has_started_reasoning = False
self.has_ended_reasoning = True
# When False, reasoning wire events + persisted reasoning rows are
# suppressed; transcript continuity is unaffected.
self._render_reasoning_in_ui = render_reasoning_in_ui
self.current_tool_calls: dict[str, dict[str, str]] = {}
self.resolved_tool_calls: set[str] = set()
self.step_open = False
@@ -142,15 +151,27 @@ class SDKResponseAdapter:
# it live, extended_thinking turns that end
# thinking-only left the UI stuck on "Thought for Xs"
# with nothing rendered until a page refresh.
#
# When ``render_reasoning_in_ui=False`` the three
# reasoning helpers below (and the append) no-op, so
# the frontend sees a text-only stream AND no
# ``ChatMessage(role='reasoning')`` row is persisted
# (the row is only created by ``_dispatch_response``
# when ``StreamReasoningStart`` arrives, which is
# suppressed here). Persistence of the thinking text
# into the SDK transcript via
# ``_format_sdk_content_blocks`` is unaffected — that
# feeds ``--resume`` continuity, not the UI.
if block.thinking:
self._end_text_if_open(responses)
self._ensure_reasoning_started(responses)
-responses.append(
-    StreamReasoningDelta(
-        id=self.reasoning_block_id,
-        delta=block.thinking,
-    )
-)
+if self._render_reasoning_in_ui:
+    responses.append(
+        StreamReasoningDelta(
+            id=self.reasoning_block_id,
+            delta=block.thinking,
+        )
+    )
elif isinstance(block, ToolUseBlock):
self._end_text_if_open(responses)
@@ -349,7 +370,13 @@ class SDKResponseAdapter:
Each ``ThinkingBlock`` the SDK emits gets its own streaming block
on the wire so the frontend can render a new ``Reasoning`` part
per LLM turn (rather than concatenating across the whole session).
No-op when ``render_reasoning_in_ui=False`` — callers still drive
the method on every ``ThinkingBlock`` so persistence stays in
lockstep, but nothing reaches the wire.
"""
if not self._render_reasoning_in_ui:
return
if not self.has_started_reasoning or self.has_ended_reasoning:
if self.has_ended_reasoning:
self.reasoning_block_id = str(uuid.uuid4())
@@ -358,7 +385,13 @@ class SDKResponseAdapter:
self.has_started_reasoning = True
def _end_reasoning_if_open(self, responses: list[StreamBaseResponse]) -> None:
"""End the current reasoning block if one is open."""
"""End the current reasoning block if one is open.
No-op when ``render_reasoning_in_ui=False`` — no start was emitted,
so no end is needed.
"""
if not self._render_reasoning_in_ui:
return
if self.has_started_reasoning and not self.has_ended_reasoning:
responses.append(StreamReasoningEnd(id=self.reasoning_block_id))
self.has_ended_reasoning = True

View File

@@ -331,6 +331,64 @@ def test_empty_thinking_block_is_ignored():
assert [type(r).__name__ for r in results] == ["StreamStartStep"]
def test_render_reasoning_in_ui_false_suppresses_thinking_events():
"""``render_reasoning_in_ui=False`` silences ``StreamReasoning*`` on
the wire — the frontend sees a text-only stream. Persistence via
``_format_sdk_content_blocks`` is handled elsewhere; this test only
pins the wire contract.
"""
adapter = SDKResponseAdapter(
message_id="m",
session_id="s",
render_reasoning_in_ui=False,
)
msg = AssistantMessage(
content=[ThinkingBlock(thinking="plan", signature="sig")],
model="test",
)
results = adapter.convert_message(msg)
types = [type(r).__name__ for r in results]
assert "StreamReasoningStart" not in types
assert "StreamReasoningDelta" not in types
assert "StreamReasoningEnd" not in types
def test_render_reasoning_off_text_after_thinking_emits_no_reasoning_end():
"""With rendering off the ReasoningEnd is never synthesized when text
follows — no ReasoningStart ever hit the wire, so no close is due."""
adapter = SDKResponseAdapter(
message_id="m",
session_id="s",
render_reasoning_in_ui=False,
)
adapter.convert_message(
AssistantMessage(
content=[ThinkingBlock(thinking="warming up", signature="sig")],
model="test",
)
)
results = adapter.convert_message(
AssistantMessage(content=[TextBlock(text="hello")], model="test")
)
types = [type(r).__name__ for r in results]
assert "StreamReasoningEnd" not in types
assert "StreamTextStart" in types
assert "StreamTextDelta" in types
def test_render_reasoning_on_is_default():
"""Default is True — existing callers keep emitting reasoning events."""
adapter = SDKResponseAdapter(message_id="m", session_id="s")
msg = AssistantMessage(
content=[ThinkingBlock(thinking="plan", signature="sig")],
model="test",
)
results = adapter.convert_message(msg)
types = [type(r).__name__ for r in results]
assert "StreamReasoningStart" in types
assert "StreamReasoningDelta" in types
def test_result_success_synthesizes_fallback_text_when_final_turn_is_thinking_only():
"""If the model's last LLM call after a tool_result produced only a
ThinkingBlock (no TextBlock), the UI would hang on the tool output

View File

@@ -823,7 +823,11 @@ async def _do_transient_backoff(
"""
yield StreamStatus(message=f"Connection interrupted, retrying in {backoff}s…")
await asyncio.sleep(backoff)
-state.adapter = SDKResponseAdapter(message_id=message_id, session_id=session_id)
+state.adapter = SDKResponseAdapter(
+    message_id=message_id,
+    session_id=session_id,
+    render_reasoning_in_ui=config.render_reasoning_in_ui,
+)
state.usage.reset()
@@ -3160,7 +3164,11 @@ async def stream_chat_completion_sdk(
options = ClaudeAgentOptions(**sdk_options_kwargs) # type: ignore[arg-type] # dynamic kwargs
-adapter = SDKResponseAdapter(message_id=message_id, session_id=session_id)
+adapter = SDKResponseAdapter(
+    message_id=message_id,
+    session_id=session_id,
+    render_reasoning_in_ui=config.render_reasoning_in_ui,
+)
# Propagate user_id/session_id as OTEL context attributes so the
# langsmith tracing integration attaches them to every span. This
@@ -3494,7 +3502,9 @@ async def stream_chat_completion_sdk(
session, user_id, is_user_message, state.query_message
)
state.adapter = SDKResponseAdapter(
-    message_id=message_id, session_id=session_id
+    message_id=message_id,
+    session_id=session_id,
+    render_reasoning_in_ui=config.render_reasoning_in_ui,
)
# Reset token accumulators so a failed attempt's partial
# usage is not double-counted in the successful attempt.
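The same three kwargs are now spelled out at three construction sites in this file. A hypothetical follow-up (not part of this diff) that would keep them in lockstep is a small local factory:

def _new_adapter(message_id: str, session_id: str | None) -> SDKResponseAdapter:
    # Hypothetical helper: centralizes the config threading so a future
    # kwarg cannot be forgotten at one of the three call sites.
    return SDKResponseAdapter(
        message_id=message_id,
        session_id=session_id,
        render_reasoning_in_ui=config.render_reasoning_in_ui,
    )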

View File

@@ -485,9 +485,11 @@ async def subscribe_to_session(
subscriber_queue: asyncio.Queue[StreamBaseResponse] = asyncio.Queue()
stream_key = _get_turn_stream_key(session.turn_id)
# Step 1: Replay messages from Redis Stream
# Replay batch capped by ``stream_replay_count``.
xread_start = time.perf_counter()
-messages = await redis.xread({stream_key: last_message_id}, block=None, count=1000)
+messages = await redis.xread(
+    {stream_key: last_message_id}, block=None, count=config.stream_replay_count
+)
xread_time = (time.perf_counter() - xread_start) * 1000
logger.info(
f"[TIMING] Redis xread (replay) took {xread_time:.1f}ms, status={session_status}",

View File

@@ -0,0 +1,177 @@
import { act, renderHook } from "@testing-library/react";
import type { UIMessage } from "ai";
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import { useCopilotStream } from "../useCopilotStream";
// Capture the args passed to ``useChat`` so tests can invoke onFinish/onError
// directly — that's the only way to drive handleReconnect without a real SSE.
let lastUseChatArgs: {
onFinish?: (args: { isDisconnect?: boolean; isAbort?: boolean }) => void;
onError?: (err: Error) => void;
} | null = null;
const resumeStreamMock = vi.fn();
const sdkStopMock = vi.fn();
const sdkSendMessageMock = vi.fn();
const setMessagesMock = vi.fn();
function resetSdkMocks() {
lastUseChatArgs = null;
resumeStreamMock.mockReset();
sdkStopMock.mockReset();
sdkSendMessageMock.mockReset();
setMessagesMock.mockReset();
}
vi.mock("@ai-sdk/react", () => ({
useChat: (args: unknown) => {
lastUseChatArgs = args as typeof lastUseChatArgs;
return {
messages: [] as UIMessage[],
sendMessage: sdkSendMessageMock,
stop: sdkStopMock,
status: "ready" as const,
error: undefined,
setMessages: setMessagesMock,
resumeStream: resumeStreamMock,
};
},
}));
vi.mock("ai", async () => {
const actual = await vi.importActual<typeof import("ai")>("ai");
return {
...actual,
DefaultChatTransport: class {
constructor(public opts: unknown) {}
},
};
});
vi.mock("@tanstack/react-query", () => ({
useQueryClient: () => ({ invalidateQueries: vi.fn() }),
}));
vi.mock("@/app/api/__generated__/endpoints/chat/chat", () => ({
getGetV2GetCopilotUsageQueryKey: () => ["copilot-usage"],
getGetV2GetSessionQueryKey: (id: string) => ["session", id],
postV2CancelSessionTask: vi.fn(),
deleteV2DisconnectSessionStream: vi.fn().mockResolvedValue(undefined),
}));
vi.mock("@/components/molecules/Toast/use-toast", () => ({
toast: vi.fn(),
}));
vi.mock("@/services/environment", () => ({
environment: {
getAGPTServerBaseUrl: () => "http://localhost",
},
}));
vi.mock("../helpers", async () => {
const actual =
await vi.importActual<typeof import("../helpers")>("../helpers");
return {
...actual,
getCopilotAuthHeaders: vi.fn().mockResolvedValue({}),
disconnectSessionStream: vi.fn(),
};
});
vi.mock("../useHydrateOnStreamEnd", () => ({
useHydrateOnStreamEnd: () => undefined,
}));
function renderStream() {
return renderHook(() =>
useCopilotStream({
sessionId: "sess-1",
hydratedMessages: [],
hasActiveStream: false,
refetchSession: vi.fn().mockResolvedValue({ data: undefined }),
copilotMode: undefined,
copilotModel: undefined,
}),
);
}
describe("useCopilotStream — reconnect debounce", () => {
beforeEach(() => {
resetSdkMocks();
vi.useFakeTimers();
// Pin Date.now so sinceLastResume math is deterministic. The hook reads
// Date.now() both when stashing lastReconnectResumeAtRef and when
// deciding whether to debounce.
vi.setSystemTime(new Date(2025, 0, 1, 12, 0, 0));
});
afterEach(() => {
vi.useRealTimers();
});
it("coalesces a burst of disconnect events into one resumeStream call", async () => {
renderStream();
// First disconnect — schedules a reconnect at the exponential backoff
// delay (1000ms for attempt #1).
await act(async () => {
await lastUseChatArgs!.onFinish!({ isDisconnect: true });
});
// Fire the scheduled timer → resumeStream runs once and stamps
// lastReconnectResumeAtRef.current = Date.now().
await act(async () => {
await vi.advanceTimersByTimeAsync(1_000);
});
expect(resumeStreamMock).toHaveBeenCalledTimes(1);
// A second disconnect arrives immediately after (still inside the
// 1500ms debounce window) — the debounce path must fire and queue a
// coalesced timer, NOT a fresh resume.
await act(async () => {
await lastUseChatArgs!.onFinish!({ isDisconnect: true });
});
expect(resumeStreamMock).toHaveBeenCalledTimes(1);
// The coalesced timer fires at the window boundary and reschedules a
// real reconnect. Advance past the window AND past the second
// reconnect's backoff (attempt #2 = 2000ms) so resume runs.
await act(async () => {
await vi.advanceTimersByTimeAsync(1_500);
});
await act(async () => {
await vi.advanceTimersByTimeAsync(2_000);
});
expect(resumeStreamMock).toHaveBeenCalledTimes(2);
});
it("does not debounce a reconnect that arrives after the window closes", async () => {
renderStream();
// First reconnect cycle.
await act(async () => {
await lastUseChatArgs!.onFinish!({ isDisconnect: true });
});
await act(async () => {
await vi.advanceTimersByTimeAsync(1_000);
});
expect(resumeStreamMock).toHaveBeenCalledTimes(1);
// Wait past the debounce window before the next disconnect.
await act(async () => {
await vi.advanceTimersByTimeAsync(2_000);
});
// Now a fresh disconnect should go through the normal path (NOT the
// debounce branch) and schedule a backoff of 2000ms (attempt #2).
await act(async () => {
await lastUseChatArgs!.onFinish!({ isDisconnect: true });
});
await act(async () => {
await vi.advanceTimersByTimeAsync(2_000);
});
expect(resumeStreamMock).toHaveBeenCalledTimes(2);
});
});

View File

@@ -7,6 +7,7 @@ import {
formatNotificationTitle,
getSendSuppressionReason,
parseSessionIDs,
shouldDebounceReconnect,
shouldSuppressDuplicateSend,
} from "./helpers";
@@ -466,3 +467,88 @@ describe("deduplicateMessages", () => {
expect(result).toHaveLength(2); // duplicate step-start messages are deduped
});
});
describe("shouldDebounceReconnect", () => {
const WINDOW_MS = 1_500;
it("returns null for the first reconnect (lastResumeAt === 0)", () => {
expect(shouldDebounceReconnect(0, 10_000, WINDOW_MS)).toBeNull();
});
it("returns null for a negative lastResumeAt sentinel", () => {
// Defensive: a negative value is still treated as "no reconnect yet".
expect(shouldDebounceReconnect(-1, 10_000, WINDOW_MS)).toBeNull();
});
it("returns the remaining delay when now is inside the window", () => {
// 500ms since the last resume — the caller must wait another 1000ms
// before the storm cap reopens.
const remaining = shouldDebounceReconnect(1_000, 1_500, WINDOW_MS);
expect(remaining).toBe(1_000);
});
it("coalesces a reconnect that arrives immediately after the previous resume", () => {
// now === lastResumeAt → sinceLastResume === 0, so the full window remains.
const remaining = shouldDebounceReconnect(5_000, 5_000, WINDOW_MS);
expect(remaining).toBe(WINDOW_MS);
});
it("returns null when exactly on the window boundary", () => {
// sinceLastResume === windowMs is NOT inside the window — the next
// reconnect should fire immediately.
expect(shouldDebounceReconnect(1_000, 2_500, WINDOW_MS)).toBeNull();
});
it("returns null when the window has elapsed", () => {
expect(shouldDebounceReconnect(1_000, 5_000, WINDOW_MS)).toBeNull();
});
it("returns a small remaining delay at the far edge of the window", () => {
// 1ms before the window closes → 1ms left.
const remaining = shouldDebounceReconnect(1_000, 2_499, WINDOW_MS);
expect(remaining).toBe(1);
});
it("collapses a burst of reconnects into one debounced scheduling", () => {
// Simulates the browser tab-throttle storm: three reconnect calls fire
// within a single second after the last resume. Only the first slot
// would actually run; subsequent calls must always be coalesced.
const lastResumeAt = 10_000;
const firstCallRemaining = shouldDebounceReconnect(
lastResumeAt,
10_100,
WINDOW_MS,
);
const secondCallRemaining = shouldDebounceReconnect(
lastResumeAt,
10_200,
WINDOW_MS,
);
const thirdCallRemaining = shouldDebounceReconnect(
lastResumeAt,
10_300,
WINDOW_MS,
);
expect(firstCallRemaining).toBe(1_400);
expect(secondCallRemaining).toBe(1_300);
expect(thirdCallRemaining).toBe(1_200);
});
it("allows a reconnect to fire immediately once the window has passed", () => {
// After the window expires, a retry that came in earlier can now fire
// rather than stalling the loop. Guards against the regression that
// motivated the coalesce-instead-of-drop fix.
const lastResumeAt = 10_000;
expect(
shouldDebounceReconnect(lastResumeAt, 10_500, WINDOW_MS),
).not.toBeNull();
expect(shouldDebounceReconnect(lastResumeAt, 11_500, WINDOW_MS)).toBeNull();
});
it("honours a custom windowMs value", () => {
// Shouldn't hard-code 1500 anywhere: the helper is generic over the
// window.
expect(shouldDebounceReconnect(1_000, 1_500, 2_000)).toBe(1_500);
expect(shouldDebounceReconnect(1_000, 3_500, 2_000)).toBeNull();
});
});

View File

@@ -184,6 +184,28 @@ export function disconnectSessionStream(sessionId: string): void {
deleteV2DisconnectSessionStream(sessionId).catch(() => {});
}
/**
* Decide whether a reconnect request must be coalesced onto the debounce
* window boundary, rather than firing immediately.
*
* Returns the remaining milliseconds until the window closes (so the caller
* can schedule a `setTimeout` for that delay) when the previous resume
* happened inside the window, or `null` to let the reconnect proceed now.
*
* `lastResumeAt <= 0` signals "no reconnect has fired yet in this session"
* — the first reconnect always passes through regardless of `now`.
*/
export function shouldDebounceReconnect(
lastResumeAt: number,
now: number,
windowMs: number,
): number | null {
if (lastResumeAt <= 0) return null;
const sinceLastResume = now - lastResumeAt;
if (sinceLastResume >= windowMs) return null;
return windowMs - sinceLastResume;
}
/**
* Deduplicate messages by ID and by consecutive content fingerprint.
*

View File

@@ -18,6 +18,7 @@ import {
resolveInProgressTools,
getSendSuppressionReason,
disconnectSessionStream,
shouldDebounceReconnect,
} from "./helpers";
import type { CopilotLlmModel, CopilotMode } from "./store";
import { useHydrateOnStreamEnd } from "./useHydrateOnStreamEnd";
@@ -25,6 +26,19 @@ import { useHydrateOnStreamEnd } from "./useHydrateOnStreamEnd";
const RECONNECT_BASE_DELAY_MS = 1_000;
const RECONNECT_MAX_ATTEMPTS = 3;
/**
* Minimum spacing between successive reconnect attempts.
* `isReconnectScheduledRef` already prevents OVERLAPPING reconnects, but
* tab-throttle / visibility wake bursts can fire `onFinish(isDisconnect)`
* several times inside a single second — each one would schedule a fresh
* reconnect the moment the previous timer cleared the ref. Requests that
* arrive inside this window since the last reconnect's resume are COALESCED:
* scheduled to run at the window boundary rather than dropped, so a
* fast-failing resume (e.g. a 502 on GET /stream that trips `onError` inside
* 500 ms) still retries instead of stalling the retry loop.
*/
const RECONNECT_DEBOUNCE_MS = 1_500;
/** Minimum time the page must have been hidden to trigger a wake re-sync. */
const WAKE_RESYNC_THRESHOLD_MS = 30_000;
@@ -110,6 +124,11 @@ export function useCopilotStream({
const isReconnectScheduledRef = useRef(false);
const [isReconnectScheduled, setIsReconnectScheduled] = useState(false);
const reconnectTimerRef = useRef<ReturnType<typeof setTimeout>>();
// Timestamp of the last reconnect's actual resume call — used together
// with RECONNECT_DEBOUNCE_MS to coalesce rapid duplicate reconnect requests
// (e.g. visibility throttle firing onFinish(isDisconnect) several times
// in the same second). 0 = no reconnect has fired yet in this session.
const lastReconnectResumeAtRef = useRef(0);
const hasShownDisconnectToast = useRef(false);
// Set when the user explicitly clicks stop — prevents onError from
// triggering a reconnect cycle for the resulting AbortError.
@@ -127,6 +146,32 @@ export function useCopilotStream({
function handleReconnect(sid: string) {
if (isReconnectScheduledRef.current || !sid) return;
// Debounce: if the previous reconnect resumed within the last
// RECONNECT_DEBOUNCE_MS, COALESCE this request onto the window boundary
// rather than dropping it. Browser tab-throttle bursts can fire
// onFinish(isDisconnect) 2-3 times in a second; without the debounce,
// each fires its own GET /stream, each one replays the Redis stream,
// and the flicker storm is back. Dropping the request silently (the
// previous behaviour) stalled the retry loop when a resume failed
// quickly — e.g. a 502 on GET /stream that trips onError inside 500 ms
// while the 1500 ms window is still open. Scheduling the retry for
// the remaining window preserves both the storm cap and the retry.
const remainingDelay = shouldDebounceReconnect(
lastReconnectResumeAtRef.current,
Date.now(),
RECONNECT_DEBOUNCE_MS,
);
if (remainingDelay !== null) {
isReconnectScheduledRef.current = true;
setIsReconnectScheduled(true);
reconnectTimerRef.current = setTimeout(() => {
isReconnectScheduledRef.current = false;
setIsReconnectScheduled(false);
handleReconnect(sid);
}, remainingDelay);
return;
}
const nextAttempt = reconnectAttemptsRef.current + 1;
if (nextAttempt > RECONNECT_MAX_ATTEMPTS) {
setReconnectExhausted(true);
@@ -163,6 +208,7 @@ export function useCopilotStream({
}
return prev;
});
lastReconnectResumeAtRef.current = Date.now();
resumeStreamRef.current();
}, delay);
}
@@ -469,6 +515,7 @@ export function useCopilotStream({
setRateLimitMessage(null);
hasShownDisconnectToast.current = false;
lastSubmittedMsgRef.current = null;
lastReconnectResumeAtRef.current = 0;
setReconnectExhausted(false);
setIsSyncing(false);
hasResumedRef.current.clear();