fix(backend/copilot): coalesce reasoning deltas to unfreeze Kimi streams

Observed symptom: copilot page frozen for ~700 s on a session using the new Kimi K2.6 default.  Redis `XLEN chat:stream:...` showed 4,677 reasoning-delta chunks in a single turn vs ~28 for peer Sonnet sessions.  Each chunk was one Redis xadd + one SSE frame + one React re-render of the non-virtualised chat list, which paint-stormed the main thread until the stream ended.

OpenRouter's Kimi endpoint tokenises reasoning at a much finer grain than Anthropic, so the 1:1 chunk→`StreamReasoningDelta` mapping in BaselineReasoningEmitter blew up on the wire while the same code was fine for Sonnet.

Fix: coalesce `StreamReasoningDelta` emissions in the emitter.

* First chunk in a block still emits Start + Delta atomically so the Reasoning collapse renders immediately.

* Subsequent chunks buffer into `_pending_delta` and flush once either the char-size (`_COALESCE_MIN_CHARS=32`) or time (`_COALESCE_MAX_INTERVAL_MS=40`) threshold trips.  `close()` always drains the tail before emitting `StreamReasoningEnd`.

* DB persistence stays per-chunk — `_current_row.content` updates on every delta independent of the coalesce window, so a crash mid-turn still persists the full reasoning-so-far.

* Thresholds are `__init__` kwargs so tests can disable coalescing for deterministic state-machine assertions.

Net effect: ~4,700 → ~150 events per turn (30x), well under the browser's paint-storm threshold; reasoning still appears live at ~25 Hz (40 ms window) which is below human perception.

Pre-existing issues flagged for follow-up (out of scope — the freeze is gone without them):

* `ChatMessagesContainer` has no React.memo per message and no list virtualisation — a very long session still re-renders every prior message on each new chunk.

* `routes.py:1163-1171` replays from `0-0` with `count=1000` on every SSE reconnect (6 reconnects observed), duplicating up to 6,000 chunks.  Proper Last-Event-ID support requires threading Redis stream message IDs through every SSE event + a frontend handshake — material refactor deferred to a dedicated PR.
This commit is contained in:
majdyz
2026-04-21 22:39:04 +07:00
parent fce7a59713
commit da5420fa07
2 changed files with 183 additions and 13 deletions

View File

