Compare commits

...

10 Commits

Author SHA1 Message Date
majdyz
9a2373bf61 test: add E2E screenshots for PR #12870 2026-04-21 20:09:32 +07:00
majdyz
63c4229774 test(backend/copilot): cover reasoning persistence wiring end-to-end
Adds test_reasoning_persists_to_state_session_messages, which drives
reasoning deltas through _baseline_llm_caller and asserts a
role="reasoning" row lands on state.session_messages with the
concatenated delta content.  Catches regressions in
_BaselineStreamState.__post_init__ that silently pass the wrong list
reference to the emitter.
2026-04-21 20:02:55 +07:00
majdyz
c0a27ab878 refactor(backend/copilot): use mock delta in reasoning validation test
- Replace object.__setattr__(__pydantic_extra__) with MagicMock(spec=ChoiceDelta)
  so the test no longer depends on a pydantic-v2 internal attribute name.
- Document the mutate-in-place invariant on _BaselineStreamState.session_messages
  so future edits know the emitter shares the list reference.
2026-04-21 19:59:05 +07:00
majdyz
08b568021b fix(backend/copilot): harden reasoning delta parsing and restore kill switch
- Filter reasoning_details entries by recognised type (reasoning.text /
  reasoning.summary) so future provider metadata cannot leak into the UI
  collapse.
- Swallow + log pydantic ValidationError on malformed OpenRouter
  reasoning payloads instead of aborting the stream; valid text/tool
  events keep flowing.
- Restore the max_thinking_tokens<=0 kill switch on the baseline path so
  operators can silence reasoning without touching the SDK path.
- Drop the duplicate _is_anthropic_route helper; reuse _is_anthropic_model
  from service.py via a lazy import.
- Restore integration coverage for reasoning-only streams and the
  zero-tokens kill switch in service_unit_test.py.
2026-04-21 19:53:14 +07:00
majdyz
316b132a13 fix(backend/copilot): persist baseline reasoning as session rows
pr-test surfaced the headline feature broken: backend emitted a clean reasoning-start/delta/end stream but the frontend Reasoning collapse never rendered.

Root cause: useHydrateOnStreamEnd swaps in the DB-hydrated message list the moment the stream ends, and convertChatSessionToUiMessages.ts only emits {type:'reasoning'} UI parts from ChatMessage(role='reasoning') rows.  SDK persists these rows via acc.reasoning_response in _dispatch_response; baseline didn't, so the live-streamed reasoning parts got overwritten by a reasoning-less hydrate.

Fold persistence into the same BaselineReasoningEmitter that owns the wire events: when a session_messages list is attached, the first reasoning delta appends a ChatMessage(role='reasoning', content=''), every delta mutates .content in lockstep with the StreamReasoningDelta, and close() leaves the row intact.  _BaselineStreamState wires the emitter to its session_messages via __post_init__, so existing callsites don't change.

Mirrors the SDK contract exactly, including across tool-call continuations (each new reasoning block → fresh row). New tests in reasoning_test.py cover the persistence lifecycle (row appended, deltas mutate same row, close keeps row, second block appends new row, no-session works for pure wire emission).
2026-04-21 19:25:02 +07:00
majdyz
db25bbf47d refactor(backend/copilot): extract baseline reasoning into typed module
Address review feedback: the reasoning plumbing was spread across service.py as a mix of inline state, a dict-parsing helper, and a second private close helper, with its own duplicate config field alongside the SDK's thinking-token setting.

* New backend/copilot/baseline/reasoning.py encapsulates the whole concern: ReasoningDetail / OpenRouterDeltaExtension validate the extension fields via pydantic (no getattr / isinstance duck typing), BaselineReasoningEmitter owns the start/delta/end lifecycle, and reasoning_extra_body builds the request fragment.

* _BaselineStreamState drops reasoning_block_id + reasoning_started for a single reasoning_emitter: BaselineReasoningEmitter — three call sites in _baseline_llm_caller collapse to state.reasoning_emitter.on_delta / .close() calls.

* baseline_reasoning_max_tokens deleted; both SDK and baseline now read from the existing claude_agent_max_thinking_tokens, with its docstring updated to describe the shared contract. No reason to have two knobs for the same thing.

* Moved the wire-parser tests to a dedicated backend/copilot/baseline/reasoning_test.py that exercises the pydantic models directly. service_unit_test.py keeps four integration smoke tests that rebuild real ChoiceDelta pydantic chunks (so .model_extra plumbing is exercised end-to-end), and drops the obsolete 'config=0 disables' case.

Net: ~200 fewer lines across service.py + its unit test, behaviour unchanged, reasoning_test.py gives first-class coverage of the parser variants.
2026-04-21 19:07:09 +07:00
majdyz
2517dae85a refactor(backend/copilot): drop unnecessary forward-ref quotes on _BaselineStreamState
Review cycle 3 nit. _BaselineStreamState is defined earlier in the
module (L330) than _close_reasoning_block_if_open (L533), so the
annotation doesn't need to be stringified.
2026-04-21 18:43:14 +07:00
majdyz
080d42b9da fix(backend/copilot): close reasoning/text blocks on exception path
Review cycle 2 follow-up. CodeRabbit flagged that
`_close_reasoning_block_if_open` + thinking-stripper flush + StreamTextEnd
sat in the `_baseline_llm_caller` try block but not its finally, so an
exception mid-stream (network drop, provider 500, cancel) left the
reasoning block unclosed and the frontend collapse never finalised.

- Move close-reasoning + stripper flush + StreamTextEnd emission into the
  outer finally of `_baseline_llm_caller` so they run on both normal and
  exception paths, preserving the
  `...Reasoning/TextEnd -> StreamFinishStep` protocol ordering.
- Remove the now-redundant StreamTextEnd insert-before-StreamFinishStep
  patch in `stream_chat_completion_baseline`'s exception handler — the
  inner finally already closed the text block, so the flag was always
  False by the time the outer handler ran.
