feat(platform/copilot): reasoning UI render flag + SSE reconnect storm mitigations

Follow-up to #12871 addressing streaming UX issues observed during Kimi K2.6
rollout testing. Three independent changes:

1. New ChatConfig.render_reasoning_in_ui (default True) gates the
   StreamReasoning* wire events on BOTH baseline and SDK paths. When False
   the frontend sees a text-only stream; the model still reasons, tokens
   are still billed, and role="reasoning" rows are still persisted to
   session.messages so a future per-session toggle can surface them on
   reload.

2. SSE reconnect replay cap: ChatConfig.stream_replay_count defaults to 200
   (was hard-coded count=1000 in subscribe_to_session.xread). Bounds the
   replay storm when a tab-switch / browser-throttle fires multiple
   reconnects. 200 still covers a full Kimi turn after coalescing
   (~150 events).

3. Frontend reconnect debounce: handleReconnect rejects requests that
   arrive within 1500ms of the last reconnect's resume, so visibility-
   throttle bursts collapse to one GET /stream instead of 2-3.

Scope 3 from the brief (Last-Event-ID SSE resume) is deferred to a second
follow-up — threading the redis stream id through every to_sse() and
swapping the fetch-based DefaultChatTransport for a Last-Event-ID-aware
client is a deeper architectural change than scopes 1+2 combined.
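
For reference, a minimal sketch of how the flag from change (1) is threaded into
both streaming paths. The import paths are assumptions (the real module layout
will differ); the constructor kwargs mirror the diffs below:

    from copilot.chat.config import ChatConfig                    # path assumed
    from copilot.chat.reasoning import BaselineReasoningEmitter   # path assumed
    from copilot.chat.sdk_adapter import SDKResponseAdapter       # path assumed

    config = ChatConfig()
    session_messages: list = []  # role="reasoning" rows get appended here regardless of the flag

    emitter = BaselineReasoningEmitter(
        session_messages,
        render_in_ui=config.render_reasoning_in_ui,              # baseline wire gate
    )
    adapter = SDKResponseAdapter(
        message_id="msg-1",
        session_id="sess-1",
        render_reasoning_in_ui=config.render_reasoning_in_ui,    # SDK wire gate
    )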

Tests: 1140 copilot backend tests pass. New coverage: BaselineReasoningEmitter
render_in_ui flag (wire suppression + persistence preservation), SDKResponseAdapter
render_reasoning_in_ui flag, ChatConfig regression tests for both new fields.
Author: majdyz
Date: 2026-04-21 23:49:15 +07:00
Parent: 6efbc59fd8
Commit: 35e92e00ca
11 changed files with 311 additions and 18 deletions

View File

@@ -172,16 +172,28 @@ class BaselineReasoningEmitter:
fresh ``ChatMessage(role="reasoning")`` is appended and mutated
in-place as further deltas arrive; :meth:`close` drops the reference
but leaves the appended row intact.
Pass ``render_in_ui=False`` to suppress the ``StreamReasoning*`` wire
events — the emitter still reads the reasoning payload and still
appends / mutates the persisted ``role="reasoning"`` row, but returns
empty event lists from :meth:`on_delta` and :meth:`close`. This lets
the operator (via :attr:`ChatConfig.render_reasoning_in_ui`) hide the
reasoning collapse from the live stream without dropping the audit
trail; a future per-session toggle can surface the persisted rows on
reload.
"""
def __init__(
self,
session_messages: list[ChatMessage] | None = None,
*,
render_in_ui: bool = True,
) -> None:
self._block_id: str = str(uuid.uuid4())
self._open: bool = False
self._session_messages = session_messages
self._current_row: ChatMessage | None = None
self._render_in_ui = render_in_ui
@property
def is_open(self) -> bool:
@@ -195,6 +207,11 @@ class BaselineReasoningEmitter:
Persistence (when a session message list is attached) happens in
lockstep with emission so the row's content stays equal to the
concatenated deltas at every delta boundary.
When ``render_in_ui=False`` the wire events are suppressed (empty
list returned) but the state machine still advances and the
persistence row is still appended / mutated, so ``close`` behaves
symmetrically and the persisted audit trail is identical.
"""
ext = OpenRouterDeltaExtension.from_delta(delta)
text = ext.visible_text()
@@ -202,12 +219,14 @@ class BaselineReasoningEmitter:
return []
events: list[StreamBaseResponse] = []
if not self._open:
events.append(StreamReasoningStart(id=self._block_id))
if self._render_in_ui:
events.append(StreamReasoningStart(id=self._block_id))
self._open = True
if self._session_messages is not None:
self._current_row = ChatMessage(role="reasoning", content="")
self._session_messages.append(self._current_row)
events.append(StreamReasoningDelta(id=self._block_id, delta=text))
if self._render_in_ui:
events.append(StreamReasoningDelta(id=self._block_id, delta=text))
if self._current_row is not None:
self._current_row.content = (self._current_row.content or "") + text
return events
@@ -220,11 +239,18 @@ class BaselineReasoningEmitter:
than reusing one already closed on the wire. The persisted row is
not removed — it stays in ``session_messages`` as the durable
record of what was reasoned.
When ``render_in_ui=False`` the wire event is suppressed (empty
list returned) but the block still rotates so subsequent reasoning
uses a fresh id — keeps the state machine in lockstep with the
render-on branch.
"""
if not self._open:
return []
event = StreamReasoningEnd(id=self._block_id)
events: list[StreamBaseResponse] = (
[StreamReasoningEnd(id=self._block_id)] if self._render_in_ui else []
)
self._open = False
self._block_id = str(uuid.uuid4())
self._current_row = None
return [event]
return events

View File

@@ -279,3 +279,56 @@ class TestReasoningPersistence:
events = emitter.on_delta(_delta(reasoning="pure wire"))
assert len(events) == 2 # start + delta, no crash
# Nothing else to assert — just proves None session is supported.
class TestBaselineReasoningEmitterRenderFlag:
"""``render_in_ui=False`` must silence ``StreamReasoning*`` wire events
while keeping persistence intact — the operator can hide the collapse
without losing the audit trail. These tests pin the contract in both
directions so future refactors can't flip only one half."""
def test_render_off_suppresses_start_and_delta(self):
emitter = BaselineReasoningEmitter(render_in_ui=False)
events = emitter.on_delta(_delta(reasoning="hidden"))
# No wire events, but state advanced (is_open == True) so close()
# below has something to rotate.
assert events == []
assert emitter.is_open is True
def test_render_off_suppresses_close_end(self):
emitter = BaselineReasoningEmitter(render_in_ui=False)
emitter.on_delta(_delta(reasoning="hidden"))
events = emitter.close()
assert events == []
assert emitter.is_open is False
def test_render_off_still_persists_rows(self):
session: list[ChatMessage] = []
emitter = BaselineReasoningEmitter(session, render_in_ui=False)
emitter.on_delta(_delta(reasoning="part one "))
emitter.on_delta(_delta(reasoning="part two"))
emitter.close()
assert len(session) == 1
assert session[0].role == "reasoning"
assert session[0].content == "part one part two"
def test_render_off_rotates_block_id_between_sessions(self):
"""Even with wire events silenced the block id must rotate on close,
otherwise a hypothetical mid-session flip would reuse a stale id."""
emitter = BaselineReasoningEmitter(render_in_ui=False)
emitter.on_delta(_delta(reasoning="first"))
first_block_id = emitter._block_id
emitter.close()
emitter.on_delta(_delta(reasoning="second"))
assert emitter._block_id != first_block_id
def test_render_on_is_default(self):
"""Defaulting to True preserves backward compat — existing callers
that don't pass the kwarg keep emitting wire events as before."""
emitter = BaselineReasoningEmitter()
events = emitter.on_delta(_delta(reasoning="hello"))
assert len(events) == 2
assert isinstance(events[0], StreamReasoningStart)
assert isinstance(events[1], StreamReasoningDelta)

View File

@@ -379,7 +379,13 @@ class _BaselineStreamState:
# frontend's ``convertChatSessionToUiMessages`` relies on these
# rows to render the Reasoning collapse after the AI SDK's
# stream-end hydrate swaps in the DB-backed message list.
self.reasoning_emitter = BaselineReasoningEmitter(self.session_messages)
# ``render_in_ui`` is sourced from ``config.render_reasoning_in_ui``
# so the operator can silence the reasoning collapse globally
# without dropping the persisted audit trail.
self.reasoning_emitter = BaselineReasoningEmitter(
self.session_messages,
render_in_ui=config.render_reasoning_in_ui,
)
def _is_anthropic_model(model: str) -> bool:

View File

@@ -205,6 +205,33 @@ class ChatConfig(BaseSettings):
"``max_thinking_tokens`` kwarg so the CLI falls back to model default "
"(which, without the flag, leaves extended thinking off).",
)
render_reasoning_in_ui: bool = Field(
default=True,
description="Render extended-thinking reasoning as live UI parts "
"(``reasoning-start``/``reasoning-delta``/``reasoning-end``) on the "
"wire. When False, the baseline emitter and the SDK response adapter "
"suppress the three reasoning wire events — the model still reasons "
"and tokens are still billed, but the frontend sees a text-only "
"stream. Reasoning rows are still persisted to ``session.messages`` "
"as ``role='reasoning'`` entries, so a future per-session toggle can "
"surface them on reload. Flip to False to silence the reasoning "
"collapse without dropping the persisted audit trail.",
)
stream_replay_count: int = Field(
default=200,
ge=1,
le=10000,
description="Maximum number of Redis stream entries replayed on SSE "
"reconnect in :func:`stream_registry.subscribe_to_session`. Lowered "
"from 1000 to bound the replay storm when a tab-switch / throttle "
"triggers multiple quick reconnects — 200 still covers a full Kimi "
"turn after coalescing (~150 events) and leaves headroom for long "
"Opus tool loops. Redis ``XREAD`` ``count`` caps the *response size*, "
"not the total events that existed; dropping older entries on replay "
"is acceptable because the frontend deduplicates on block ids. "
"Scoped to the SSE resume path only — live stream listener XREAD "
"(no replay) is unchanged.",
)
claude_agent_thinking_effort: Literal["low", "medium", "high", "max"] | None = (
Field(
default=None,
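
A minimal usage sketch for operators (hedged: the ChatConfig import path is an
assumption; the env var names match the regression tests in the next file):

    import os
    from copilot.chat.config import ChatConfig  # import path assumed

    os.environ["CHAT_RENDER_REASONING_IN_UI"] = "false"  # hide the live reasoning collapse
    os.environ["CHAT_STREAM_REPLAY_COUNT"] = "500"       # widen the SSE replay window

    cfg = ChatConfig()
    assert cfg.render_reasoning_in_ui is False
    assert cfg.stream_replay_count == 500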

View File

@@ -19,6 +19,8 @@ _ENV_VARS_TO_CLEAR = (
"OPENAI_BASE_URL",
"CHAT_CLAUDE_AGENT_CLI_PATH",
"CLAUDE_AGENT_CLI_PATH",
"CHAT_RENDER_REASONING_IN_UI",
"CHAT_STREAM_REPLAY_COUNT",
)
@@ -164,3 +166,38 @@ class TestClaudeAgentCliPathEnvFallback:
monkeypatch.setenv("CLAUDE_AGENT_CLI_PATH", str(tmp_path))
with pytest.raises(Exception, match="not a regular file"):
ChatConfig()
class TestRenderReasoningInUi:
"""``render_reasoning_in_ui`` gates reasoning wire events globally."""
def test_defaults_to_true(self):
"""Default must stay True — flipping it silences the reasoning
collapse for every user, which is an opt-in operator decision."""
cfg = ChatConfig()
assert cfg.render_reasoning_in_ui is True
def test_env_override_false(self, monkeypatch: pytest.MonkeyPatch) -> None:
monkeypatch.setenv("CHAT_RENDER_REASONING_IN_UI", "false")
cfg = ChatConfig()
assert cfg.render_reasoning_in_ui is False
class TestStreamReplayCount:
"""``stream_replay_count`` caps the SSE reconnect replay batch size."""
def test_default_is_200(self):
"""200 covers a full Kimi turn after coalescing (~150 events) while
bounding the replay storm from 1000+ chunks."""
cfg = ChatConfig()
assert cfg.stream_replay_count == 200
def test_env_override(self, monkeypatch: pytest.MonkeyPatch) -> None:
monkeypatch.setenv("CHAT_STREAM_REPLAY_COUNT", "500")
cfg = ChatConfig()
assert cfg.stream_replay_count == 500
def test_zero_rejected(self):
"""count=0 would make XREAD replay nothing — rejected via ge=1."""
with pytest.raises(Exception):
ChatConfig(stream_replay_count=0)

View File

@@ -733,7 +733,11 @@ class TestDoTransientBackoff:
async for _ in _do_transient_backoff(3, state, "msg-1", "sess-1"):
pass
mock_cls.assert_called_once_with(message_id="msg-1", session_id="sess-1")
mock_cls.assert_called_once_with(
message_id="msg-1",
session_id="sess-1",
render_reasoning_in_ui=True,
)
assert state.adapter is new_adapter
async def test_resets_usage_after_yield(self):

View File

@@ -53,7 +53,13 @@ class SDKResponseAdapter:
text blocks, tool calls, and message lifecycle.
"""
def __init__(self, message_id: str | None = None, session_id: str | None = None):
def __init__(
self,
message_id: str | None = None,
session_id: str | None = None,
*,
render_reasoning_in_ui: bool = True,
):
self.message_id = message_id or str(uuid.uuid4())
self.session_id = session_id
self.text_block_id = str(uuid.uuid4())
@@ -62,6 +68,13 @@ class SDKResponseAdapter:
self.reasoning_block_id = str(uuid.uuid4())
self.has_started_reasoning = False
self.has_ended_reasoning = True
# When False, ``_ensure_reasoning_started`` / ``StreamReasoningDelta``
# emission / ``_end_reasoning_if_open`` become no-ops so the frontend
# sees a text-only stream for a ``ThinkingBlock``. The block is
# still persisted to ``session.messages`` via
# ``_format_sdk_content_blocks`` — same rule as the baseline
# emitter: operator silences the live collapse, audit trail stays.
self._render_reasoning_in_ui = render_reasoning_in_ui
self.current_tool_calls: dict[str, dict[str, str]] = {}
self.resolved_tool_calls: set[str] = set()
self.step_open = False
@@ -142,15 +155,22 @@ class SDKResponseAdapter:
# it live, extended_thinking turns that end
# thinking-only left the UI stuck on "Thought for Xs"
# with nothing rendered until a page refresh.
#
# When ``render_reasoning_in_ui=False`` the three
# reasoning helpers below (and the append) no-op, so
# the frontend sees a text-only stream. Persistence
# of the thinking text (``_format_sdk_content_blocks``)
# is unaffected.
if block.thinking:
self._end_text_if_open(responses)
self._ensure_reasoning_started(responses)
responses.append(
StreamReasoningDelta(
id=self.reasoning_block_id,
delta=block.thinking,
if self._render_reasoning_in_ui:
responses.append(
StreamReasoningDelta(
id=self.reasoning_block_id,
delta=block.thinking,
)
)
)
elif isinstance(block, ToolUseBlock):
self._end_text_if_open(responses)
@@ -349,7 +369,13 @@ class SDKResponseAdapter:
Each ``ThinkingBlock`` the SDK emits gets its own streaming block
on the wire so the frontend can render a new ``Reasoning`` part
per LLM turn (rather than concatenating across the whole session).
No-op when ``render_reasoning_in_ui=False`` — callers still drive
the method on every ``ThinkingBlock`` so persistence stays in
lockstep, but nothing reaches the wire.
"""
if not self._render_reasoning_in_ui:
return
if not self.has_started_reasoning or self.has_ended_reasoning:
if self.has_ended_reasoning:
self.reasoning_block_id = str(uuid.uuid4())
@@ -358,7 +384,13 @@ class SDKResponseAdapter:
self.has_started_reasoning = True
def _end_reasoning_if_open(self, responses: list[StreamBaseResponse]) -> None:
"""End the current reasoning block if one is open."""
"""End the current reasoning block if one is open.
No-op when ``render_reasoning_in_ui=False`` — no start was emitted,
so no end is needed.
"""
if not self._render_reasoning_in_ui:
return
if self.has_started_reasoning and not self.has_ended_reasoning:
responses.append(StreamReasoningEnd(id=self.reasoning_block_id))
self.has_ended_reasoning = True

View File

@@ -331,6 +331,64 @@ def test_empty_thinking_block_is_ignored():
assert [type(r).__name__ for r in results] == ["StreamStartStep"]
def test_render_reasoning_in_ui_false_suppresses_thinking_events():
"""``render_reasoning_in_ui=False`` silences ``StreamReasoning*`` on
the wire — the frontend sees a text-only stream. Persistence via
``_format_sdk_content_blocks`` is handled elsewhere; this test only
pins the wire contract.
"""
adapter = SDKResponseAdapter(
message_id="m",
session_id="s",
render_reasoning_in_ui=False,
)
msg = AssistantMessage(
content=[ThinkingBlock(thinking="plan", signature="sig")],
model="test",
)
results = adapter.convert_message(msg)
types = [type(r).__name__ for r in results]
assert "StreamReasoningStart" not in types
assert "StreamReasoningDelta" not in types
assert "StreamReasoningEnd" not in types
def test_render_reasoning_off_text_after_thinking_emits_no_reasoning_end():
"""With rendering off the ReasoningEnd is never synthesized when text
follows — no ReasoningStart ever hit the wire, so no close is due."""
adapter = SDKResponseAdapter(
message_id="m",
session_id="s",
render_reasoning_in_ui=False,
)
adapter.convert_message(
AssistantMessage(
content=[ThinkingBlock(thinking="warming up", signature="sig")],
model="test",
)
)
results = adapter.convert_message(
AssistantMessage(content=[TextBlock(text="hello")], model="test")
)
types = [type(r).__name__ for r in results]
assert "StreamReasoningEnd" not in types
assert "StreamTextStart" in types
assert "StreamTextDelta" in types
def test_render_reasoning_on_is_default():
"""Default is True — existing callers keep emitting reasoning events."""
adapter = SDKResponseAdapter(message_id="m", session_id="s")
msg = AssistantMessage(
content=[ThinkingBlock(thinking="plan", signature="sig")],
model="test",
)
results = adapter.convert_message(msg)
types = [type(r).__name__ for r in results]
assert "StreamReasoningStart" in types
assert "StreamReasoningDelta" in types
def test_result_success_synthesizes_fallback_text_when_final_turn_is_thinking_only():
"""If the model's last LLM call after a tool_result produced only a
ThinkingBlock (no TextBlock), the UI would hang on the tool output

View File

@@ -821,7 +821,11 @@ async def _do_transient_backoff(
"""
yield StreamStatus(message=f"Connection interrupted, retrying in {backoff}s…")
await asyncio.sleep(backoff)
state.adapter = SDKResponseAdapter(message_id=message_id, session_id=session_id)
state.adapter = SDKResponseAdapter(
message_id=message_id,
session_id=session_id,
render_reasoning_in_ui=config.render_reasoning_in_ui,
)
state.usage.reset()
@@ -3155,7 +3159,11 @@ async def stream_chat_completion_sdk(
options = ClaudeAgentOptions(**sdk_options_kwargs) # type: ignore[arg-type] # dynamic kwargs
adapter = SDKResponseAdapter(message_id=message_id, session_id=session_id)
adapter = SDKResponseAdapter(
message_id=message_id,
session_id=session_id,
render_reasoning_in_ui=config.render_reasoning_in_ui,
)
# Propagate user_id/session_id as OTEL context attributes so the
# langsmith tracing integration attaches them to every span. This
@@ -3489,7 +3497,9 @@ async def stream_chat_completion_sdk(
session, user_id, is_user_message, state.query_message
)
state.adapter = SDKResponseAdapter(
message_id=message_id, session_id=session_id
message_id=message_id,
session_id=session_id,
render_reasoning_in_ui=config.render_reasoning_in_ui,
)
# Reset token accumulators so a failed attempt's partial
# usage is not double-counted in the successful attempt.

View File

@@ -485,9 +485,17 @@ async def subscribe_to_session(
subscriber_queue: asyncio.Queue[StreamBaseResponse] = asyncio.Queue()
stream_key = _get_turn_stream_key(session.turn_id)
# Step 1: Replay messages from Redis Stream
# Step 1: Replay messages from Redis Stream.
# ``count`` caps the replay batch size — lowered from a hard-coded 1000
# to :attr:`ChatConfig.stream_replay_count` (default 200) to bound the
# replay storm when a tab-switch / browser throttle triggers multiple
# reconnects in quick succession. 200 still covers a full Kimi turn
# after coalescing (~150 events); dropping older entries on replay is
# acceptable because the frontend deduplicates on block ids.
xread_start = time.perf_counter()
messages = await redis.xread({stream_key: last_message_id}, block=None, count=1000)
messages = await redis.xread(
{stream_key: last_message_id}, block=None, count=config.stream_replay_count
)
xread_time = (time.perf_counter() - xread_start) * 1000
logger.info(
f"[TIMING] Redis xread (replay) took {xread_time:.1f}ms, status={session_status}",

View File

@@ -25,6 +25,18 @@ import { useHydrateOnStreamEnd } from "./useHydrateOnStreamEnd";
const RECONNECT_BASE_DELAY_MS = 1_000;
const RECONNECT_MAX_ATTEMPTS = 3;
/**
* Minimum spacing between successive reconnect attempts.
* `isReconnectScheduledRef` already prevents OVERLAPPING reconnects, but
* tab-throttle / visibility wake bursts can fire `onFinish(isDisconnect)`
* several times inside a single second — each one would schedule a fresh
* reconnect the moment the previous timer cleared the ref. The debounce
rejects any reconnect request that arrives within this window of the last
reconnect's actual resume, so a throttle storm collapses to one
GET /stream instead of three.
*/
const RECONNECT_DEBOUNCE_MS = 1_500;
/** Minimum time the page must have been hidden to trigger a wake re-sync. */
const WAKE_RESYNC_THRESHOLD_MS = 30_000;
@@ -110,6 +122,11 @@ export function useCopilotStream({
const isReconnectScheduledRef = useRef(false);
const [isReconnectScheduled, setIsReconnectScheduled] = useState(false);
const reconnectTimerRef = useRef<ReturnType<typeof setTimeout>>();
// Timestamp of the last reconnect's actual resume call — used together
// with RECONNECT_DEBOUNCE_MS to drop rapid duplicate reconnect requests
// (e.g. visibility throttle firing onFinish(isDisconnect) several times
// in the same second). 0 = no reconnect has fired yet in this session.
const lastReconnectResumeAtRef = useRef(0);
const hasShownDisconnectToast = useRef(false);
// Set when the user explicitly clicks stop — prevents onError from
// triggering a reconnect cycle for the resulting AbortError.
@@ -127,6 +144,19 @@ export function useCopilotStream({
function handleReconnect(sid: string) {
if (isReconnectScheduledRef.current || !sid) return;
// Debounce: if the previous reconnect resumed within the last
// RECONNECT_DEBOUNCE_MS, drop this request. Browser tab-throttle bursts
can fire onFinish(isDisconnect) 2-3 times in a second; without this
// guard each fires its own GET /stream, each one replays the Redis
// stream, and the flicker storm is back.
const sinceLastResume = Date.now() - lastReconnectResumeAtRef.current;
if (
lastReconnectResumeAtRef.current > 0 &&
sinceLastResume < RECONNECT_DEBOUNCE_MS
) {
return;
}
const nextAttempt = reconnectAttemptsRef.current + 1;
if (nextAttempt > RECONNECT_MAX_ATTEMPTS) {
setReconnectExhausted(true);
@@ -163,6 +193,7 @@ export function useCopilotStream({
}
return prev;
});
lastReconnectResumeAtRef.current = Date.now();
resumeStreamRef.current();
}, delay);
}
@@ -469,6 +500,7 @@ export function useCopilotStream({
setRateLimitMessage(null);
hasShownDisconnectToast.current = false;
lastSubmittedMsgRef.current = null;
lastReconnectResumeAtRef.current = 0;
setReconnectExhausted(false);
setIsSyncing(false);
hasResumedRef.current.clear();