@@ -23,6 +23,7 @@ This module keeps the wire-level concerns in one place:
from __future__ import annotations
import logging
import time
import uuid
from typing import Any
@@ -42,6 +43,19 @@ logger = logging.getLogger(__name__)
_VISIBLE_REASONING_TYPES = frozenset({"reasoning.text", "reasoning.summary"})
# Coalescing thresholds for ``StreamReasoningDelta`` emission. OpenRouter's
# Kimi K2.6 endpoint tokenises reasoning at a much finer grain than Anthropic
# (~4,700 deltas per turn in one observed session, vs ~28 for Sonnet); without
# coalescing, every chunk is one Redis ``xadd`` + one SSE frame + one React
# re-render of the non-virtualised chat list, which paint-storms the browser
# main thread and freezes the UI. Batching into ~32-char / ~40 ms windows
# cuts the event rate ~100x while staying snappy enough that the Reasoning
# collapse still feels live (well under the ~100 ms perceptual threshold).
# Per-delta persistence to ``session.messages`` stays granular — we only
# coalesce the *wire* emission.
_COALESCE_MIN_CHARS = 32
_COALESCE_MAX_INTERVAL_MS = 40.0
class ReasoningDetail(BaseModel):
"""One entry in OpenRouter's ``reasoning_details`` list.
@@ -195,11 +209,24 @@ class BaselineReasoningEmitter:
def __init__(
self,
session_messages: list[ChatMessage] | None = None,
*,
coalesce_min_chars: int = _COALESCE_MIN_CHARS,
coalesce_max_interval_ms: float = _COALESCE_MAX_INTERVAL_MS,
) -> None:
self._block_id: str = str(uuid.uuid4())
self._open: bool = False
self._session_messages = session_messages
self._current_row: ChatMessage | None = None
# Coalescing state — ``_pending_delta`` accumulates reasoning text
# between wire flushes. Providers like Kimi K2.6 emit very fine-
# grained chunks; batching them reduces Redis ``xadd`` + SSE + React
# re-render load by ~100x for equivalent text output. Tuning knobs
# are kwargs so tests can disable coalescing (``=0``) for
# deterministic event assertions.
self._coalesce_min_chars = coalesce_min_chars
self._coalesce_max_interval_ms = coalesce_max_interval_ms
self._pending_delta: str = ""
self._last_flush_monotonic: float = 0.0
@property
def is_open(self) -> bool:
@@ -210,39 +237,77 @@ class BaselineReasoningEmitter:
Empty list when the chunk carries no reasoning payload, so this is
safe to call on every chunk without guarding at the call site.
Persistence (when a session message list is attached) happens in
lockstep with emission so the row's content stays equal to the
concatenated deltas at every delta boundary.
Persistence (when a session message list is attached) stays
per-delta so the DB row's content always equals the concatenation
of wire deltas at every chunk boundary, independent of the
coalescing window. Only the wire emission is batched.
"""
ext = OpenRouterDeltaExtension.from_delta(delta)
text = ext.visible_text()
if not text:
return []
events: list[StreamBaseResponse] = []
# First reasoning text in this block — emit Start + the first Delta
# atomically so the frontend Reasoning collapse renders immediately
# rather than waiting for the coalesce window to elapse. Subsequent
# chunks buffer into ``_pending_delta`` and only flush when the
# char/time thresholds trip.
if not self._open:
events.append(StreamReasoningStart(id=self._block_id))
events.append(StreamReasoningDelta(id=self._block_id, delta=text))
self._open = True
self._last_flush_monotonic = time.monotonic()
if self._session_messages is not None:
self._current_row = ChatMessage(role="reasoning", content="")
self._current_row = ChatMessage(role="reasoning", content=text)
self._session_messages.append(self._current_row)
events.append(StreamReasoningDelta(id=self._block_id, delta=text))
return events
# Persist per-delta (no coalescing here — the session snapshot stays
# consistent at every chunk boundary, independent of the wire
# coalesce window).
if self._current_row is not None:
self._current_row.content = (self._current_row.content or "") + text
self._pending_delta += text
if self._should_flush_pending():
events.append(
StreamReasoningDelta(id=self._block_id, delta=self._pending_delta)
)
self._pending_delta = ""
self._last_flush_monotonic = time.monotonic()
return events
def _should_flush_pending(self) -> bool:
"""Return True when the accumulated delta should be emitted now."""
if not self._pending_delta:
return False
if len(self._pending_delta) >= self._coalesce_min_chars:
return True
elapsed_ms = (time.monotonic() - self._last_flush_monotonic) * 1000.0
return elapsed_ms >= self._coalesce_max_interval_ms
def close(self) -> list[StreamBaseResponse]:
"""Emit ``StreamReasoningEnd`` for the open block (if any) and rotate.
Idempotent — returns ``[]`` when no block is open. The id rotation
guarantees the next reasoning block starts with a fresh id rather
than reusing one already closed on the wire. The persisted row is
not removed — it stays in ``session_messages`` as the durable
record of what was reasoned.
Idempotent — returns ``[]`` when no block is open. Drains any
still-buffered delta first so the frontend never loses tail text
from the coalesce window. The id rotation guarantees the next
reasoning block starts with a fresh id rather than reusing one
already closed on the wire. The persisted row is not removed —
it stays in ``session_messages`` as the durable record of what
was reasoned.
"""
if not self._open:
return []
event = StreamReasoningEnd(id=self._block_id)
events: list[StreamBaseResponse] = []
if self._pending_delta:
events.append(
StreamReasoningDelta(id=self._block_id, delta=self._pending_delta)
)
self._pending_delta = ""
events.append(StreamReasoningEnd(id=self._block_id))
self._open = False
self._block_id = str(uuid.uuid4())
self._current_row = None
return [event]
return events

View File