- Add `test_reasoning_closed_on_mid_stream_exception` covering the new
  invariant: a stream that yields a reasoning delta then raises must
  still emit StreamReasoningEnd before StreamFinishStep.
2026-04-21 18:39:36 +07:00
majdyz
3d7b381620 refactor(backend/copilot): DRY reasoning-end helper, widen extractor, cover tool_call transition
Review cycle 1 follow-ups.

- Extract `_close_reasoning_block_if_open(state)` helper and replace the
  three inline copies (text branch, tool_calls branch, stream-end) so
  future edits cannot desync the rotation rules.
- Support typed/pydantic entries in `reasoning_details` via attribute
  access fallback — guards against upstream OpenAI-SDK drift that would
  otherwise silently drop every entry.
- Add `test_reasoning_then_tool_call_closes_reasoning_first` covering
  the tool_calls branch (no prior coverage) and
  `test_structured_details_accept_typed_pydantic_entries` covering the
  non-dict fallback.
2026-04-21 18:34:02 +07:00
majdyz
02be5440fc feat(backend/copilot): stream extended_thinking on baseline via OpenRouter
Baseline route's OpenAI-compat call never requested Anthropic extended thinking, so reasoning deltas were invisible even though the frontend's Reasoning collapse was already wired for SDK mode. Fast-mode autopilot never showed a Reasoning block.

Wire the non-OpenAI extension through:

* New 'baseline_reasoning_max_tokens' config (default 8192, 0 disables). Sent as extra_body={'reasoning': {'max_tokens': N}} only on Anthropic routes; other providers ignore the field.

* Extract reasoning from delta via 'reasoning' (legacy string), 'reasoning_content' (DeepSeek), and structured 'reasoning_details'.

* Emit StreamReasoningStart / StreamReasoningDelta / StreamReasoningEnd through the same state machine the SDK adapter uses — reasoning closes on text/tool_use/stream-end so AI SDK v5 keeps the parts distinct.

* Unit tests cover the extractor variants, paired event ordering, reasoning-only streams, and that the reasoning request param is gated by model route and config.
2026-04-21 18:26:45 +07:00
17 changed files with 930 additions and 24 deletions

View File