@@ -212,7 +212,12 @@ class TestBaselineReasoningEmitter:
assert emitter.is_open is True
def test_subsequent_deltas_reuse_block_id_without_new_start(self):
emitter = BaselineReasoningEmitter()
# Disable coalescing so each chunk flushes immediately — this test
# is about the Start/Delta/block-id state machine, not the coalesce
# window. Coalescing behaviour is covered below.
emitter = BaselineReasoningEmitter(
coalesce_min_chars=0, coalesce_max_interval_ms=0
)
first = emitter.on_delta(_delta(reasoning="a"))
second = emitter.on_delta(_delta(reasoning="b"))
@@ -267,6 +272,106 @@ class TestBaselineReasoningEmitter:
assert deltas[0].delta == "plan: do the thing"
class TestReasoningDeltaCoalescing:
"""Coalescing batches fine-grained provider chunks into bigger wire
frames. OpenRouter's Kimi K2.6 emits ~4,700 reasoning-delta chunks
per turn vs ~28 for Sonnet; without batching, every chunk becomes one
Redis ``xadd`` + one SSE event + one React re-render of the
non-virtualised chat list, which paint-storms the browser. These
tests pin the batching contract: small chunks buffer until the
char-size or time threshold trips, large chunks still flush
immediately, and ``close()`` never drops tail text."""
def test_small_chunks_after_first_buffer_until_threshold(self):
# Generous time threshold so size alone controls flush timing.
emitter = BaselineReasoningEmitter(
coalesce_min_chars=32, coalesce_max_interval_ms=60_000
)
# First chunk always flushes immediately (so UI renders without
# waiting).
first = emitter.on_delta(_delta(reasoning="hi "))
assert any(isinstance(e, StreamReasoningStart) for e in first)
assert sum(isinstance(e, StreamReasoningDelta) for e in first) == 1
# Subsequent small chunks buffer silently — 5 × 4 chars = 20 chars,
# still under the 32-char threshold.
for _ in range(5):
assert emitter.on_delta(_delta(reasoning="abcd")) == []
# Once the threshold is crossed, the accumulated buffer flushes
# as a single StreamReasoningDelta carrying every buffered chunk.
flush = emitter.on_delta(_delta(reasoning="efghijklmnop"))
assert len(flush) == 1
assert isinstance(flush[0], StreamReasoningDelta)
assert flush[0].delta == "abcd" * 5 + "efghijklmnop"
def test_time_based_flush_when_chars_stay_below_threshold(self, monkeypatch):
# Fake ``time.monotonic`` so we can drive the time-based branch
# deterministically without real sleeps.
from backend.copilot.baseline import reasoning as rmod
fake_now = [0.0]
monkeypatch.setattr(rmod.time, "monotonic", lambda: fake_now[0])
emitter = BaselineReasoningEmitter(
coalesce_min_chars=1000, coalesce_max_interval_ms=40
)
# t=0: first chunk flushes immediately.
first = emitter.on_delta(_delta(reasoning="a"))
assert sum(isinstance(e, StreamReasoningDelta) for e in first) == 1
# t=10 ms: still under 40 ms → buffer.
fake_now[0] = 0.010
assert emitter.on_delta(_delta(reasoning="b")) == []
# t=50 ms since last flush → time threshold trips, flush fires.
fake_now[0] = 0.060
flushed = emitter.on_delta(_delta(reasoning="c"))
assert len(flushed) == 1
assert isinstance(flushed[0], StreamReasoningDelta)
assert flushed[0].delta == "bc"
def test_close_flushes_tail_buffer_before_end(self):
emitter = BaselineReasoningEmitter(
coalesce_min_chars=1000, coalesce_max_interval_ms=60_000
)
emitter.on_delta(_delta(reasoning="first")) # flushes (first chunk)
emitter.on_delta(_delta(reasoning=" middle ")) # buffered
emitter.on_delta(_delta(reasoning="tail")) # buffered
events = emitter.close()
assert len(events) == 2
assert isinstance(events[0], StreamReasoningDelta)
assert events[0].delta == " middle tail"
assert isinstance(events[1], StreamReasoningEnd)
def test_coalesce_disabled_flushes_every_chunk(self):
emitter = BaselineReasoningEmitter(
coalesce_min_chars=0, coalesce_max_interval_ms=0
)
first = emitter.on_delta(_delta(reasoning="a"))
second = emitter.on_delta(_delta(reasoning="b"))
assert sum(isinstance(e, StreamReasoningDelta) for e in first) == 1
assert sum(isinstance(e, StreamReasoningDelta) for e in second) == 1
def test_persistence_stays_per_delta_even_when_wire_coalesces(self):
"""DB row content must track every chunk so a crash mid-turn
persists the full reasoning-so-far, even if the coalesce window
never flushed those chunks to the wire."""
session: list[ChatMessage] = []
emitter = BaselineReasoningEmitter(
session,
coalesce_min_chars=1000,
coalesce_max_interval_ms=60_000,
)
emitter.on_delta(_delta(reasoning="first "))
emitter.on_delta(_delta(reasoning="chunk "))
emitter.on_delta(_delta(reasoning="three"))
# No close; verify the persisted row already has everything.
assert len(session) == 1
assert session[0].content == "first chunk three"
class TestReasoningPersistence:
"""The persistence contract: without ``role="reasoning"`` rows in
session.messages, useHydrateOnStreamEnd overwrites the live-streamed