@@ -0,0 +1,230 @@
"""Extended-thinking wire support for the baseline (OpenRouter) path.
Anthropic routes on OpenRouter expose extended thinking through
non-OpenAI extension fields that the OpenAI Python SDK doesn't model:
* ``reasoning`` (legacy string) — enabled by ``include_reasoning: true``.
* ``reasoning_content`` — DeepSeek / some OpenRouter routes.
* ``reasoning_details`` — structured list shipped with the unified
``reasoning`` request param.
This module keeps the wire-level concerns in one place:
* :class:`OpenRouterDeltaExtension` validates the extension dict pulled off
``ChoiceDelta.model_extra`` into typed pydantic models — no ``getattr`` +
``isinstance`` duck-typing at the call site.
* :class:`BaselineReasoningEmitter` owns the reasoning block lifecycle for
one streaming round and emits ``StreamReasoning*`` events so the caller
only has to plumb the events into its pending queue.
* :func:`reasoning_extra_body` builds the ``extra_body`` fragment for the
OpenAI client call. Returns ``None`` on non-Anthropic routes.
"""
from __future__ import annotations
import logging
import uuid
from typing import Any
from openai.types.chat.chat_completion_chunk import ChoiceDelta
from pydantic import BaseModel, ConfigDict, Field, ValidationError
from backend.copilot.model import ChatMessage
from backend.copilot.response_model import (
StreamBaseResponse,
StreamReasoningDelta,
StreamReasoningEnd,
StreamReasoningStart,
)
logger = logging.getLogger(__name__)
_VISIBLE_REASONING_TYPES = frozenset({"reasoning.text", "reasoning.summary"})
class ReasoningDetail(BaseModel):
"""One entry in OpenRouter's ``reasoning_details`` list.
OpenRouter ships ``type: "reasoning.text"`` / ``"reasoning.summary"`` /
``"reasoning.encrypted"`` entries. Only the first two carry
user-visible text; encrypted entries are opaque and omitted from the
rendered collapse. Unknown future types are tolerated (``extra="ignore"``)
so an upstream addition doesn't crash the stream — but their ``text`` /
``summary`` fields are NOT surfaced because they may carry provider
metadata rather than user-visible reasoning (see
:attr:`visible_text`).
"""
model_config = ConfigDict(extra="ignore")
type: str | None = None
text: str | None = None
summary: str | None = None
@property
def visible_text(self) -> str:
"""Return the human-readable text for this entry, or ``""``.
Only entries with a recognised reasoning type (``reasoning.text`` /
``reasoning.summary``) surface text; unknown or encrypted types
return an empty string even if they carry a ``text`` /
``summary`` field, to guard against future provider metadata
being rendered as reasoning in the UI. Entries missing a
``type`` are treated as text (pre-``reasoning_details`` OpenRouter
payloads omit the field).
"""
if self.type is not None and self.type not in _VISIBLE_REASONING_TYPES:
return ""
return self.text or self.summary or ""
class OpenRouterDeltaExtension(BaseModel):
"""Non-OpenAI fields OpenRouter adds to streaming deltas.
Instantiate via :meth:`from_delta` which pulls the extension dict off
``ChoiceDelta.model_extra`` (where pydantic v2 stashes fields that
aren't part of the declared schema) and validates it through this
model. That keeps the parser honest — malformed entries surface as
validation errors rather than silent ``None``-coalesce bugs — and
avoids the ``getattr`` + ``isinstance`` duck-typing the earlier inline
extractor relied on.
"""
model_config = ConfigDict(extra="ignore")
reasoning: str | None = None
reasoning_content: str | None = None
reasoning_details: list[ReasoningDetail] = Field(default_factory=list)
@classmethod
def from_delta(cls, delta: ChoiceDelta) -> "OpenRouterDeltaExtension":
"""Build an extension view from ``delta.model_extra``.
Malformed provider payloads (e.g. ``reasoning_details`` shipped as
a string rather than a list) surface as a ``ValidationError`` which
is logged and swallowed — returning an empty extension so the rest
of the stream (valid text / tool calls) keeps flowing. An optional
feature's corrupted wire data must never abort the whole stream.
"""
try:
return cls.model_validate(delta.model_extra or {})
except ValidationError as exc:
logger.warning(
"[Baseline] Dropping malformed OpenRouter reasoning payload: %s",
exc,
)
return cls()
def visible_text(self) -> str:
"""Concatenated reasoning text, pulled from whichever channel is set.
Priority: the legacy ``reasoning`` string, then DeepSeek's
``reasoning_content``, then the concatenation of text-bearing
entries in ``reasoning_details``. Only one channel is set per
provider in practice; the priority order just makes the fallback
deterministic if a provider ever emits multiple.
"""
if self.reasoning:
return self.reasoning
if self.reasoning_content:
return self.reasoning_content
return "".join(d.visible_text for d in self.reasoning_details)
def reasoning_extra_body(model: str, max_thinking_tokens: int) -> dict[str, Any] | None:
"""Build the ``extra_body["reasoning"]`` fragment for the OpenAI client.
Returns ``None`` for non-Anthropic routes (other OpenRouter providers
ignore the field but we skip it anyway to keep the payload minimal)
and for ``max_thinking_tokens <= 0`` (operator kill switch).
"""
# Imported lazily to avoid pulling service.py at module load — service.py
# imports this module, and the lazy import keeps the dependency one-way.
from backend.copilot.baseline.service import _is_anthropic_model
if not _is_anthropic_model(model) or max_thinking_tokens <= 0:
return None
return {"reasoning": {"max_tokens": max_thinking_tokens}}
class BaselineReasoningEmitter:
"""Owns the reasoning block lifecycle for one streaming round.
Two concerns live here, both driven by the same state machine:
1. **Wire events.** The AI SDK v6 wire format pairs every
``reasoning-start`` with a matching ``reasoning-end`` and treats
reasoning / text / tool-use as distinct UI parts that must not
interleave.
2. **Session persistence.** ``ChatMessage(role="reasoning")`` rows in
``session.messages`` are what
``convertChatSessionToUiMessages.ts`` folds into the assistant
bubble as ``{type: "reasoning"}`` UI parts on reload and on
``useHydrateOnStreamEnd`` swaps. Without them the live-streamed
reasoning parts get overwritten by the hydrated (reasoning-less)
message list the moment the stream ends. Mirrors the SDK path's
``acc.reasoning_response`` pattern so both routes render the same
way on reload.
Pass ``session_messages`` to enable persistence; omit for pure
wire-emission (tests, scratch callers). On first reasoning delta a
fresh ``ChatMessage(role="reasoning")`` is appended and mutated
in-place as further deltas arrive; :meth:`close` drops the reference
but leaves the appended row intact.
"""
def __init__(
self,
session_messages: list[ChatMessage] | None = None,
) -> None:
self._block_id: str = str(uuid.uuid4())
self._open: bool = False
self._session_messages = session_messages
self._current_row: ChatMessage | None = None
@property
def is_open(self) -> bool:
return self._open
def on_delta(self, delta: ChoiceDelta) -> list[StreamBaseResponse]:
"""Return events for the reasoning text carried by *delta*.
Empty list when the chunk carries no reasoning payload, so this is
safe to call on every chunk without guarding at the call site.
Persistence (when a session message list is attached) happens in
lockstep with emission so the row's content stays equal to the
concatenated deltas at every delta boundary.
"""
ext = OpenRouterDeltaExtension.from_delta(delta)
text = ext.visible_text()
if not text:
return []
events: list[StreamBaseResponse] = []
if not self._open:
events.append(StreamReasoningStart(id=self._block_id))
self._open = True
if self._session_messages is not None:
self._current_row = ChatMessage(role="reasoning", content="")
self._session_messages.append(self._current_row)
events.append(StreamReasoningDelta(id=self._block_id, delta=text))
if self._current_row is not None:
self._current_row.content = (self._current_row.content or "") + text
return events
def close(self) -> list[StreamBaseResponse]:
"""Emit ``StreamReasoningEnd`` for the open block (if any) and rotate.
Idempotent — returns ``[]`` when no block is open. The id rotation
guarantees the next reasoning block starts with a fresh id rather
than reusing one already closed on the wire. The persisted row is
not removed — it stays in ``session_messages`` as the durable
record of what was reasoned.
"""
if not self._open:
return []
event = StreamReasoningEnd(id=self._block_id)
self._open = False
self._block_id = str(uuid.uuid4())
self._current_row = None
return [event]

View File

@@ -0,0 +1,281 @@
"""Tests for the baseline reasoning extension module.
Covers the typed OpenRouter delta parser, the stateful emitter, and the
``extra_body`` builder. The emitter is tested against real
``ChoiceDelta`` pydantic instances so the ``model_extra`` plumbing the
parser relies on is exercised end-to-end.
"""
from openai.types.chat.chat_completion_chunk import ChoiceDelta
from backend.copilot.baseline.reasoning import (
BaselineReasoningEmitter,
OpenRouterDeltaExtension,
ReasoningDetail,
reasoning_extra_body,
)
from backend.copilot.model import ChatMessage
from backend.copilot.response_model import (
StreamReasoningDelta,
StreamReasoningEnd,
StreamReasoningStart,
)
def _delta(**extra) -> ChoiceDelta:
"""Build a ChoiceDelta with the given extension fields on ``model_extra``."""
return ChoiceDelta.model_validate({"role": "assistant", **extra})
class TestReasoningDetail:
def test_visible_text_prefers_text(self):
d = ReasoningDetail(type="reasoning.text", text="hi", summary="ignored")
assert d.visible_text == "hi"
def test_visible_text_falls_back_to_summary(self):
d = ReasoningDetail(type="reasoning.summary", summary="tldr")
assert d.visible_text == "tldr"
def test_visible_text_empty_for_encrypted(self):
d = ReasoningDetail(type="reasoning.encrypted")
assert d.visible_text == ""
def test_unknown_fields_are_ignored(self):
# OpenRouter may add new fields in future payloads — they shouldn't
# cause validation errors.
d = ReasoningDetail.model_validate(
{"type": "reasoning.future", "text": "x", "signature": "opaque"}
)
assert d.text == "x"
def test_visible_text_empty_for_unknown_type(self):
# Unknown types may carry provider metadata that must not render as
# user-visible reasoning — regardless of whether a text/summary is
# present. Only ``reasoning.text`` / ``reasoning.summary`` surface.
d = ReasoningDetail(type="reasoning.future", text="leaked metadata")
assert d.visible_text == ""
def test_visible_text_surfaces_text_when_type_missing(self):
# Pre-``reasoning_details`` OpenRouter payloads omit ``type`` — treat
# them as text so we don't regress the legacy structured shape.
d = ReasoningDetail(text="plain")
assert d.visible_text == "plain"
class TestOpenRouterDeltaExtension:
def test_from_delta_reads_model_extra(self):
delta = _delta(reasoning="step one")
ext = OpenRouterDeltaExtension.from_delta(delta)
assert ext.reasoning == "step one"
def test_visible_text_legacy_string(self):
ext = OpenRouterDeltaExtension(reasoning="plain text")
assert ext.visible_text() == "plain text"
def test_visible_text_deepseek_alias(self):
ext = OpenRouterDeltaExtension(reasoning_content="alt channel")
assert ext.visible_text() == "alt channel"
def test_visible_text_structured_details_concat(self):
ext = OpenRouterDeltaExtension(
reasoning_details=[
ReasoningDetail(type="reasoning.text", text="hello "),
ReasoningDetail(type="reasoning.text", text="world"),
]
)
assert ext.visible_text() == "hello world"
def test_visible_text_skips_encrypted(self):
ext = OpenRouterDeltaExtension(
reasoning_details=[
ReasoningDetail(type="reasoning.encrypted"),
ReasoningDetail(type="reasoning.text", text="visible"),
]
)
assert ext.visible_text() == "visible"
def test_visible_text_empty_when_all_channels_blank(self):
ext = OpenRouterDeltaExtension()
assert ext.visible_text() == ""
def test_empty_delta_produces_empty_extension(self):
ext = OpenRouterDeltaExtension.from_delta(_delta())
assert ext.reasoning is None
assert ext.reasoning_content is None
assert ext.reasoning_details == []
def test_malformed_reasoning_payload_logged_and_swallowed(self, caplog):
# A malformed payload (e.g. reasoning_details shipped as a string
# rather than a list) must not abort the stream — log it and
# return an empty extension so valid text/tool events keep flowing.
# A plain mock is used here because ``from_delta`` only reads
# ``delta.model_extra`` — avoids reaching into pydantic internals
# (``__pydantic_extra__``) that could be renamed across versions.
from unittest.mock import MagicMock
delta = MagicMock(spec=ChoiceDelta)
delta.model_extra = {"reasoning_details": "not a list"}
with caplog.at_level("WARNING"):
ext = OpenRouterDeltaExtension.from_delta(delta)
assert ext.reasoning_details == []
assert ext.visible_text() == ""
assert any("malformed" in r.message.lower() for r in caplog.records)
def test_unknown_typed_entry_with_text_is_not_surfaced(self):
# Regression: the legacy extractor emitted any entry with a
# ``text`` or ``summary`` field. The typed parser now filters on
# the recognised types so future provider metadata can't leak
# into the reasoning collapse.
ext = OpenRouterDeltaExtension(
reasoning_details=[
ReasoningDetail(type="reasoning.future", text="provider metadata"),
ReasoningDetail(type="reasoning.text", text="real"),
]
)
assert ext.visible_text() == "real"
class TestReasoningExtraBody:
def test_anthropic_route_returns_fragment(self):
assert reasoning_extra_body("anthropic/claude-sonnet-4-6", 4096) == {
"reasoning": {"max_tokens": 4096}
}
def test_direct_claude_model_id_still_matches(self):
assert reasoning_extra_body("claude-3-5-sonnet-20241022", 2048) == {
"reasoning": {"max_tokens": 2048}
}
def test_non_anthropic_route_returns_none(self):
assert reasoning_extra_body("openai/gpt-4o", 4096) is None
assert reasoning_extra_body("google/gemini-2.5-pro", 4096) is None
def test_zero_max_tokens_kill_switch(self):
# Operator kill switch: ``max_thinking_tokens <= 0`` disables the
# ``reasoning`` extra_body fragment even on an Anthropic route.
# Lets us silence reasoning without dropping the SDK path's budget.
assert reasoning_extra_body("anthropic/claude-sonnet-4-6", 0) is None
assert reasoning_extra_body("anthropic/claude-sonnet-4-6", -1) is None
class TestBaselineReasoningEmitter:
def test_first_text_delta_emits_start_then_delta(self):
emitter = BaselineReasoningEmitter()
events = emitter.on_delta(_delta(reasoning="thinking"))
assert len(events) == 2
assert isinstance(events[0], StreamReasoningStart)
assert isinstance(events[1], StreamReasoningDelta)
assert events[0].id == events[1].id
assert events[1].delta == "thinking"
assert emitter.is_open is True
def test_subsequent_deltas_reuse_block_id_without_new_start(self):
emitter = BaselineReasoningEmitter()
first = emitter.on_delta(_delta(reasoning="a"))
second = emitter.on_delta(_delta(reasoning="b"))
assert any(isinstance(e, StreamReasoningStart) for e in first)
assert all(not isinstance(e, StreamReasoningStart) for e in second)
assert len(second) == 1
assert isinstance(second[0], StreamReasoningDelta)
assert first[0].id == second[0].id
def test_empty_delta_emits_nothing(self):
emitter = BaselineReasoningEmitter()
assert emitter.on_delta(_delta(content="hello")) == []
assert emitter.is_open is False
def test_close_emits_end_and_rotates_id(self):
emitter = BaselineReasoningEmitter()
# Capture the block id from the wire event rather than reaching
# into emitter internals — the id on the emitted Start/Delta is
# what the frontend actually receives.
start_events = emitter.on_delta(_delta(reasoning="x"))
first_id = start_events[0].id
events = emitter.close()
assert len(events) == 1
assert isinstance(events[0], StreamReasoningEnd)
assert events[0].id == first_id
assert emitter.is_open is False
# Next reasoning uses a fresh id.
new_events = emitter.on_delta(_delta(reasoning="y"))
assert isinstance(new_events[0], StreamReasoningStart)
assert new_events[0].id != first_id
def test_close_is_idempotent(self):
emitter = BaselineReasoningEmitter()
assert emitter.close() == []
emitter.on_delta(_delta(reasoning="x"))
assert len(emitter.close()) == 1
assert emitter.close() == []
def test_structured_details_round_trip(self):
emitter = BaselineReasoningEmitter()
events = emitter.on_delta(
_delta(
reasoning_details=[
{"type": "reasoning.text", "text": "plan: "},
{"type": "reasoning.summary", "summary": "do the thing"},
]
)
)
deltas = [e for e in events if isinstance(e, StreamReasoningDelta)]
assert len(deltas) == 1
assert deltas[0].delta == "plan: do the thing"
class TestReasoningPersistence:
"""The persistence contract: without ``role="reasoning"`` rows in
session.messages, useHydrateOnStreamEnd overwrites the live-streamed
reasoning parts and the Reasoning collapse vanishes. Every delta
must be reflected in the persisted row the moment it's emitted."""
def test_session_row_appended_on_first_delta(self):
session: list[ChatMessage] = []
emitter = BaselineReasoningEmitter(session)
assert session == []
emitter.on_delta(_delta(reasoning="hi"))
assert len(session) == 1
assert session[0].role == "reasoning"
assert session[0].content == "hi"
def test_subsequent_deltas_mutate_same_row(self):
session: list[ChatMessage] = []
emitter = BaselineReasoningEmitter(session)
emitter.on_delta(_delta(reasoning="part one "))
emitter.on_delta(_delta(reasoning="part two"))
assert len(session) == 1
assert session[0].content == "part one part two"
def test_close_keeps_row_in_session(self):
session: list[ChatMessage] = []
emitter = BaselineReasoningEmitter(session)
emitter.on_delta(_delta(reasoning="thought"))
emitter.close()
assert len(session) == 1
assert session[0].content == "thought"
def test_second_reasoning_block_appends_new_row(self):
session: list[ChatMessage] = []
emitter = BaselineReasoningEmitter(session)
emitter.on_delta(_delta(reasoning="first"))
emitter.close()
emitter.on_delta(_delta(reasoning="second"))
assert len(session) == 2
assert [m.content for m in session] == ["first", "second"]
def test_no_session_means_no_persistence(self):
"""Emitter without attached session list emits wire events only."""
emitter = BaselineReasoningEmitter()
events = emitter.on_delta(_delta(reasoning="pure wire"))
assert len(events) == 2 # start + delta, no crash
# Nothing else to assert — just proves None session is supported.

View File

@@ -27,6 +27,10 @@ from openai.types.chat import ChatCompletionMessageParam, ChatCompletionToolPara
from openai.types.completion_usage import PromptTokensDetails
from opentelemetry import trace as otel_trace
from backend.copilot.baseline.reasoning import (
BaselineReasoningEmitter,
reasoning_extra_body,
)
from backend.copilot.config import CopilotLlmModel, CopilotMode
from backend.copilot.context import get_workspace_manager, set_execution_context
from backend.copilot.graphiti.config import is_enabled_for_user
@@ -336,6 +340,7 @@ class _BaselineStreamState:
assistant_text: str = ""
text_block_id: str = field(default_factory=lambda: str(uuid.uuid4()))
text_started: bool = False
reasoning_emitter: BaselineReasoningEmitter = field(init=False)
turn_prompt_tokens: int = 0
turn_completion_tokens: int = 0
turn_cache_read_tokens: int = 0
@@ -346,6 +351,10 @@ class _BaselineStreamState:
# generate one warning per streaming call.
cost_missing_logged: bool = False
thinking_stripper: _ThinkingStripper = field(default_factory=_ThinkingStripper)
# MUTATE in place only — ``__post_init__`` hands this list reference to
# ``BaselineReasoningEmitter`` so reasoning rows can be appended as
# deltas stream in. Reassigning (``state.session_messages = [...]``)
# would silently detach the emitter from the new list.
session_messages: list[ChatMessage] = field(default_factory=list)
# Tracks how much of ``assistant_text`` has already been flushed to
# ``session.messages`` via mid-loop pending drains, so the ``finally``
@@ -360,6 +369,14 @@ class _BaselineStreamState:
# wasn't a system role, so no marking applies).
cached_system_message: dict[str, Any] | None = None
def __post_init__(self) -> None:
# Wire the reasoning emitter to ``session_messages`` so it can
# append ``role="reasoning"`` rows as reasoning streams in — the
# frontend's ``convertChatSessionToUiMessages`` relies on these
# rows to render the Reasoning collapse after the AI SDK's
# stream-end hydrate swaps in the DB-backed message list.
self.reasoning_emitter = BaselineReasoningEmitter(self.session_messages)
def _is_anthropic_model(model: str) -> bool:
"""Return True if *model* routes to Anthropic (native or via OpenRouter).
@@ -536,12 +553,18 @@ async def _baseline_llm_caller(
final_messages = messages
extra_headers = None
typed_messages = cast(list[ChatCompletionMessageParam], final_messages)
extra_body: dict[str, Any] = dict(_OPENROUTER_INCLUDE_USAGE_COST)
reasoning_param = reasoning_extra_body(
state.model, config.claude_agent_max_thinking_tokens
)
if reasoning_param:
extra_body.update(reasoning_param)
create_kwargs: dict[str, Any] = {
"model": state.model,
"messages": typed_messages,
"stream": True,
"stream_options": {"include_usage": True},
"extra_body": _OPENROUTER_INCLUDE_USAGE_COST,
"extra_body": extra_body,
}
if extra_headers:
create_kwargs["extra_headers"] = extra_headers
@@ -591,7 +614,14 @@ async def _baseline_llm_caller(
if not delta:
continue
state.pending_events.extend(state.reasoning_emitter.on_delta(delta))
if delta.content:
# Text and reasoning must not interleave on the wire — the
# AI SDK maps distinct start/end pairs to distinct UI
# parts. Close any open reasoning block before emitting
# the first text delta of this run.
state.pending_events.extend(state.reasoning_emitter.close())
emit = state.thinking_stripper.process(delta.content)
if emit:
if not state.text_started:
@@ -605,6 +635,10 @@ async def _baseline_llm_caller(
)
if delta.tool_calls:
# Same rule as the text branch: close any open reasoning
# block before a tool_use starts so the AI SDK treats
# reasoning and tool-use as distinct parts.
state.pending_events.extend(state.reasoning_emitter.close())
for tc in delta.tool_calls:
idx = tc.index
if idx not in tool_calls_by_index:
@@ -629,6 +663,13 @@ async def _baseline_llm_caller(
except Exception:
pass
finally:
# Close open blocks on both normal and exception paths so the
# frontend always sees matched start/end pairs. An exception mid
# ``async for chunk in response`` would otherwise leave reasoning
# and/or text unterminated and only ``StreamFinishStep`` emitted —
# the Reasoning / Text collapses would never finalise.
state.pending_events.extend(state.reasoning_emitter.close())
# Flush any buffered text held back by the thinking stripper.
tail = state.thinking_stripper.flush()
if tail:
@@ -639,12 +680,10 @@ async def _baseline_llm_caller(
state.pending_events.append(
StreamTextDelta(id=state.text_block_id, delta=tail)
)
# Close text block
if state.text_started:
state.pending_events.append(StreamTextEnd(id=state.text_block_id))
state.text_started = False
state.text_block_id = str(uuid.uuid4())
finally:
# Always persist partial text so the session history stays consistent,
# even when the stream is interrupted by an exception.
state.assistant_text += round_text
@@ -1718,25 +1757,14 @@ async def stream_chat_completion_baseline(
_stream_error = True
error_msg = str(e) or type(e).__name__
logger.error("[Baseline] Streaming error: %s", error_msg, exc_info=True)
# Close any open text block. The llm_caller's finally block
# already appended StreamFinishStep to pending_events, so we must
# insert StreamTextEnd *before* StreamFinishStep to preserve the
# protocol ordering:
# StreamStartStep -> StreamTextStart -> ...deltas... ->
# ``_baseline_llm_caller``'s finally block closes any open
# reasoning / text blocks and appends ``StreamFinishStep`` on
# both normal and exception paths, so pending_events already has
# the correct protocol ordering:
# StreamStartStep -> StreamReasoningStart -> ...deltas... ->
# StreamReasoningEnd -> StreamTextStart -> ...deltas... ->
# StreamTextEnd -> StreamFinishStep
# Appending (or yielding directly) would place it after
# StreamFinishStep, violating the protocol.
if state.text_started:
# Find the last StreamFinishStep and insert before it.
insert_pos = len(state.pending_events)
for i in range(len(state.pending_events) - 1, -1, -1):
if isinstance(state.pending_events[i], StreamFinishStep):
insert_pos = i
break
state.pending_events.insert(
insert_pos, StreamTextEnd(id=state.text_block_id)
)
# Drain pending events in correct order
# Just drain what's buffered, then yield the error.
for evt in state.pending_events:
yield evt
state.pending_events.clear()

View File

@@ -23,6 +23,14 @@ from backend.copilot.baseline.service import (
_mark_tools_with_cache_control,
)
from backend.copilot.model import ChatMessage
from backend.copilot.response_model import (
StreamReasoningDelta,
StreamReasoningEnd,
StreamReasoningStart,
StreamTextDelta,
StreamTextEnd,
StreamTextStart,
)
from backend.copilot.transcript_builder import TranscriptBuilder
from backend.util.prompt import CompressResult
from backend.util.tool_call_loop import LLMLoopResponse, LLMToolCall, ToolCallResult
@@ -1508,3 +1516,360 @@ class TestApplyPromptCacheMarkers:
# The exact same list object reaches the provider (no copy needed).
call_messages = mock_client.chat.completions.create.call_args[1]["messages"]
assert call_messages is messages
def _make_delta_chunk(
*,
content: str | None = None,
reasoning: str | None = None,
reasoning_details: list | None = None,
reasoning_content: str | None = None,
tool_calls: list | None = None,
):
"""Build a streaming chunk with a configurable ``delta`` payload.
The ``delta`` is a real ``ChoiceDelta`` pydantic instance so OpenRouter
extension fields land on ``delta.model_extra`` — which is how
:class:`OpenRouterDeltaExtension` reads them in production. Using a
raw ``MagicMock`` here would leave ``model_extra`` unset and silently
skip the reasoning parser. ``tool_calls`` (when provided) must be
``MagicMock`` entries compatible with the service's streaming loop;
they're set on the delta via ``object.__setattr__`` because pydantic
would otherwise reject the non-schema types.
"""
from openai.types.chat.chat_completion_chunk import ChoiceDelta
payload: dict = {"role": "assistant"}
if content is not None:
payload["content"] = content
if reasoning is not None:
payload["reasoning"] = reasoning
if reasoning_content is not None:
payload["reasoning_content"] = reasoning_content
if reasoning_details is not None:
payload["reasoning_details"] = reasoning_details
delta = ChoiceDelta.model_validate(payload)
# ChoiceDelta's tool_calls schema expects OpenAI-typed entries; bypass
# validation so tests can use MagicMocks that mimic the streaming shape.
if tool_calls is not None:
object.__setattr__(delta, "tool_calls", tool_calls)
chunk = MagicMock()
chunk.usage = None
choice = MagicMock()
choice.delta = delta
chunk.choices = [choice]
return chunk
def _make_tool_call_delta(*, index: int, call_id: str, name: str, arguments: str):
"""Build a ``delta.tool_calls[i]`` entry for streaming tool-use."""
tc = MagicMock()
tc.index = index
tc.id = call_id
function = MagicMock()
function.name = name
function.arguments = arguments
tc.function = function
return tc
class TestBaselineReasoningStreaming:
"""End-to-end reasoning event emission through ``_baseline_llm_caller``."""
@pytest.mark.asyncio
async def test_reasoning_then_text_emits_paired_events(self):
state = _BaselineStreamState(model="anthropic/claude-sonnet-4-6")
chunks = [
_make_delta_chunk(reasoning="thinking..."),
_make_delta_chunk(reasoning=" more"),
_make_delta_chunk(content="final answer"),
]
mock_client = MagicMock()
mock_client.chat.completions.create = AsyncMock(
return_value=_make_stream_mock(*chunks)
)
with patch(
"backend.copilot.baseline.service._get_openai_client",
return_value=mock_client,
):
await _baseline_llm_caller(
messages=[{"role": "user", "content": "hi"}],
tools=[],
state=state,
)
types = [type(e).__name__ for e in state.pending_events]
assert "StreamReasoningStart" in types
assert "StreamReasoningDelta" in types
assert "StreamReasoningEnd" in types
# Reasoning must close before text opens — AI SDK v5 rejects
# interleaved reasoning / text parts.
reason_end = types.index("StreamReasoningEnd")
text_start = types.index("StreamTextStart")
assert reason_end < text_start
# All reasoning deltas share a single block id; the text block uses
# a fresh id after the reasoning-end rotation.
reasoning_ids = {
e.id
for e in state.pending_events
if isinstance(
e, (StreamReasoningStart, StreamReasoningDelta, StreamReasoningEnd)
)
}
text_ids = {
e.id
for e in state.pending_events
if isinstance(e, (StreamTextStart, StreamTextDelta, StreamTextEnd))
}
assert len(reasoning_ids) == 1
assert len(text_ids) == 1
assert reasoning_ids.isdisjoint(text_ids)
combined = "".join(
e.delta for e in state.pending_events if isinstance(e, StreamReasoningDelta)
)
assert combined == "thinking... more"
@pytest.mark.asyncio
async def test_reasoning_then_tool_call_closes_reasoning_first(self):
"""A tool_call arriving mid-reasoning must close the reasoning block
before the tool-use is flushed — AI SDK v5 treats reasoning and
tool-use as distinct UI parts and rejects interleaving."""
state = _BaselineStreamState(model="anthropic/claude-sonnet-4-6")
chunks = [
_make_delta_chunk(reasoning="deliberating..."),
_make_delta_chunk(
tool_calls=[
_make_tool_call_delta(
index=0,
call_id="call_1",
name="search",
arguments='{"q":"x"}',
)
],
),
]
mock_client = MagicMock()
mock_client.chat.completions.create = AsyncMock(
return_value=_make_stream_mock(*chunks)
)
with patch(
"backend.copilot.baseline.service._get_openai_client",
return_value=mock_client,
):
response = await _baseline_llm_caller(
messages=[{"role": "user", "content": "hi"}],
tools=[],
state=state,
)
# A reasoning-end must have been emitted — this is the tool_calls
# branch's responsibility, not the stream-end cleanup.
types = [type(e).__name__ for e in state.pending_events]
assert "StreamReasoningStart" in types
assert "StreamReasoningEnd" in types
# The tool_call was collected — confirms the tool-use path executed
# after reasoning closed (rather than silently dropping the tool).
assert len(response.tool_calls) == 1
assert response.tool_calls[0].name == "search"
# No text events — this stream had no content deltas.
assert "StreamTextStart" not in types
@pytest.mark.asyncio
async def test_reasoning_closed_on_mid_stream_exception(self):
"""Regression guard: an exception during the streaming loop must
still emit ``StreamReasoningEnd`` (and ``StreamTextEnd`` when a
text block is open) before ``StreamFinishStep`` — the frontend
collapse relies on matched start/end pairs, and the outer handler
no longer patches these after-the-fact."""
state = _BaselineStreamState(model="anthropic/claude-sonnet-4-6")
async def failing_stream():
yield _make_delta_chunk(reasoning="thinking...")
raise RuntimeError("boom")
stream = MagicMock()
stream.close = AsyncMock()
stream.__aiter__ = lambda self: failing_stream()
mock_client = MagicMock()
mock_client.chat.completions.create = AsyncMock(return_value=stream)
with patch(
"backend.copilot.baseline.service._get_openai_client",
return_value=mock_client,
):
with pytest.raises(RuntimeError):
await _baseline_llm_caller(
messages=[{"role": "user", "content": "hi"}],
tools=[],
state=state,
)
types = [type(e).__name__ for e in state.pending_events]
# The reasoning block was opened, the exception fired, and the
# finally block must have closed it before emitting the finish
# step.
assert "StreamReasoningStart" in types
assert "StreamReasoningEnd" in types
assert "StreamFinishStep" in types
assert types.index("StreamReasoningEnd") < types.index("StreamFinishStep")
# Emitter is reset so a retried round starts with fresh ids.
assert state.reasoning_emitter.is_open is False
@pytest.mark.asyncio
async def test_reasoning_param_sent_on_anthropic_routes(self):
"""Anthropic route gets ``reasoning.max_tokens`` on the request."""
state = _BaselineStreamState(model="anthropic/claude-sonnet-4-6")
mock_client = MagicMock()
mock_client.chat.completions.create = AsyncMock(
return_value=_make_stream_mock()
)
with patch(
"backend.copilot.baseline.service._get_openai_client",
return_value=mock_client,
):
await _baseline_llm_caller(
messages=[{"role": "user", "content": "hi"}],
tools=[],
state=state,
)
extra_body = mock_client.chat.completions.create.call_args[1]["extra_body"]
assert "reasoning" in extra_body
assert extra_body["reasoning"]["max_tokens"] > 0
@pytest.mark.asyncio
async def test_reasoning_param_absent_on_non_anthropic_routes(self):
"""Non-Anthropic routes (e.g. OpenAI) must not receive ``reasoning``."""
state = _BaselineStreamState(model="openai/gpt-4o")
mock_client = MagicMock()
mock_client.chat.completions.create = AsyncMock(
return_value=_make_stream_mock()
)
with patch(
"backend.copilot.baseline.service._get_openai_client",
return_value=mock_client,
):
await _baseline_llm_caller(
messages=[{"role": "user", "content": "hi"}],
tools=[],
state=state,
)
extra_body = mock_client.chat.completions.create.call_args[1]["extra_body"]
assert "reasoning" not in extra_body
@pytest.mark.asyncio
async def test_reasoning_only_stream_still_closes_block(self):
"""Regression: a stream with only reasoning (no text, no tool_call)
must still emit a matching ``reasoning-end`` at stream close so the
frontend Reasoning collapse finalises. Exercised here against
``_baseline_llm_caller`` to cover the emitter's integration with
the finally-block, not just the unit emitter in reasoning_test.py.
"""
state = _BaselineStreamState(model="anthropic/claude-sonnet-4-6")
mock_client = MagicMock()
mock_client.chat.completions.create = AsyncMock(
return_value=_make_stream_mock(
_make_delta_chunk(reasoning="just thinking"),
)
)
with patch(
"backend.copilot.baseline.service._get_openai_client",
return_value=mock_client,
):
await _baseline_llm_caller(
messages=[{"role": "user", "content": "hi"}],
tools=[],
state=state,
)
types = [type(e).__name__ for e in state.pending_events]
assert "StreamReasoningStart" in types
assert "StreamReasoningEnd" in types
# No text was produced — no text events should be emitted.
assert "StreamTextStart" not in types
assert "StreamTextDelta" not in types
@pytest.mark.asyncio
async def test_reasoning_param_suppressed_when_thinking_tokens_zero(self):
"""Operator kill switch: setting ``claude_agent_max_thinking_tokens``
to 0 removes the ``reasoning`` fragment from ``extra_body`` even on
an Anthropic route. Restores the zero-disables behaviour the old
``baseline_reasoning_max_tokens`` config used to provide."""
state = _BaselineStreamState(model="anthropic/claude-sonnet-4-6")
mock_client = MagicMock()
mock_client.chat.completions.create = AsyncMock(
return_value=_make_stream_mock()
)
with (
patch(
"backend.copilot.baseline.service._get_openai_client",
return_value=mock_client,
),
patch(
"backend.copilot.baseline.service.config.claude_agent_max_thinking_tokens",
0,
),
):
await _baseline_llm_caller(
messages=[{"role": "user", "content": "hi"}],
tools=[],
state=state,
)
extra_body = mock_client.chat.completions.create.call_args[1]["extra_body"]
assert "reasoning" not in extra_body
@pytest.mark.asyncio
async def test_reasoning_persists_to_state_session_messages(self):
"""Integration guard: ``_BaselineStreamState.__post_init__`` wires
the emitter to ``state.session_messages``, so reasoning deltas
flowing through ``_baseline_llm_caller`` must produce a
``role="reasoning"`` row on the state's session list. Catches
regressions where the wiring silently breaks (e.g. a refactor
passes the wrong list reference)."""
state = _BaselineStreamState(model="anthropic/claude-sonnet-4-6")
mock_client = MagicMock()
mock_client.chat.completions.create = AsyncMock(
return_value=_make_stream_mock(
_make_delta_chunk(reasoning="first "),
_make_delta_chunk(reasoning="thought"),
_make_delta_chunk(content="answer"),
)
)
with patch(
"backend.copilot.baseline.service._get_openai_client",
return_value=mock_client,
):
await _baseline_llm_caller(
messages=[{"role": "user", "content": "hi"}],
tools=[],
state=state,
)
reasoning_rows = [m for m in state.session_messages if m.role == "reasoning"]
assert len(reasoning_rows) == 1
assert reasoning_rows[0].content == "first thought"

View File

@@ -194,9 +194,11 @@ class ChatConfig(BaseSettings):
default=8192,
ge=1024,
le=128000,
description="Maximum thinking/reasoning tokens per LLM call. "
"Extended thinking on Opus can generate 50k+ tokens at $75/M — "
"capping this is the single biggest cost lever. "
description="Maximum thinking/reasoning tokens per LLM call. Applies "
"to both the Claude Agent SDK path (as ``max_thinking_tokens``) and "
"the baseline OpenRouter path (as ``extra_body.reasoning.max_tokens`` "
"on Anthropic routes). Extended thinking on Opus can generate 50k+ "
"tokens at $75/M — capping this is the single biggest cost lever. "
"8192 is sufficient for most tasks; increase for complex reasoning.",
)
claude_agent_thinking_effort: Literal["low", "medium", "high", "max"] | None = (

Binary file not shown.

After

Width:  |  Height:  |  Size: 93 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 101 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 93 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 101 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 114 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 147 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 101 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 147 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 70 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 72 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 77 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 89 KiB