Compare commits

..

10 Commits

Author SHA1 Message Date
Zamil Majdy
301782a94a test: add E2E screenshots for PR #12882 2026-04-22 20:07:33 +07:00
majdyz
4d3b79cdc4 fix(backend/copilot): strip leading whitespace from Kimi-emitted tool names
Kimi K2.6 occasionally emits tool names with a leading space
(e.g. ` mcp__copilot__find_block`), which the CLI dispatcher
can't resolve against its registered MCP tools and responds to
with a synthetic 'No such tool available' error.  The underlying
call still fails upstream in the CLI, but stripping whitespace
before removing the MCP prefix keeps the frontend, logs, and the
next-turn tool_result name consistent with successful calls.
2026-04-22 19:52:02 +07:00
Zamil Majdy
4dd165a5a5 style(backend/copilot): fix isort on moonshot_test.py (CI lint) 2026-04-22 19:45:54 +07:00
Zamil Majdy
2072fa6027 fix(backend/copilot): address sentry title-cost sentry findings
1. IndexError-safety on empty ``choices`` — the existing guard
   ``if response.choices else None`` handled the empty-list case but
   sentry flagged the access pattern anyway.  Make it fully defensive:
   check ``response.choices`` non-empty AND ``choices[0].message`` not
   None before reading ``.content``.  Background task never crashes on
   a malformed response.

2. Dropped the session load in ``_record_title_generation_cost``.
   Sentry flagged that ``persist_and_record_usage`` was appending to
   the loaded session's in-memory ``usage`` list with no
   ``upsert_chat_session`` writeback — pure dead work.  The background
   task isn't running inside a request-scoped session; the turn's main
   ``persist_and_record_usage`` already owns the mirror for the
   originating turn.  Passing ``session=None`` skips the append but
   still records the cost into ``PlatformCostLog`` (admin dashboard)
   and the microdollar rate-limit counter — the two places that
   actually matter for a title call.
2026-04-22 19:40:26 +07:00
Zamil Majdy
2db9d6abf7 fix(backend/copilot): use observed model for Moonshot cost override (PR #12882)
Address coderabbit review: the cost-override decision was gating on
``state.options.model`` (the primary requested model), but when the
SDK's fallback path swaps in a different model, ``AssistantMessage.model``
reflects the actual executed one.  Without this, a Moonshot primary
that falls back to Anthropic would be mis-billed at Moonshot rates
(and vice versa) until the OR reconcile arrived and corrected it.

Adds ``observed_model`` to ``_RetryState``, populates it from each
``AssistantMessage.model`` we see, and prefers it over
``state.options.model`` at the cost-override site.  Falls back to
``state.options.model`` when no AssistantMessage has been observed yet
(e.g. turn exited before first response) so existing behaviour is
preserved.

Also fix isort ordering in baseline/service.py — the moonshot import
was sorted before ``model`` but needs to go after (alphabetical
``model`` < ``moonshot``).
2026-04-22 19:35:32 +07:00
Zamil Majdy
59d253303d docs(backend/copilot): simplify sdk_include_partial_messages description 2026-04-22 19:27:50 +07:00
Zamil Majdy
3873f985a2 feat(backend/copilot): enable SDK partial-message streaming by default
Flip ``CHAT_SDK_INCLUDE_PARTIAL_MESSAGES`` default from False → True so
extended-thinking turns on the SDK path stream token-by-token instead
of popping in as a lump at ``content_block_stop``.  Matches the UX
the baseline path has had since #12873.

The partial/summary diff-based reconcile in the adapter has been stable
through internal soak — partial deltas + summary tail emit without
double-writing or truncation across text, thinking, and tool_use
block types.

Kill-switch: set ``CHAT_SDK_INCLUDE_PARTIAL_MESSAGES=false`` to fall
back to summary-only emission if an adapter regression surfaces.
2026-04-22 19:26:41 +07:00
Zamil Majdy
f1a615ad9d refactor(backend/copilot): scope Moonshot override to Moonshot call sites + remove noqa
PR #12882 review feedback (self-review + coderabbit):

1. Call site clarity — the SDK path was calling ``_override_cost_for_moonshot``
   unconditionally on every turn, relying on the function's internal
   "non-Moonshot → passthrough" branch to no-op for Anthropic.  Explicit
   gate with ``_is_moonshot_model`` at the call site makes the intent
   obvious (we only recompute for Moonshot; Anthropic trusts the CLI).

2. Drop ``# noqa: BLE001`` — the repo rule is no linter suppressors.
   Swallowing broad exceptions in ``_record_title_generation_cost`` with
   a silent ``logger.debug`` was useless: rare cases stayed invisible;
   real DB failures never surfaced.

   Refactor the title / cost tracking flow so they're independent
   best-effort steps:
     * ``_generate_session_title`` returns ``(title, response)`` and
       only catches the LLM call failure (which loses both — no way
       around that).
     * ``_update_title_async`` persists the title AND records the cost
       in separate ``try`` blocks with meaningful warning logs — a
       flaky Prisma call on cost recording never costs us the title,
       and a title-persist failure still records the cost.
     * ``_title_usage_from_response`` replaces the narrow float-cast
       ``except`` with an ``isinstance(cost_raw, (int, float))`` check
       — no exception suppression anywhere in the cost path.
     * Early-return from ``_record_title_generation_cost`` when there's
       nothing to record (docstring previously said "lazy load" but the
       code paid the DB roundtrip unconditionally; now it actually is
       lazy).
2026-04-22 19:19:10 +07:00
Zamil Majdy
fe822a0d20 refactor(backend/copilot): address PR #12882 review comments
Renamed moonshot.supports_cache_control → moonshot_supports_cache_control
so the narrow Moonshot-only scope is obvious at call sites. Previous
name read like a universal predicate that answers correctly for any
model, but it returns False for Anthropic (which obviously supports
cache_control too).  Callers combine it with _is_anthropic_model via
the baseline's _supports_prompt_cache_markers wrapper.

Widened rate_card_usd param annotation to str | None to match actual
behaviour — is_moonshot_model(None) returns False so callers could
already pass None safely; the old str annotation was misleading.

Title cost recording now:
  * Skips the session DB lookup when there's nothing meaningful to
    record (no cost AND no tokens) — docstring said the load was
    lazy, now the code actually is.
  * Derives `provider` from ChatConfig.base_url instead of hardcoding
    "open_router".  Keeps the label correct for deployments that point
    title generation at a direct OpenAI endpoint.
2026-04-22 19:09:10 +07:00
Zamil Majdy
dbc3b0b31b refactor(backend/copilot): isolate Moonshot quirks + enable Moonshot cache_control + track title cost
## Why

Three loose ends from the Kimi SDK-default merge (#12878):

1. Kimi-specific pricing logic (rate card, cost-override helper) lived
   inline in sdk/service.py next to unrelated SDK plumbing.  Any future
   non-Anthropic vendor would have piled into the same file.
2. Moonshot's Anthropic-compat endpoint honours ``cache_control:
   {type: ephemeral}`` markers, but the baseline cache-marking gate
   (``_is_anthropic_model``) was narrow enough to exclude it — so
   Moonshot fell back to its automatic prefix cache, which drifts
   readily between turns (internal testing: 0/4 hits across two
   continuation sessions).
3. Title generation (``_update_title_async``, runs per-session) makes
   its own LLM call that the turn reconcile never sees — admin cost
   dashboard under-reports provider spend by the aggregate of those
   calls.

## What

- New ``backend/copilot/moonshot.py`` module:
    * ``is_moonshot_model(model)`` — prefix check against ``moonshotai/``
    * ``rate_card_usd(model)`` — published Moonshot rates, default
      ``(0.60, 2.80)`` per MTok, per-slug overrides slot for future SKUs
    * ``override_cost_usd(...)`` — moved from sdk/service.py, replaces
      the CLI's Sonnet-rate Kimi estimate with the real rate card
    * ``supports_cache_control(model)`` — gate for Anthropic-style
      cache markers on Moonshot routes
    * Explicit docstring: rate card is NOT canonical — authoritative
      cost comes from the OpenRouter ``/generation`` reconcile; this
      module only improves the in-turn estimate and the reconcile's
      lookup-fail fallback.
- Baseline ``_is_anthropic_model`` stays narrow (still only Anthropic —
  callers needing the ``anthropic-beta`` header use it as-is).  New
  ``_supports_prompt_cache_markers`` widens the gate to Anthropic OR
  Moonshot; both call sites that emit ``cache_control`` on the system
  message and the final tool schema switch to this wider gate.  OpenAI
  / Grok / Gemini still 400 on the field, so those providers keep the
  ``false`` answer.
- Title generation now captures its own cost via
  ``persist_and_record_usage`` with ``provider="open_router"`` and
  ``block_name="copilot:title"``, so the admin dashboard sees every
  dollar we spend.  OR's ``usage.cost`` is read off ``usage.model_extra``
  (the pydantic container) using the same pattern as tools/web_search.
- TODO marker on the rate-card call site: after ~1 week of production
  data confirming OR ``/generation`` reconcile reliability, drop the
  rate card entirely and rely on the authoritative number for all
  Kimi turns.

## How

- Detection is prefix-based (``moonshotai/``) — every Kimi SKU today
  uses that namespace and shares pricing, so a future
  ``moonshotai/kimi-k3.0`` inherits both the rate card and the
  cache-control gate transparently without editing this file.
- ``_mark_tools_with_cache_control`` and the per-round cached system
  message builder already exist; the only change is swapping the gate
  from ``_is_anthropic_model`` to ``_supports_prompt_cache_markers``.
- Title cost capture is in-line with the existing ``_generate_session_title``
  call (``extra_body={"usage":{"include":True}}`` asks OR to embed the
  real billed cost); a best-effort ``_record_title_generation_cost``
  helper reads it off ``usage.model_extra`` and fires
  ``persist_and_record_usage`` under ``try/except`` — any cost-tracking
  failure downgrades to debug log so title generation itself never
  breaks.

## Deferred to follow-up

- Kimi reasoning renders after text on dev because Moonshot's shim
  splits each turn into separate ``AssistantMessage`` summaries (one
  text-only, one thinking-only) — the in-message hoist at
  ``response_adapter.py:193`` can't reorder across messages.  Fix
  needs more design (UX trade-off between realtime streaming and
  correct ordering); investigating separately.
2026-04-22 19:02:58 +07:00
32 changed files with 227 additions and 2230 deletions

View File

@@ -1,245 +0,0 @@
---
name: pr-polish
description: Alternate /pr-review and /pr-address on a PR until the PR is truly mergeable — no new review findings, zero unresolved inline threads, zero unaddressed top-level reviews or issue comments, all CI checks green, and two consecutive quiet polls after CI settles. Use when the user wants a PR polished to merge-ready without setting a fixed number of rounds.
user-invocable: true
argument-hint: "[PR number or URL] — if omitted, finds PR for current branch."
metadata:
author: autogpt-team
version: "1.0.0"
---
# PR Polish
**Goal.** Drive a PR to merge-ready by alternating `/pr-review` and `/pr-address` until **all** of the following hold:
1. The most recent `/pr-review` produces **zero new findings** (no new inline comments, no new top-level reviews with a non-empty body).
2. Every inline review thread reachable via GraphQL reports `isResolved: true`.
3. Every non-bot, non-author top-level review has been acknowledged (replied-to) OR resolved via a thread it spawned.
4. Every non-bot, non-author issue comment has been acknowledged (replied-to).
5. Every CI check is `conclusion: "success"` or `"skipped"` / `"neutral"` — none `"failure"` or still pending.
6. **Two consecutive post-CI polls** (≥60s apart) stay clean — no new threads, no new non-empty reviews, no new issue comments. Bots (coderabbitai, sentry, autogpt-reviewer) frequently post late after CI settles; a single green snapshot is not sufficient.
**Do not stop at a fixed number of rounds.** If round N introduces new comments, round N+1 is required. Cap at `_MAX_ROUNDS = 10` as a safety valve, but expect 25 in practice.
## TodoWrite
Before starting, write two todos so the user can see the loop progression:
- `Round {current}: /pr-review + /pr-address on PR #{N}` — current iteration.
- `Final polish polling: 2 consecutive clean polls, CI green, 0 unresolved` — runs after the last non-empty review round.
Update the `current` round counter at the start of each iteration; mark `completed` only when the round's address step finishes (all new threads addressed + resolved).
## Find the PR
```bash
ARG_PR="${ARG:-}"
# Normalize URL → numeric ID if the skill arg is a pull-request URL.
if [[ "$ARG_PR" =~ ^https?://github\.com/[^/]+/[^/]+/pull/([0-9]+) ]]; then
ARG_PR="${BASH_REMATCH[1]}"
fi
PR="${ARG_PR:-$(gh pr list --head "$(git branch --show-current)" --repo Significant-Gravitas/AutoGPT --json number --jq '.[0].number')}"
if [ -z "$PR" ] || [ "$PR" = "null" ]; then
echo "No PR found for current branch. Provide a PR number or URL as the skill arg."
exit 1
fi
echo "Polishing PR #$PR"
```
## The outer loop
```text
round = 0
while round < _MAX_ROUNDS:
round += 1
baseline = snapshot_state(PR) # see "Snapshotting state" below
invoke_skill("pr-review", PR) # posts findings as inline comments / top-level review
findings = diff_state(PR, baseline)
if findings.total == 0:
break # no new findings → go to polish polling
invoke_skill("pr-address", PR) # resolves every unresolved thread + CI failure
# Post-loop: polish polling (see below).
polish_polling(PR)
```
### Snapshotting state
Before each `/pr-review`, capture a baseline so the diff after the review reflects **only** what the review just added (not pre-existing threads):
```bash
# Inline threads — total count + latest databaseId per thread
gh api graphql -f query="
{
repository(owner: \"Significant-Gravitas\", name: \"AutoGPT\") {
pullRequest(number: ${PR}) {
reviewThreads(first: 100) {
totalCount
nodes {
id
isResolved
comments(last: 1) { nodes { databaseId } }
}
}
}
}
}" > /tmp/baseline_threads.json
# Top-level reviews — count + latest id per non-empty review
gh api "repos/Significant-Gravitas/AutoGPT/pulls/${PR}/reviews" --paginate \
--jq '[.[] | select((.body // "") != "") | {id, user: .user.login, state, submitted_at}]' \
> /tmp/baseline_reviews.json
# Issue comments — count + latest id per non-bot, non-author comment.
# Bots are filtered by User.type == "Bot" (GitHub sets this for app/bot
# accounts like coderabbitai, github-actions, sentry-io). The author is
# filtered by comparing login to the PR author — export it so jq can see it.
AUTHOR=$(gh api "repos/Significant-Gravitas/AutoGPT/pulls/${PR}" --jq '.user.login')
gh api "repos/Significant-Gravitas/AutoGPT/issues/${PR}/comments" --paginate \
--jq --arg author "$AUTHOR" \
'[.[] | select(.user.type != "Bot" and .user.login != $author)
| {id, user: .user.login, created_at}]' \
> /tmp/baseline_issue_comments.json
```
### Diffing after a review
After `/pr-review` runs, any of these counting as "new findings" means another address round is needed:
- New inline thread `id` not in the baseline.
- An existing thread whose latest comment `databaseId` is higher than the baseline's (new reply on an old thread).
- A new top-level review `id` with a non-empty body.
- A new issue comment `id` from a non-bot, non-author user.
If any of the four buckets is non-empty → not done; invoke `/pr-address` and loop.
## Polish polling
Once `/pr-review` produces zero new findings, do **not** exit yet. Bots (coderabbitai, sentry, autogpt-reviewer) commonly post late reviews after CI settles — 3090 seconds after the final push. Poll at 60-second intervals:
```text
NON_SUCCESS_TERMINAL = {"failure", "cancelled", "timed_out", "action_required", "startup_failure"}
clean_polls = 0
required_clean = 2
while clean_polls < required_clean:
# 1. CI gate — any terminal non-success conclusion (not just "failure")
# must trigger /pr-address. "success", "skipped", "neutral" are clean;
# anything else (including cancelled, timed_out, action_required) is a
# blocker that won't self-resolve.
ci = fetch_check_runs(PR)
if any ci.conclusion in NON_SUCCESS_TERMINAL:
invoke_skill("pr-address", PR) # address failures + any new comments
baseline = snapshot_state(PR) # reset — push during address invalidates old baseline
clean_polls = 0
continue
if any ci.conclusion is None (still in_progress):
sleep 60; continue # wait without counting this as clean
# 2. Comment / thread gate
threads = fetch_unresolved_threads(PR)
new_issue_comments = diff_against_baseline(issue_comments)
new_reviews = diff_against_baseline(reviews)
if threads or new_issue_comments or new_reviews:
invoke_skill("pr-address", PR)
baseline = snapshot_state(PR) # reset — the address loop just dealt with these,
# otherwise they stay "new" relative to the old baseline forever
clean_polls = 0
continue
# 3. Mergeability gate
mergeable = gh api repos/.../pulls/${PR} --jq '.mergeable'
if mergeable == false (CONFLICTING):
resolve_conflicts(PR) # see pr-address skill
clean_polls = 0
continue
if mergeable is null (UNKNOWN):
sleep 60; continue
clean_polls += 1
sleep 60
```
Only after `clean_polls == 2` do you report `ORCHESTRATOR:DONE`.
### Why 2 clean polls, not 1
A single green snapshot can be misleading — the final CI check often completes ~30s before a bot posts its delayed review. One quiet cycle does not prove the PR is stable; two consecutive cycles with no new threads, reviews, or issue comments arriving gives high confidence nothing else is incoming.
### Why checking every source each poll
`/pr-address` polling inside a single round already re-checks its own comments, but `/pr-polish` sits a level above and must also catch:
- New top-level reviews (autogpt-reviewer sometimes posts structured feedback only after several CI green cycles).
- Issue comments from human reviewers (not caught by inline thread polling).
- Sentry bug predictions that land on new line numbers post-push.
- Merge conflicts introduced by a race between your push and a merge to `dev`.
## Invocation pattern
Delegate to existing skills with the `Skill` tool; do not re-implement the review or address logic inline. This keeps the polish loop focused on orchestration and lets the child skills evolve independently.
```python
Skill(skill="pr-review", args=pr_url)
Skill(skill="pr-address", args=pr_url)
```
After each child invocation, re-query GitHub state directly — never trust a summary for the stop condition. The orchestrator's `ORCHESTRATOR:DONE` is verified against actual GraphQL / REST responses per the rules in `pr-address`'s "Verify actual count before outputting ORCHESTRATOR:DONE" section.
### **Auto-continue: do NOT end your response between child skills**
`/pr-polish` is a single orchestration task — one invocation drives the PR all the way to merge-ready. When a child `Skill()` call returns control to you:
- Do NOT summarize and stop.
- Do NOT wait for user confirmation to continue.
- Immediately, in the same response, perform the next loop step: state diff → decide next action → next `Skill()` call or polling sleep.
The child skill returning is a **loop iteration boundary**, not a conversation turn boundary. You are expected to keep going until one of the exit conditions in the opening section is met (2 consecutive clean polls, `_MAX_ROUNDS` hit, or an unrecoverable error).
If the user needs to approve a risky action mid-loop (e.g., a force-push or a destructive git operation), pause there — but not at the routine "round N finished, round N+1 needed" boundary. Those are silent transitions.
## GitHub rate limits
This skill issues many GraphQL calls (one review-thread query per outer iteration plus per-poll queries inside polish polling). Expect the GraphQL budget to be tight on large PRs. When `gh api rate_limit --jq .resources.graphql.remaining` drops below ~200, back off:
- Fall back to REST for reads (flat `/pulls/{N}/comments`, `/pulls/{N}/reviews`, `/issues/{N}/comments`) per the `pr-address` skill's GraphQL-fallback section.
- Queue thread resolutions (GraphQL-only) until the budget resets; keep making progress on fixes + REST replies meanwhile.
- `sleep 5` between any batch of ≥20 writes to avoid secondary rate limits.
## Safety valves
- `_MAX_ROUNDS = 10` — if review+address rounds exceed this, stop and escalate to the user with a summary of what's still unresolved. A PR that cannot converge in 10 rounds has systemic issues that need human judgment.
- After each commit, run `poetry run format` / `pnpm format && pnpm lint && pnpm types` per the target codebase's conventions. A failing format check is CI `failure` that will never self-resolve.
- Every `/pr-review` round checks for **duplicate** concerns first (via `pr-review`'s own "Fetch existing review comments" step) so the loop does not re-post the same finding that a prior round already resolved.
## Reporting
When the skill finishes (either via two clean polls or hitting `_MAX_ROUNDS`), produce a compact summary:
```
PR #{N} polish complete ({rounds_completed} rounds):
- {X} inline threads opened and resolved
- {Y} CI failures fixed
- {Z} new commits pushed
Final state: CI green, {total} threads all resolved, mergeable.
```
If exiting via `_MAX_ROUNDS`, flag explicitly:
```
PR #{N} polish stopped at {_MAX_ROUNDS} rounds — NOT merge-ready:
- {N} threads still unresolved: {titles}
- CI status: {summary}
Needs human review.
```
## When to use this skill
Use when the user says any of:
- "polish this PR"
- "keep reviewing and addressing until it's mergeable"
- "loop /pr-review + /pr-address until done"
- "make sure the PR is actually merge-ready"
Do **not** use when:
- User wants just one review pass (→ `/pr-review`).
- User wants to address already-posted comments without further self-review (→ `/pr-address`).
- A fixed round count is explicitly requested (e.g., "do 3 rounds") — honour the count instead of converging.

View File

@@ -260,32 +260,6 @@ Use a `trap` so release runs even on `exit 1`:
trap 'kill "$HEARTBEAT_PID" 2>/dev/null; rm -f "$LOCK"' EXIT INT TERM
```
### **Release the lock AS SOON AS the test run is done**
The lock guards **test execution**, not **app lifecycle**. Once Step 5 (record results) and Step 6 (post PR comment) are complete, release the lock IMMEDIATELY — even if:
- The native `poetry run app` / `pnpm dev` processes are still running so the user can keep poking at the app manually.
- You're leaving docker containers up.
- You're tailing logs for a minute or two.
Keeping the lock held past the test run is the single most common way `/pr-test` stalls other agents. **The app staying up is orthogonal to the lock; don't conflate them.** Sibling worktrees running their own `/pr-test` will kill the stray processes and free the ports themselves (Step 3c/3e-native handle that) — they just need the lock file gone.
Concretely, the sequence at the end of every `/pr-test` run (success or failure) is:
```bash
# 1. Write the final report + post PR comment — done above in Step 5/6.
# 2. Release the lock right now, even if the app is still up.
kill "$HEARTBEAT_PID" 2>/dev/null
rm -f "$LOCK" /tmp/pr-test-heartbeat.pid
echo "$(date -u +%Y-%m-%dT%H:%MZ) [pr-${PR_NUMBER}] released lock (app may still be running)" \
>> /Users/majdyz/Code/AutoGPT/.ign.testing.log
# 3. Optionally leave the app running and note it so the user knows:
echo "Native stack still running on :3000 / :8006 for manual poking. Kill with:"
echo " pkill -9 -f 'poetry run app'; pkill -9 -f 'next-server|next dev'"
```
If a sibling agent's `/pr-test` needs to take over, it'll do the kill+rebuild dance from Step 3c/3e-native on its own — your only job is to not hold the lock file past the end of your test.
### Shared status log
`/Users/majdyz/Code/AutoGPT/.ign.testing.log` is an append-only channel any agent can read/write. Use it for "I'm waiting", "I'm done, resources free", or post-run notes:
@@ -781,19 +755,6 @@ Upload screenshots to the PR using the GitHub Git API (no local git operations
**CRITICAL — NEVER post a bare directory link like `https://github.com/.../tree/...`.** Every screenshot MUST appear as `![name](raw_url)` inline in the PR comment so reviewers can see them without clicking any links. After posting, the verification step below greps the comment for `![` tags and exits 1 if none are found — the test run is considered incomplete until this passes.
**CRITICAL — NEVER paste absolute local paths into the PR comment.** Strings like `/Users/…`, `/home/…`, `C:\…` are useless to every reviewer except you. Before posting, grep the final body for `/Users/`, `/home/`, `/tmp/`, `/private/`, `C:\`, `~/` and either drop those lines entirely or rewrite them as repo-relative paths (`autogpt_platform/backend/…`). The PR comment is an artifact reviewers on GitHub read — it must be self-contained on github.com. Keep local paths in `$RESULTS_DIR/test-report.md` for yourself; only copy the *content* they reference (excerpts, test names, log lines) into the PR comment, not the path.
**Pre-post sanity check** (paste after building the comment body, before `gh api ... comments`):
```bash
# Reject any local-looking absolute path or home-dir shortcut in the body
if grep -nE '(^|[^A-Za-z])(/Users/|/home/|/tmp/|/private/|C:\\|~/)[A-Za-z0-9]' "$COMMENT_FILE" ; then
echo "ABORT: local filesystem paths detected in PR comment body."
echo "Remove or rewrite as repo-relative (autogpt_platform/...) before posting."
exit 1
fi
```
```bash
# Upload screenshots via GitHub Git API (creates blobs, tree, commit, and ref remotely)
REPO="Significant-Gravitas/AutoGPT"

View File

@@ -45,7 +45,6 @@ from backend.copilot.model import (
maybe_append_user_message,
upsert_chat_session,
)
from backend.copilot.model_router import resolve_model
from backend.copilot.moonshot import is_moonshot_model
from backend.copilot.pending_message_helpers import (
combine_pending_with_current,
@@ -122,15 +121,7 @@ logger = logging.getLogger(__name__)
_background_tasks: set[asyncio.Task[Any]] = set()
# Maximum number of tool-call rounds before forcing a text response.
_MAX_TOOL_ROUNDS = 100
# Hint appended on the last tool round so the model wraps up with a summary
# instead of issuing another tool call that gets cut off cold.
_LAST_ITERATION_HINT = (
"You have reached the tool-call budget for this turn. Do not call any "
"more tools — produce a final text response summarizing what you did, "
"what remains, and how the user can continue the work in the next turn."
)
_MAX_TOOL_ROUNDS = 30
# Max seconds to wait for transcript upload in the finally block before
# letting it continue as a background task (tracked in _background_tasks).
@@ -328,17 +319,20 @@ def _filter_tools_by_permissions(
]
async def _resolve_baseline_model(
tier: CopilotLlmModel | None, user_id: str | None
) -> str:
def _resolve_baseline_model(tier: CopilotLlmModel | None) -> str:
"""Pick the model for the baseline path based on the per-request tier.
Delegates to :func:`copilot.model_router.resolve_model` so the
``(fast, tier)`` cell is LD-overridable per user. ``None`` tier
maps to ``"standard"``.
Baseline resolves independently of SDK via the ``fast_*_model`` cells
of the (path, tier) matrix. ``'standard'`` / ``None`` picks Kimi
K2.6 by default (cheap + OpenRouter ``reasoning`` support);
``'advanced'`` picks Opus by default so the advanced tier is a clean
A/B against the SDK advanced tier — same model, different path —
isolating reasoning-wire + cache differences from model capability.
Both defaults are overridable per ``CHAT_FAST_*_MODEL`` env vars.
"""
tier_name = "advanced" if tier == "advanced" else "standard"
return await resolve_model("fast", tier_name, user_id, config=config)
if tier == "advanced":
return config.fast_advanced_model
return config.fast_standard_model
@dataclass
@@ -1388,7 +1382,7 @@ async def stream_chat_completion_baseline(
# Select model based on the per-request tier toggle (standard / advanced).
# The path (fast vs extended_thinking) is already decided — we're in the
# baseline (fast) path; ``mode`` is accepted for logging parity only.
active_model = await _resolve_baseline_model(model, user_id)
active_model = _resolve_baseline_model(model)
# --- E2B sandbox setup (feature parity with SDK path) ---
e2b_sandbox = None
@@ -1759,7 +1753,6 @@ async def stream_chat_completion_baseline(
execute_tool=_bound_tool_executor,
update_conversation=_bound_conversation_updater,
max_iterations=_MAX_TOOL_ROUNDS,
last_iteration_message=_LAST_ITERATION_HINT,
):
loop_result_holder[0] = loop_result
# Inject any messages the user queued while the turn was

View File

@@ -21,7 +21,6 @@ from backend.copilot.baseline.service import (
_is_anthropic_model,
_mark_system_message_with_cache_control,
_mark_tools_with_cache_control,
_supports_prompt_cache_markers,
)
from backend.copilot.model import ChatMessage
from backend.copilot.response_model import (
@@ -2026,55 +2025,3 @@ class TestBaselineReasoningStreaming:
reasoning_rows = [m for m in state.session_messages if m.role == "reasoning"]
assert len(reasoning_rows) == 1
assert reasoning_rows[0].content == "first thought"
class TestSupportsPromptCacheMarkers:
"""``_supports_prompt_cache_markers`` is the widened gate for
emitting ``cache_control`` markers on message content. It's a
superset of ``_is_anthropic_model`` that ALSO admits Moonshot
(whose Anthropic-compat endpoint honours the marker) while keeping
the False answer for OpenAI / Grok / Gemini (which 400 on the
unknown field)."""
@pytest.mark.parametrize(
"model",
[
"anthropic/claude-sonnet-4-6",
"claude-3-5-sonnet-20241022",
"anthropic.claude-3-5-sonnet",
"ANTHROPIC/Claude-Opus",
],
)
def test_anthropic_routes_are_supported(self, model):
assert _supports_prompt_cache_markers(model) is True
@pytest.mark.parametrize(
"model",
[
"moonshotai/kimi-k2.6",
"moonshotai/kimi-k2-thinking",
"moonshotai/kimi-k2.5",
"moonshotai/kimi-k3.0", # future SKU
],
)
def test_moonshot_routes_are_supported(self, model):
"""The whole reason this predicate exists — Moonshot must be
True even though ``_is_anthropic_model`` is False for it."""
assert _supports_prompt_cache_markers(model) is True
# Verify this is strictly wider than the anthropic-only check.
assert _is_anthropic_model(model) is False
@pytest.mark.parametrize(
"model",
[
"openai/gpt-4o",
"google/gemini-2.5-pro",
"xai/grok-4",
"meta-llama/llama-3.3-70b-instruct",
"deepseek/deepseek-v3",
],
)
def test_other_providers_still_rejected(self, model):
"""Regression guard: OpenAI/Grok/Gemini still 400 on
``cache_control``, so the widened gate must keep them out."""
assert _supports_prompt_cache_markers(model) is False

View File

@@ -67,40 +67,34 @@ class TestResolveBaselineModel:
Baseline reads the ``fast_*_model`` cells of the (path, tier) matrix
and never falls through to the SDK-side ``thinking_*_model`` cells.
Without a user_id (so no LD context) the resolver returns the
``ChatConfig`` static default; per-user overrides are exercised in
``copilot/model_router_test.py``.
Default routing:
- ``standard`` / ``None`` → ``config.fast_standard_model`` (Kimi K2.6)
- ``advanced`` → ``config.fast_advanced_model`` (Opus — same as SDK's
advanced tier, so the advanced A/B isolates path differences)
"""
@pytest.mark.asyncio
async def test_advanced_tier_selects_fast_advanced_model(self):
assert (
await _resolve_baseline_model("advanced", None)
== config.fast_advanced_model
)
def test_advanced_tier_selects_fast_advanced_model(self):
assert _resolve_baseline_model("advanced") == config.fast_advanced_model
@pytest.mark.asyncio
async def test_standard_tier_selects_fast_standard_model(self):
assert (
await _resolve_baseline_model("standard", None)
== config.fast_standard_model
)
def test_standard_tier_selects_fast_standard_model(self):
assert _resolve_baseline_model("standard") == config.fast_standard_model
@pytest.mark.asyncio
async def test_none_tier_selects_fast_standard_model(self):
"""Baseline users without a tier get the fast-standard default."""
assert await _resolve_baseline_model(None, None) == config.fast_standard_model
def test_none_tier_selects_fast_standard_model(self):
"""Baseline users without a tier get the cheap fast-standard default."""
assert _resolve_baseline_model(None) == config.fast_standard_model
def test_fast_standard_default_is_sonnet(self):
"""Shipped default: Sonnet on the baseline standard cell — the
non-Anthropic routes ship via the LD flag instead of a config
change. Asserts the declared ``Field`` default so a deploy-time
``CHAT_FAST_STANDARD_MODEL`` override doesn't flake CI."""
def test_fast_standard_default_is_kimi(self):
"""Shipped default: Kimi K2.6 on the baseline standard cell.
Asserts the declared ``Field`` default — env-independent — so a
deploy-time ``CHAT_FAST_STANDARD_MODEL`` rollback override
doesn't fail CI while still pinning the shipped default.
"""
from backend.copilot.config import ChatConfig
assert (
ChatConfig.model_fields["fast_standard_model"].default
== "anthropic/claude-sonnet-4-6"
== "moonshotai/kimi-k2.6"
)
def test_fast_advanced_default_is_opus(self):
@@ -114,6 +108,17 @@ class TestResolveBaselineModel:
== "anthropic/claude-opus-4.7"
)
def test_standard_cells_share_kimi_default_across_paths(self):
"""After PR #12878 both paths default to the same cheap model
(``moonshotai/kimi-k2.6``). The split exists for *override*
flexibility, not for forcing a price gap — guard against an
accidental regression that pins the SDK path back to Sonnet."""
from backend.copilot.config import ChatConfig
kimi = "moonshotai/kimi-k2.6"
assert ChatConfig.model_fields["fast_standard_model"].default == kimi
assert ChatConfig.model_fields["thinking_standard_model"].default == kimi
def test_standard_and_advanced_cells_differ_on_fast(self):
"""Advanced tier defaults to a different model than standard on
the baseline path. Checked against declared ``Field`` defaults

View File

@@ -43,29 +43,42 @@ class ChatConfig(BaseSettings):
# ``CHAT_FAST_MODEL``) are preserved via ``validation_alias`` so
# existing deployments continue to override the same effective cell.
fast_standard_model: str = Field(
default="anthropic/claude-sonnet-4-6",
default="moonshotai/kimi-k2.6",
validation_alias=AliasChoices(
"CHAT_FAST_STANDARD_MODEL",
"CHAT_FAST_MODEL",
),
description="Baseline path, 'standard' / ``None`` tier. Per-user "
"overrides flow through the ``copilot-fast-standard-model`` LD flag "
"(see ``copilot/model_router.py``); this value is the fallback.",
description="Baseline path, 'standard' / ``None`` tier. Kimi K2.6 "
"by default: ~5x cheaper input and ~5.4x cheaper output than Sonnet, "
"SWE-Bench Verified parity with Opus, and OpenRouter advertises the "
"``reasoning`` + ``include_reasoning`` extension params on the "
"Moonshot endpoints — so the baseline reasoning plumbing lights up "
"without provider-specific code. Roll back to the Anthropic route "
"via ``CHAT_FAST_STANDARD_MODEL=anthropic/claude-sonnet-4.6`` (then "
"``cache_control`` breakpoints reactivate via "
"``_is_anthropic_model``).",
)
fast_advanced_model: str = Field(
default="anthropic/claude-opus-4.7",
validation_alias=AliasChoices("CHAT_FAST_ADVANCED_MODEL"),
description="Baseline path, 'advanced' tier. LD override: "
"``copilot-fast-advanced-model``.",
description="Baseline path, 'advanced' tier. Opus by default. "
"Override via ``CHAT_FAST_ADVANCED_MODEL``.",
)
thinking_standard_model: str = Field(
default="anthropic/claude-sonnet-4-6",
default="moonshotai/kimi-k2.6",
validation_alias=AliasChoices(
"CHAT_THINKING_STANDARD_MODEL",
"CHAT_MODEL",
),
description="SDK (extended-thinking) path, 'standard' / ``None`` "
"tier. LD override: ``copilot-thinking-standard-model``.",
"tier. Kimi K2.6 by default: routed via OpenRouter's Anthropic-"
"compatible ``/v1/messages`` endpoint, which the Claude Agent SDK "
"CLI accepts as a drop-in ``ANTHROPIC_BASE_URL`` target. The same "
"cost/capability rationale as the baseline path applies — ~5x "
"cheaper than Sonnet at SWE-Bench parity. Roll back to Sonnet via "
"``CHAT_THINKING_STANDARD_MODEL=anthropic/claude-sonnet-4.6`` (then "
"the SDK ``cache_control`` markers reactivate). Direct-Anthropic "
"deployments (no OpenRouter) must override to an Anthropic model.",
)
thinking_advanced_model: str = Field(
default="anthropic/claude-opus-4.7",
@@ -73,8 +86,9 @@ class ChatConfig(BaseSettings):
"CHAT_THINKING_ADVANCED_MODEL",
"CHAT_ADVANCED_MODEL",
),
description="SDK (extended-thinking) path, 'advanced' tier. LD "
"override: ``copilot-thinking-advanced-model``.",
description="SDK (extended-thinking) path, 'advanced' tier. Opus "
"by default. Override via ``CHAT_THINKING_ADVANCED_MODEL`` "
"(legacy ``CHAT_ADVANCED_MODEL`` still honored).",
)
title_model: str = Field(
default="openai/gpt-4o-mini",

View File

@@ -203,38 +203,35 @@ class TestSdkModelVendorCompatibility:
guard in ``_normalize_model_name`` so misconfig surfaces at boot
instead of as a 500 on the first SDK turn."""
def test_direct_anthropic_with_kimi_override_raises(self):
"""A non-Anthropic SDK model must fail at config load when the
deployment has no OpenRouter credentials."""
def test_direct_anthropic_with_kimi_default_raises(self):
"""The ``moonshotai/kimi-k2.6`` default must fail at config load
when the deployment has no OpenRouter credentials."""
with pytest.raises(Exception, match="requires an Anthropic model"):
ChatConfig(
use_openrouter=False,
api_key=None,
base_url=None,
use_claude_code_subscription=False,
thinking_standard_model="moonshotai/kimi-k2.6",
)
def test_direct_anthropic_with_anthropic_default_succeeds(self):
"""Direct-Anthropic mode is fine when both SDK slugs are anthropic/*
— which is the default after the LD-routed model rollout."""
def test_direct_anthropic_with_anthropic_override_succeeds(self):
"""Direct-Anthropic mode is fine when both SDK slugs are anthropic/*."""
cfg = ChatConfig(
use_openrouter=False,
api_key=None,
base_url=None,
use_claude_code_subscription=False,
thinking_standard_model="anthropic/claude-sonnet-4-6",
)
assert cfg.thinking_standard_model == "anthropic/claude-sonnet-4-6"
def test_openrouter_with_kimi_override_succeeds(self):
"""Kimi slug round-trips cleanly when OpenRouter is on — exercised
via the LD-flag override path in production."""
def test_openrouter_with_kimi_default_succeeds(self):
"""Default Kimi slug round-trips cleanly when OpenRouter is on."""
cfg = ChatConfig(
use_openrouter=True,
api_key="or-key",
base_url="https://openrouter.ai/api/v1",
use_claude_code_subscription=False,
thinking_standard_model="moonshotai/kimi-k2.6",
)
assert cfg.thinking_standard_model == "moonshotai/kimi-k2.6"

View File

@@ -105,46 +105,25 @@ class CoPilotExecutor(AppProcess):
time.sleep(1e5)
def cleanup(self):
"""Graceful shutdown — mirrors ``backend.executor.manager`` pattern.
1. Stop consumer immediately (both the Python flag that gates
``_handle_run_message`` and ``channel.stop_consuming()`` at
the broker), so no new work enters.
2. Passively wait for ``active_tasks`` to drain — each turn's
own ``finally`` publishes its terminal state via
``mark_session_completed``. When a turn exits, ``on_run_done``
removes it from ``active_tasks`` and releases its cluster lock.
3. Shut down the thread-pool executor (cancels pending, leaves
running threads alone — process exit handles them).
4. Release any cluster locks still held (defensive — on_run_done's
finally should have already released them).
5. Stop message consumer threads + disconnect pika clients.
The zombie-session bug this PR targets is handled inside each
turn's own lifecycle by :func:`sync_fail_close_session`, NOT by
cleanup — so cleanup can stay as a simple "wait, then teardown"
and matches agent-executor's proven pattern.
"""
"""Graceful shutdown with active execution waiting."""
pid = os.getpid()
prefix = f"[cleanup {pid}]"
logger.info(f"{prefix} Starting graceful shutdown...")
logger.info(f"[cleanup {pid}] Starting graceful shutdown...")
# 1. Stop consumer — flag AND broker-side
# Signal the consumer thread to stop
try:
self.stop_consuming.set()
run_channel = self.run_client.get_channel()
run_channel.connection.add_callback_threadsafe(
lambda: run_channel.stop_consuming()
)
logger.info(f"{prefix} Consumer has been signaled to stop")
logger.info(f"[cleanup {pid}] Consumer has been signaled to stop")
except Exception as e:
logger.error(f"{prefix} Error stopping consumer: {e}")
logger.error(f"[cleanup {pid}] Error stopping consumer: {e}")
# 2. Wait for in-flight turns to finish naturally
# Wait for active executions to complete
if self.active_tasks:
logger.info(
f"{prefix} Waiting for {len(self.active_tasks)} active tasks "
f"to complete (timeout: {GRACEFUL_SHUTDOWN_TIMEOUT_SECONDS}s)..."
f"[cleanup {pid}] Waiting for {len(self.active_tasks)} active tasks to complete (timeout: {GRACEFUL_SHUTDOWN_TIMEOUT_SECONDS}s)..."
)
start_time = time.monotonic()
@@ -159,42 +138,38 @@ class CoPilotExecutor(AppProcess):
if not self.active_tasks:
break
now = time.monotonic()
if now - last_refresh >= lock_refresh_interval:
# Refresh cluster locks periodically
current_time = time.monotonic()
if current_time - last_refresh >= lock_refresh_interval:
for lock in list(self._task_locks.values()):
try:
lock.refresh()
except Exception as e:
logger.warning(f"{prefix} Failed to refresh lock: {e}")
last_refresh = now
logger.warning(
f"[cleanup {pid}] Failed to refresh lock: {e}"
)
last_refresh = current_time
logger.info(
f"{prefix} {len(self.active_tasks)} tasks still active, waiting..."
f"[cleanup {pid}] {len(self.active_tasks)} tasks still active, waiting..."
)
time.sleep(10.0)
if self.active_tasks:
logger.warning(
f"{prefix} {len(self.active_tasks)} tasks still running after "
f"{GRACEFUL_SHUTDOWN_TIMEOUT_SECONDS}s — process exit will "
f"abandon them; RabbitMQ redelivery handles the message."
)
# 3. Stop message consumer threads
# Stop message consumers
if self._run_thread:
self._stop_message_consumers(
self._run_thread, self.run_client, f"{prefix} [run]"
self._run_thread, self.run_client, "[cleanup][run]"
)
if self._cancel_thread:
self._stop_message_consumers(
self._cancel_thread, self.cancel_client, f"{prefix} [cancel]"
self._cancel_thread, self.cancel_client, "[cleanup][cancel]"
)
# 4. Worker cleanup + executor shutdown
# Clean up worker threads (closes per-loop workspace storage sessions)
if self._executor:
from .processor import cleanup_worker
logger.info(f"{prefix} Cleaning up workers...")
logger.info(f"[cleanup {pid}] Cleaning up workers...")
futures = []
for _ in range(self._executor._max_workers):
futures.append(self._executor.submit(cleanup_worker))
@@ -202,20 +177,22 @@ class CoPilotExecutor(AppProcess):
try:
f.result(timeout=10)
except Exception as e:
logger.warning(f"{prefix} Worker cleanup error: {e}")
logger.warning(f"[cleanup {pid}] Worker cleanup error: {e}")
logger.info(f"{prefix} Shutting down executor...")
logger.info(f"[cleanup {pid}] Shutting down executor...")
self._executor.shutdown(wait=False)
# 5. Release any cluster locks still held
# Release any remaining locks
for session_id, lock in list(self._task_locks.items()):
try:
lock.release()
logger.info(f"{prefix} Released lock for {session_id}")
logger.info(f"[cleanup {pid}] Released lock for {session_id}")
except Exception as e:
logger.error(f"{prefix} Failed to release lock for {session_id}: {e}")
logger.error(
f"[cleanup {pid}] Failed to release lock for {session_id}: {e}"
)
logger.info(f"{prefix} Graceful shutdown completed")
logger.info(f"[cleanup {pid}] Graceful shutdown completed")
# ============ RabbitMQ Consumer Methods ============ #
@@ -410,12 +387,13 @@ class CoPilotExecutor(AppProcess):
# Execute the task
try:
self._task_locks[session_id] = cluster_lock
logger.info(
f"Acquired cluster lock for {session_id}, "
f"executor_id={self.executor_id}"
)
self._task_locks[session_id] = cluster_lock
cancel_event = threading.Event()
future = self.executor.submit(
execute_copilot_turn, entry, cancel_event, cluster_lock
@@ -447,6 +425,7 @@ class CoPilotExecutor(AppProcess):
error_msg = str(e) or type(e).__name__
logger.exception(f"Error in run completion callback: {error_msg}")
finally:
# Release the cluster lock
if session_id in self._task_locks:
logger.info(f"Releasing cluster lock for {session_id}")
self._task_locks[session_id].release()

View File

@@ -5,7 +5,6 @@ in a thread-local context, following the graph executor pattern.
"""
import asyncio
import concurrent.futures
import logging
import os
import subprocess
@@ -31,87 +30,6 @@ from .utils import CoPilotExecutionEntry, CoPilotLogMetadata
logger = TruncatedLogger(logging.getLogger(__name__), prefix="[CoPilotExecutor]")
SHUTDOWN_ERROR_MESSAGE = (
"Copilot executor shut down before this turn finished. Please retry."
)
# Max time execute() blocks after calling future.cancel() / when draining a
# soon-to-be-cancelled future. Gives _execute_async's own finally a chance to
# publish the accurate terminal state over the Redis CAS; long enough to let
# an in-flight Redis call settle, short enough that shutdown doesn't stall.
_CANCEL_GRACE_SECONDS = 5.0
# Max time the sync safety net itself spends on a single Redis CAS. Without
# this bound the whole point of ``sync_fail_close_session`` is defeated —
# ``mark_session_completed`` would hang on the same broken Redis that caused
# the original failure. On timeout we give up silently; worst case the
# session stays ``running`` until the stale-session watchdog reaps it, but
# at least the pool worker thread isn't blocked forever.
_FAIL_CLOSE_REDIS_TIMEOUT = 10.0
# Module-level symbol preserved for backward-compat with callers that import
# ``sync_fail_close_session``; the real implementation now lives on
# ``CoPilotProcessor`` so it can reuse ``self.execution_loop`` (same
# pattern as ``backend.executor.manager``'s ``node_execution_loop`` bridge
# at :meth:`ExecutionProcessor.on_graph_execution`).
def sync_fail_close_session(
session_id: str,
log: "CoPilotLogMetadata | TruncatedLogger",
execution_loop: asyncio.AbstractEventLoop,
) -> None:
"""Synchronously mark *session_id* as failed from the pool worker thread.
Submits the CAS coroutine to the long-lived *execution_loop* via
``run_coroutine_threadsafe`` — the same shape agent-executor uses at
:meth:`backend.executor.manager.ExecutionProcessor.on_graph_execution`
to reach its ``node_execution_loop`` from the pool worker. Reusing the
persistent loop means:
* no fresh TCP connection per turn (the ``@thread_cached``
``AsyncRedis`` on the execution thread stays bound to the same loop
and is reused across every turn);
* no loop-teardown overhead;
* no ``clear_cache()`` gymnastics to dodge the "loop is closed" pitfall.
``mark_session_completed`` is an atomic CAS on ``status == "running"``,
so when the async path already wrote a terminal state the sync call is
a cheap no-op. The inner ``asyncio.wait_for`` bounds the Redis call so
a wedged Redis can't hang the safety net for the full redis-py default
TCP timeout; the outer ``.result(timeout=...)`` is a belt-and-braces
upper bound for the cross-thread wait.
"""
async def _bounded() -> None:
await asyncio.wait_for(
stream_registry.mark_session_completed(
session_id, error_message=SHUTDOWN_ERROR_MESSAGE
),
timeout=_FAIL_CLOSE_REDIS_TIMEOUT,
)
try:
future = asyncio.run_coroutine_threadsafe(_bounded(), execution_loop)
except RuntimeError as e:
# execution_loop is closed — happens if cleanup() already ran the
# per-worker teardown. Nothing we can do; let the stale-session
# watchdog reap it.
log.warning(f"sync fail-close skipped (execution_loop closed): {e}")
return
try:
future.result(timeout=_FAIL_CLOSE_REDIS_TIMEOUT + 2)
except concurrent.futures.TimeoutError:
log.warning(
f"sync fail-close timed out after {_FAIL_CLOSE_REDIS_TIMEOUT}s "
f"(session={session_id})"
)
future.cancel()
except Exception as e:
log.warning(f"sync fail-close mark_session_completed failed: {e}")
# ============ Mode Routing ============ #
@@ -334,13 +252,12 @@ class CoPilotProcessor:
):
"""Execute a CoPilot turn.
Thin wrapper around :meth:`_execute`. The ``try/finally`` here
guarantees :func:`sync_fail_close_session` runs on every exit
path — normal completion, exception, or a wedged event loop
that escapes via :data:`_CANCEL_GRACE_SECONDS` timeout.
``mark_session_completed`` is an atomic CAS on
``status == "running"``, so when the async path already wrote a
terminal state the sync call is a cheap no-op.
Runs the async logic in the worker's event loop and handles errors.
Args:
entry: The turn payload containing session and message info
cancel: Threading event to signal cancellation
cluster_lock: Distributed lock to prevent duplicate execution
"""
log = CoPilotLogMetadata(
logging.getLogger(__name__),
@@ -348,28 +265,10 @@ class CoPilotProcessor:
user_id=entry.user_id,
)
log.info("Starting execution")
start_time = time.monotonic()
try:
self._execute(entry, cancel, cluster_lock, log)
finally:
sync_fail_close_session(entry.session_id, log, self.execution_loop)
elapsed = time.monotonic() - start_time
log.info(f"Execution completed in {elapsed:.2f}s")
def _execute(
self,
entry: CoPilotExecutionEntry,
cancel: threading.Event,
cluster_lock: ClusterLock,
log: CoPilotLogMetadata,
):
"""Submit the async turn to ``self.execution_loop`` and drive it.
Handles the sync/async boundary (cancel-event checks, cluster-lock
refresh, bounded waits) without any Redis-state cleanup logic —
that lives in :func:`sync_fail_close_session` which the outer
:meth:`execute` always invokes on exit.
"""
# Run the async execution in our event loop
future = asyncio.run_coroutine_threadsafe(
self._execute_async(entry, cancel, cluster_lock, log),
self.execution_loop,
@@ -383,27 +282,16 @@ class CoPilotProcessor:
if cancel.is_set():
log.info("Cancellation requested")
future.cancel()
# Give _execute_async's own finally a short window to
# publish its accurate terminal state before the outer
# sync safety net fires.
try:
future.result(timeout=_CANCEL_GRACE_SECONDS)
except BaseException:
pass
return
break
# Refresh cluster lock to maintain ownership
cluster_lock.refresh()
if not future.cancelled():
# Bounded timeout so a wedged event loop can't trap us here —
# on timeout we escape to execute()'s finally and the sync
# safety net fires.
try:
future.result(timeout=_CANCEL_GRACE_SECONDS)
except concurrent.futures.TimeoutError:
log.warning(
"Future did not complete within grace window; "
"falling through to sync fail-close"
)
# Get result to propagate any exceptions
future.result()
elapsed = time.monotonic() - start_time
log.info(f"Execution completed in {elapsed:.2f}s")
async def _execute_async(
self,

View File

@@ -10,8 +10,6 @@ the real production helpers from ``processor.py`` so the routing logic
has meaningful coverage.
"""
import asyncio
import concurrent.futures
import logging
import threading
from unittest.mock import AsyncMock, MagicMock, patch
@@ -22,7 +20,6 @@ from backend.copilot.executor.processor import (
CoPilotProcessor,
resolve_effective_mode,
resolve_use_sdk_for_mode,
sync_fail_close_session,
)
from backend.copilot.executor.utils import CoPilotExecutionEntry, CoPilotLogMetadata
@@ -278,221 +275,3 @@ class TestExecuteAsyncAclose:
await proc._execute_async(_make_entry(), cancel, cluster_lock, _make_log())
assert published.aclose_called is True
@pytest.fixture
def exec_loop():
"""Long-lived asyncio loop on a daemon thread — mirrors the layout
``CoPilotProcessor`` sets up (``execution_loop`` + ``execution_thread``)
so ``sync_fail_close_session`` has a real cross-thread loop to submit
into via ``run_coroutine_threadsafe``."""
loop = asyncio.new_event_loop()
thread = threading.Thread(target=loop.run_forever, daemon=True)
thread.start()
try:
yield loop
finally:
loop.call_soon_threadsafe(loop.stop)
thread.join(timeout=5)
loop.close()
class TestSyncFailCloseSession:
"""``sync_fail_close_session`` is the last-line-of-defense invoked from
``CoPilotProcessor.execute``'s ``finally``. It must call
``mark_session_completed`` via the processor's long-lived
``execution_loop`` (cross-thread submit) and must swallow Redis
failures so a transient outage doesn't propagate out of the finally."""
def test_invokes_mark_session_completed_with_shutdown_message(
self, exec_loop
) -> None:
mock_mark = AsyncMock()
with patch(
"backend.copilot.executor.processor.stream_registry.mark_session_completed",
new=mock_mark,
):
sync_fail_close_session("sess-1", _make_log(), exec_loop)
mock_mark.assert_awaited_once()
assert mock_mark.await_args is not None
assert mock_mark.await_args.args[0] == "sess-1"
assert "shut down" in mock_mark.await_args.kwargs["error_message"].lower()
def test_swallows_redis_error(self, exec_loop) -> None:
# Raising from the mock ensures the helper catches the exception
# instead of propagating it back into execute()'s finally block.
mock_mark = AsyncMock(side_effect=RuntimeError("redis down"))
with patch(
"backend.copilot.executor.processor.stream_registry.mark_session_completed",
new=mock_mark,
):
sync_fail_close_session("sess-2", _make_log(), exec_loop) # must not raise
mock_mark.assert_awaited_once()
def test_closed_execution_loop_skipped_cleanly(self) -> None:
"""If cleanup_worker has already stopped the execution_loop by the
time the safety net fires, ``run_coroutine_threadsafe`` raises
RuntimeError. Expected behavior: log + return without propagating."""
dead_loop = asyncio.new_event_loop()
dead_loop.close()
mock_mark = AsyncMock()
with patch(
"backend.copilot.executor.processor.stream_registry.mark_session_completed",
new=mock_mark,
):
# Must not raise even though the loop is closed
sync_fail_close_session("sess-closed-loop", _make_log(), dead_loop)
# mark_session_completed was never scheduled because the loop was dead
mock_mark.assert_not_awaited()
def test_bounded_timeout_when_redis_hangs(self, exec_loop) -> None:
"""Scenario D: Redis unreachable — the inner ``asyncio.wait_for``
must fire and the helper must return without blocking the worker.
Simulates a wedged Redis by sleeping past the 10s fail-close budget.
The helper must return within the configured grace (+ a small
scheduler margin) and must not re-raise.
"""
import time as _time
from backend.copilot.executor.processor import _FAIL_CLOSE_REDIS_TIMEOUT
async def _hang(*_args, **_kwargs):
await asyncio.sleep(_FAIL_CLOSE_REDIS_TIMEOUT + 5)
with patch(
"backend.copilot.executor.processor.stream_registry.mark_session_completed",
new=_hang,
):
start = _time.monotonic()
sync_fail_close_session(
"sess-hang", _make_log(), exec_loop
) # must not raise
elapsed = _time.monotonic() - start
# wait_for fires at _FAIL_CLOSE_REDIS_TIMEOUT; outer future.result
# has +2s slack. If the timeout is missing/broken the helper would
# block the full sleep duration (~15s).
assert elapsed < _FAIL_CLOSE_REDIS_TIMEOUT + 4.0, (
f"sync_fail_close_session hung for {elapsed:.1f}s — bounded "
f"timeout did not fire"
)
# ---------------------------------------------------------------------------
# End-to-end execute() safety-net coverage — the PR's core invariant
# ---------------------------------------------------------------------------
class TestExecuteSafetyNet:
"""``CoPilotProcessor.execute`` must always invoke
``sync_fail_close_session`` in its ``finally`` so a session never stays
``status=running`` in Redis.
Validates the four deploy-time scenarios the PR targets:
* A — SIGTERM mid-turn: ``cancel`` event fires, ``_execute`` returns,
safety net still runs.
* B — happy path: normal completion, safety net runs (cheap CAS no-op).
* C — zombie Redis state: the async ``mark_session_completed`` in
``_execute_async`` blows up, but the outer safety net marks the
session failed anyway.
* D — covered by ``TestSyncFailCloseSession::test_bounded_timeout…``.
"""
def _attach_exec_loop(self, proc: CoPilotProcessor, loop) -> None:
"""``execute`` dispatches the safety net onto ``self.execution_loop``.
Tests don't call ``on_executor_start`` (which spawns the real
per-worker loop), so wire the shared fixture loop in directly."""
proc.execution_loop = loop
def _run_execute_in_thread(self, proc: CoPilotProcessor, cancel: threading.Event):
"""``CoPilotProcessor.execute`` expects to be called from a pool
worker thread that has *no* running event loop, so we always run
it off the main thread to preserve that invariant. Returns the
future so callers can inspect both result and exception paths."""
pool = concurrent.futures.ThreadPoolExecutor(max_workers=1)
try:
fut = pool.submit(proc.execute, _make_entry(), cancel, MagicMock())
# Block until execute() returns (or raises) so the safety net
# has run by the time we inspect mocks.
try:
fut.result(timeout=30)
except BaseException:
pass
return fut
finally:
pool.shutdown(wait=True)
def test_happy_path_invokes_safety_net(self, exec_loop) -> None:
"""Scenario B: normal completion still runs the sync safety net.
Proves the ``finally`` always fires, even when nothing went wrong —
``mark_session_completed``'s atomic CAS makes this a cheap no-op
in production."""
mock_mark = AsyncMock()
proc = CoPilotProcessor()
self._attach_exec_loop(proc, exec_loop)
with patch.object(proc, "_execute"), patch(
"backend.copilot.executor.processor.stream_registry.mark_session_completed",
new=mock_mark,
):
self._run_execute_in_thread(proc, threading.Event())
mock_mark.assert_awaited_once()
assert mock_mark.await_args is not None
assert mock_mark.await_args.args[0] == "sess-1"
def test_sigterm_mid_turn_invokes_safety_net(self, exec_loop) -> None:
"""Scenario A: worker raises (simulating future.cancel + grace
timeout escaping ``_execute``); ``execute`` must still reach the
safety net in its ``finally`` and mark the session failed."""
mock_mark = AsyncMock()
proc = CoPilotProcessor()
self._attach_exec_loop(proc, exec_loop)
with patch.object(
proc,
"_execute",
side_effect=concurrent.futures.TimeoutError("grace expired"),
), patch(
"backend.copilot.executor.processor.stream_registry.mark_session_completed",
new=mock_mark,
):
self._run_execute_in_thread(proc, threading.Event())
mock_mark.assert_awaited_once()
def test_zombie_redis_async_path_still_marks_session_failed(
self, exec_loop
) -> None:
"""Scenario C: ``_execute_async``'s own ``mark_session_completed``
call is broken (simulating the exact async-Redis hiccup that caused
the original zombie sessions). The outer ``sync_fail_close_session``
runs on the processor's long-lived ``execution_loop`` and succeeds
where the async path failed."""
call_log: list[str] = []
async def _ok(*args, **kwargs):
call_log.append("sync-ok")
def _broken_execute(entry, cancel, cluster_lock, log):
# Simulate the async path raising because its Redis client is
# wedged (the pre-fix zombie-session scenario).
raise RuntimeError("async Redis client broken")
proc = CoPilotProcessor()
self._attach_exec_loop(proc, exec_loop)
with patch.object(proc, "_execute", side_effect=_broken_execute), patch(
"backend.copilot.executor.processor.stream_registry.mark_session_completed",
new=_ok,
):
self._run_execute_in_thread(proc, threading.Event())
# The sync safety net must have fired despite the async path
# blowing up — this is the core guarantee of the PR.
assert call_log == [
"sync-ok"
], f"expected sync_fail_close_session to run once, got {call_log!r}"

View File

@@ -89,16 +89,11 @@ def get_session_lock_key(session_id: str) -> str:
# CoPilot operations can include extended thinking and agent generation
# which may take several hours to complete. Matches the pod's
# terminationGracePeriodSeconds in the helm chart so a rolling deploy can let
# the longest legitimate turn finish. Also bounds the stale-session auto-
# complete watchdog in stream_registry (consumer_timeout + 5min buffer).
COPILOT_CONSUMER_TIMEOUT_SECONDS = 6 * 60 * 60 # 6 hours
# which may take 30+ minutes to complete
COPILOT_CONSUMER_TIMEOUT_SECONDS = 60 * 60 # 1 hour
# Graceful shutdown timeout - must match COPILOT_CONSUMER_TIMEOUT_SECONDS so
# cleanup can let the longest legitimate turn complete before the pod is
# SIGKILL'd by kubelet.
GRACEFUL_SHUTDOWN_TIMEOUT_SECONDS = COPILOT_CONSUMER_TIMEOUT_SECONDS
# Graceful shutdown timeout - allow in-flight operations to complete
GRACEFUL_SHUTDOWN_TIMEOUT_SECONDS = 30 * 60 # 30 minutes
def create_copilot_queue_config() -> RabbitMQConfig:
@@ -118,27 +113,9 @@ def create_copilot_queue_config() -> RabbitMQConfig:
durable=True,
auto_delete=False,
arguments={
# Consumer timeout matches the pod graceful-shutdown window so a
# rolling deploy never forces redelivery of a turn that the pod
# is still legitimately finishing.
#
# Deploy note: RabbitMQ (verified on 4.1.4) does NOT strictly
# compare ``x-consumer-timeout`` on queue redeclaration, so this
# value can change between deploys without triggering
# PRECONDITION_FAILED. To update the *effective* timeout on an
# already-running queue before the new code deploys (so pods
# mid-shutdown don't have their consumer cancelled at the old
# limit), apply a policy:
#
# rabbitmqctl set_policy copilot-consumer-timeout \
# "^copilot_execution_queue$" \
# '{"consumer-timeout": 21600000}' \
# --apply-to queues
#
# The policy takes effect immediately. Once the policy is set
# to match the code's value the policy is redundant for new
# pods and can be removed after a stable deploy if desired —
# but it's harmless to leave in place.
# Extended consumer timeout for long-running LLM operations
# Default 30-minute timeout is insufficient for extended thinking
# and agent generation which can take 30+ minutes
"x-consumer-timeout": COPILOT_CONSUMER_TIMEOUT_SECONDS
* 1000,
},

View File

@@ -1,104 +0,0 @@
"""LaunchDarkly-aware model selection for the copilot.
Each cell of the ``(mode, tier)`` matrix has a static default baked into
``ChatConfig`` (see ``copilot/config.py``) and a matching LaunchDarkly
string-valued feature flag that can override it per-user. This module
centralises the lookup so both the baseline and SDK paths agree on the
selection rule and so A/B experiments can target a single cell without
shipping a config change.
Matrix:
+----------+-------------------------------------+-------------------------------------+
| | standard | advanced |
+----------+-------------------------------------+-------------------------------------+
| fast | copilot-fast-standard-model | copilot-fast-advanced-model |
| thinking | copilot-thinking-standard-model | copilot-thinking-advanced-model |
+----------+-------------------------------------+-------------------------------------+
LD flag values are arbitrary strings (model identifiers, e.g.
``"anthropic/claude-sonnet-4-6"`` or ``"moonshotai/kimi-k2.6"``). Empty
or non-string values fall back to the config default.
"""
from __future__ import annotations
import logging
from typing import Literal
from backend.copilot.config import ChatConfig
from backend.util.feature_flag import Flag, get_feature_flag_value
logger = logging.getLogger(__name__)
ModelMode = Literal["fast", "thinking"]
ModelTier = Literal["standard", "advanced"]
_FLAG_BY_CELL: dict[tuple[ModelMode, ModelTier], Flag] = {
("fast", "standard"): Flag.COPILOT_FAST_STANDARD_MODEL,
("fast", "advanced"): Flag.COPILOT_FAST_ADVANCED_MODEL,
("thinking", "standard"): Flag.COPILOT_THINKING_STANDARD_MODEL,
("thinking", "advanced"): Flag.COPILOT_THINKING_ADVANCED_MODEL,
}
def _config_default(config: ChatConfig, mode: ModelMode, tier: ModelTier) -> str:
if mode == "fast":
return (
config.fast_advanced_model
if tier == "advanced"
else config.fast_standard_model
)
return (
config.thinking_advanced_model
if tier == "advanced"
else config.thinking_standard_model
)
async def resolve_model(
mode: ModelMode,
tier: ModelTier,
user_id: str | None,
*,
config: ChatConfig,
) -> str:
"""Return the model identifier for a ``(mode, tier)`` cell.
Consults the matching LaunchDarkly flag for *user_id* first and
falls back to the ``ChatConfig`` default on missing user, missing
flag, or non-string flag value. Passing *config* explicitly keeps
the resolver cheap to unit-test.
"""
fallback = _config_default(config, mode, tier).strip()
if not user_id:
return fallback
flag = _FLAG_BY_CELL[(mode, tier)]
try:
value = await get_feature_flag_value(flag.value, user_id, default=fallback)
except Exception:
logger.warning(
"[model_router] LD lookup failed for %s — using config default %s",
flag.value,
fallback,
exc_info=True,
)
return fallback
if isinstance(value, str) and value.strip():
return value.strip()
if value != fallback:
reason = (
"empty string"
if isinstance(value, str)
else f"non-string ({type(value).__name__})"
)
logger.warning(
"[model_router] LD flag %s returned %s — using config default %s",
flag.value,
reason,
fallback,
)
return fallback

View File

@@ -1,166 +0,0 @@
"""Tests for the LD-aware model resolver."""
from unittest.mock import AsyncMock, patch
import pytest
from backend.copilot.config import ChatConfig
from backend.copilot.model_router import _FLAG_BY_CELL, _config_default, resolve_model
def _make_config() -> ChatConfig:
"""Build a config with the canonical defaults so tests read naturally."""
return ChatConfig(
fast_standard_model="anthropic/claude-sonnet-4-6",
fast_advanced_model="anthropic/claude-opus-4.7",
thinking_standard_model="anthropic/claude-sonnet-4-6",
thinking_advanced_model="anthropic/claude-opus-4.7",
)
class TestConfigDefault:
def test_fast_standard(self):
cfg = _make_config()
assert _config_default(cfg, "fast", "standard") == cfg.fast_standard_model
def test_fast_advanced(self):
cfg = _make_config()
assert _config_default(cfg, "fast", "advanced") == cfg.fast_advanced_model
def test_thinking_standard(self):
cfg = _make_config()
assert (
_config_default(cfg, "thinking", "standard") == cfg.thinking_standard_model
)
def test_thinking_advanced(self):
cfg = _make_config()
assert (
_config_default(cfg, "thinking", "advanced") == cfg.thinking_advanced_model
)
class TestResolveModel:
@pytest.mark.asyncio
async def test_missing_user_returns_fallback(self):
"""Without user_id there's no LD context — skip the lookup entirely."""
cfg = _make_config()
with patch("backend.copilot.model_router.get_feature_flag_value") as mock_flag:
result = await resolve_model("fast", "standard", None, config=cfg)
assert result == cfg.fast_standard_model
mock_flag.assert_not_called()
@pytest.mark.asyncio
async def test_missing_user_strips_whitespace_from_fallback(self):
"""Sentry MEDIUM: the anonymous-user branch returned an unstripped
config value. If ``CHAT_*_MODEL`` env carries trailing whitespace
the downstream ``resolved == tier_default`` check in
``_resolve_sdk_model_for_request`` would diverge from the
whitespace-stripped LD side, bypassing subscription mode for
every anonymous request. Strip at the source."""
cfg = ChatConfig(
fast_standard_model="anthropic/claude-sonnet-4-6 ", # trailing ws
fast_advanced_model="anthropic/claude-opus-4.7",
thinking_standard_model="anthropic/claude-sonnet-4-6",
thinking_advanced_model="anthropic/claude-opus-4.7",
)
result = await resolve_model("fast", "standard", None, config=cfg)
assert result == "anthropic/claude-sonnet-4-6"
@pytest.mark.asyncio
async def test_ld_string_override_wins(self):
"""LD-returned model string replaces the config default."""
cfg = _make_config()
with patch(
"backend.copilot.model_router.get_feature_flag_value",
new=AsyncMock(return_value="moonshotai/kimi-k2.6"),
):
result = await resolve_model("fast", "standard", "user-1", config=cfg)
assert result == "moonshotai/kimi-k2.6"
@pytest.mark.asyncio
async def test_whitespace_is_stripped(self):
cfg = _make_config()
with patch(
"backend.copilot.model_router.get_feature_flag_value",
new=AsyncMock(return_value=" xai/grok-4 "),
):
result = await resolve_model("thinking", "advanced", "user-1", config=cfg)
assert result == "xai/grok-4"
@pytest.mark.asyncio
async def test_non_string_value_falls_back_with_type_in_warning(self, caplog):
"""LD misconfigured as a boolean flag — don't try to use ``True`` as a
model name; return the config default. Warning must say
'non-string' (not 'empty string') so the LD operator knows the
flag type is wrong, not just missing a value."""
import logging
cfg = _make_config()
with caplog.at_level(logging.WARNING, logger="backend.copilot.model_router"):
with patch(
"backend.copilot.model_router.get_feature_flag_value",
new=AsyncMock(return_value=True),
):
result = await resolve_model("fast", "advanced", "user-1", config=cfg)
assert result == cfg.fast_advanced_model
assert any("non-string" in r.message for r in caplog.records)
@pytest.mark.asyncio
async def test_empty_string_falls_back_with_empty_in_warning(self, caplog):
"""When LD returns ``""`` the warning must say 'empty string'
not 'non-string' — so the operator doesn't chase a type bug
when the flag is simply unset to an empty value."""
import logging
cfg = _make_config()
with caplog.at_level(logging.WARNING, logger="backend.copilot.model_router"):
with patch(
"backend.copilot.model_router.get_feature_flag_value",
new=AsyncMock(return_value=""),
):
result = await resolve_model("fast", "standard", "user-1", config=cfg)
assert result == cfg.fast_standard_model
messages = [r.message for r in caplog.records]
assert any("empty string" in m for m in messages)
assert not any("non-string" in m for m in messages)
@pytest.mark.asyncio
async def test_ld_exception_falls_back(self):
"""LD client throws (network blip, SDK init race) — serve the default
instead of failing the whole request."""
cfg = _make_config()
with patch(
"backend.copilot.model_router.get_feature_flag_value",
new=AsyncMock(side_effect=RuntimeError("LD down")),
):
result = await resolve_model("fast", "standard", "user-1", config=cfg)
assert result == cfg.fast_standard_model
@pytest.mark.asyncio
async def test_all_four_cells_hit_distinct_flags(self):
"""Each (mode, tier) cell must route to its own flag — regression
guard against copy-paste bugs in the _FLAG_BY_CELL map."""
cfg = _make_config()
calls: list[str] = []
async def _capture(flag_key, user_id, default):
calls.append(flag_key)
return default
with patch(
"backend.copilot.model_router.get_feature_flag_value",
new=AsyncMock(side_effect=_capture),
):
await resolve_model("fast", "standard", "u", config=cfg)
await resolve_model("fast", "advanced", "u", config=cfg)
await resolve_model("thinking", "standard", "u", config=cfg)
await resolve_model("thinking", "advanced", "u", config=cfg)
assert calls == [
_FLAG_BY_CELL[("fast", "standard")].value,
_FLAG_BY_CELL[("fast", "advanced")].value,
_FLAG_BY_CELL[("thinking", "standard")].value,
_FLAG_BY_CELL[("thinking", "advanced")].value,
]
assert len(set(calls)) == 4

View File

@@ -240,15 +240,16 @@ async def peek_pending_messages(session_id: str) -> list[PendingMessage]:
return messages
async def clear_pending_messages_unsafe(session_id: str) -> None:
async def _clear_pending_messages_unsafe(session_id: str) -> None:
"""Drop the session's pending buffer — **not** the normal turn cleanup.
The ``_unsafe`` suffix warns: reaching for this at turn end drops queued
follow-ups on the floor instead of running them (the bug fixed by commit
b64be73). The atomic ``LPOP`` drain at turn start is the primary consumer;
anything pushed after the drain window belongs to the next turn by
definition. Retained only as an operator/debug escape hatch for manually
clearing a stuck session and as a fixture in the unit tests.
Named ``_unsafe`` because reaching for this at turn end drops queued
follow-ups on the floor instead of running them (the bug fixed by
commit b64be73). The atomic ``LPOP`` drain at turn start is the
primary consumer; anything pushed after the drain window belongs to
the next turn by definition. Retained only as an operator/debug
escape hatch for manually clearing a stuck session and as a fixture
in the unit tests.
"""
redis = await get_redis_async()
await redis.delete(_buffer_key(session_id))

View File

@@ -16,7 +16,7 @@ from backend.copilot.pending_messages import (
MAX_PENDING_MESSAGES,
PendingMessage,
PendingMessageContext,
clear_pending_messages_unsafe,
_clear_pending_messages_unsafe,
drain_and_format_for_injection,
drain_pending_for_persist,
drain_pending_messages,
@@ -208,15 +208,15 @@ async def test_cap_drops_oldest_when_exceeded(fake_redis: _FakeRedis) -> None:
async def test_clear_removes_buffer(fake_redis: _FakeRedis) -> None:
await push_pending_message("sess4", PendingMessage(content="x"))
await push_pending_message("sess4", PendingMessage(content="y"))
await clear_pending_messages_unsafe("sess4")
await _clear_pending_messages_unsafe("sess4")
assert await peek_pending_count("sess4") == 0
@pytest.mark.asyncio
async def test_clear_is_idempotent(fake_redis: _FakeRedis) -> None:
# Clearing an already-empty buffer should not raise
await clear_pending_messages_unsafe("sess_empty")
await clear_pending_messages_unsafe("sess_empty")
await _clear_pending_messages_unsafe("sess_empty")
await _clear_pending_messages_unsafe("sess_empty")
# ── Publish hook ────────────────────────────────────────────────────

View File

@@ -71,6 +71,7 @@ if TYPE_CHECKING:
ToolName = Literal[
# Platform tools (must match keys in TOOL_REGISTRY)
"add_understanding",
"ask_question",
"bash_exec",
"browser_act",
"browser_navigate",

View File

@@ -163,18 +163,14 @@ sandbox so `bash_exec` can access it for further processing.
The exact sandbox path is shown in the `[Sandbox copy available at ...]` note.
### GitHub CLI (`gh`) and git
- To check if the user has their GitHub account already connected, run `gh auth status`. Always check this before running `connect_integration(provider="github")` which will ask the user to connect their GitHub regardless if it's already connected.
- To check if the user has their GitHub account already connected, run `gh auth status`. Always check this before asking them to connect it.
- If the user has connected their GitHub account, both `gh` and `git` are
pre-authenticated — use them directly without any manual login step.
`git` HTTPS operations (clone, push, pull) work automatically.
- If the token changes mid-session (e.g. user reconnects with a new token),
run `gh auth setup-git` to re-register the credential helper.
- **MANDATORY:** You MUST run `gh auth status` before EVER calling
`connect_integration(provider="github")`. If it shows `Logged in`,
proceed directly — no integration connection needed. Never skip this check.
- If `gh auth status` shows NOT logged in, or `gh`/`git` fails with an
authentication error (e.g. "authentication required", "could not read
Username", or exit code 128), THEN call
- If `gh` or `git` fails with an authentication error (e.g. "authentication
required", "could not read Username", or exit code 128), call
`connect_integration(provider="github")` to surface the GitHub credentials
setup card so the user can connect their account. Once connected, retry
the operation.

View File

@@ -1,6 +1,7 @@
"""Tests for prompting helpers."""
"""Tests for agent generation guide — verifies clarification section."""
import importlib
from pathlib import Path
from backend.copilot import prompting
@@ -30,3 +31,28 @@ class TestGetSdkSupplementStaticPlaceholder:
def test_e2b_mode_has_no_session_placeholder(self):
result = prompting.get_sdk_supplement(use_e2b=True)
assert "<session-id>" not in result
class TestAgentGenerationGuideContainsClarifySection:
"""The agent generation guide must include the clarification section."""
def test_guide_includes_clarify_section(self):
guide_path = Path(__file__).parent / "sdk" / "agent_generation_guide.md"
content = guide_path.read_text(encoding="utf-8")
assert "Before or During Building" in content
def test_guide_mentions_find_block_for_clarification(self):
guide_path = Path(__file__).parent / "sdk" / "agent_generation_guide.md"
content = guide_path.read_text(encoding="utf-8")
clarify_section = content.split("Before or During Building")[1].split(
"### Workflow"
)[0]
assert "find_block" in clarify_section
def test_guide_mentions_ask_question_tool(self):
guide_path = Path(__file__).parent / "sdk" / "agent_generation_guide.md"
content = guide_path.read_text(encoding="utf-8")
clarify_section = content.split("Before or During Building")[1].split(
"### Workflow"
)[0]
assert "ask_question" in clarify_section

View File

@@ -3,6 +3,29 @@
You can create, edit, and customize agents directly. You ARE the brain —
generate the agent JSON yourself using block schemas, then validate and save.
### Clarifying — Before or During Building
Use `ask_question` whenever the user's intent is ambiguous — whether
that's before starting or midway through the workflow. Common moments:
- **Before building**: output format, delivery channel, data source, or
trigger is unspecified.
- **During block discovery**: multiple blocks could fit and the user
should choose.
- **During JSON generation**: a wiring decision depends on user
preference.
Steps:
1. Call `find_block` (or another discovery tool) to learn what the
platform actually supports for the ambiguous dimension.
2. Call `ask_question` with a concrete question listing the discovered
options (e.g. "The platform supports Gmail, Slack, and Google Docs —
which should the agent use for delivery?").
3. **Wait for the user's answer** before continuing.
**Skip this** when the goal already specifies all dimensions (e.g.
"scrape prices from Amazon and email me daily").
### Workflow for Creating/Editing Agents
1. **If editing**: First narrow to the specific agent by UUID, then fetch its

View File

@@ -57,7 +57,6 @@ from ..constants import (
from ..session_cleanup import prune_orphan_tool_calls
from ..context import encode_cwd_for_cli, get_workspace_manager
from ..graphiti.config import is_enabled_for_user
from ..model_router import resolve_model
from ..moonshot import (
is_moonshot_model as _is_moonshot_model,
override_cost_usd as _override_cost_for_moonshot,
@@ -735,16 +734,14 @@ def _normalize_model_name(raw_model: str) -> str:
def _resolve_sdk_model() -> str | None:
"""Resolve the SDK-CLI model name from static config (no LD lookup).
"""Resolve the model name for the Claude Agent SDK CLI.
``config.claude_agent_model`` is an explicit override that wins
unconditionally. When the Claude Code subscription is enabled and no
override is set, returns ``None`` so the CLI picks the model for the
user's subscription plan. Otherwise derives from
``config.thinking_standard_model``.
Uses `config.claude_agent_model` if set, otherwise derives from
`config.thinking_standard_model` via :func:`_normalize_model_name`.
For per-user routing (LaunchDarkly overrides), see
:func:`_resolve_sdk_model_for_request`.
When `use_claude_code_subscription` is enabled and no explicit
`claude_agent_model` is set, returns `None` so the CLI uses the
default model for the user's subscription plan.
"""
if config.claude_agent_model:
return config.claude_agent_model
@@ -753,18 +750,6 @@ def _resolve_sdk_model() -> str | None:
return _normalize_model_name(config.thinking_standard_model)
async def _resolve_thinking_model_for_user(
tier: "CopilotLlmModel",
user_id: str | None,
) -> str:
"""LD-aware thinking-tier model pick for a specific user.
Consults ``copilot-thinking-{tier}-model`` and falls back to the
``ChatConfig`` default on missing user / missing flag.
"""
return await resolve_model("thinking", tier, user_id, config=config)
def _resolve_fallback_model() -> str | None:
"""Resolve the fallback model name via :func:`_normalize_model_name`.
@@ -779,94 +764,37 @@ def _resolve_fallback_model() -> str | None:
async def _resolve_sdk_model_for_request(
model: "CopilotLlmModel | None",
session_id: str,
user_id: str | None = None,
) -> str | None:
"""Resolve the SDK model string for a turn.
Priority (highest first):
1. ``config.claude_agent_model`` — unconditional override, bypasses LD.
2. LaunchDarkly ``copilot-thinking-{tier}-model`` if it serves a value
different from the config default for *user_id*. An LD-served
override wins over subscription mode so admins can route specific
users to a specific model without flipping subscription on/off.
3. ``config.use_claude_code_subscription`` on the standard tier —
returns ``None`` so the CLI picks the subscription default (this
branch fires when LD has no opinion, i.e. the value equals the
config default).
4. ``ChatConfig`` static default for the tier.
1. Explicit per-request ``model`` tier from the frontend toggle.
2. Global config default (``_resolve_sdk_model()``).
Returns ``None`` when the Claude Code subscription default applies.
Rate-limit accounting no longer applies a multiplier — the real turn
cost (reported by the SDK) already reflects model-pricing differences.
"""
if config.claude_agent_model:
return config.claude_agent_model
tier_name: "CopilotLlmModel" = "advanced" if model == "advanced" else "standard"
# Strip at read time so a stray trailing space in ``CHAT_*_MODEL`` (a
# common ``.env`` pitfall) doesn't make the ``resolved == tier_default``
# comparison below spuriously diverge — ``resolve_model`` already strips
# the LD side, so both halves must end up whitespace-normalised to stay
# equal when they're semantically equal. Downstream ``_normalize_model_name``
# also benefits from the strip.
tier_default = (
config.thinking_advanced_model
if tier_name == "advanced"
else config.thinking_standard_model
).strip()
resolved = await _resolve_thinking_model_for_user(tier_name, user_id)
# Subscription mode on standard tier only wins when LD has no opinion
# (value == config default ⇒ admin hasn't explicitly pointed this
# user somewhere). Any LD override — even to the same value with
# stripped whitespace normalised — is an explicit admin choice that
# must be honoured. Without this, a subscription-mode deployment
# silently ignores the ``copilot-thinking-standard-model`` flag
# entirely, which defeats the point of cohort-based routing.
ld_overrides_default = resolved != tier_default
if (
not ld_overrides_default
and tier_name == "standard"
and config.use_claude_code_subscription
):
if model == "advanced":
sdk_model = _normalize_model_name(config.thinking_advanced_model)
logger.info(
"[SDK] [%s] Subscription default (tier=standard, LD unset)",
"[SDK] [%s] Per-request model override: advanced (%s)",
session_id[:12] if session_id else "?",
)
return None
try:
sdk_model = _normalize_model_name(resolved)
except ValueError as exc:
# The per-user LD value didn't pass ``_normalize_model_name``'s
# vendor check (most commonly: a ``moonshotai/kimi-*`` slug on a
# direct-Anthropic deployment that has no OpenRouter route). Fail
# soft to the TIER-SPECIFIC config default — using the generic
# ``_resolve_sdk_model()`` here would pin advanced-tier requests to
# ``thinking_standard_model`` (Sonnet) whenever LD misconfigures
# the advanced cell, silently downgrading the user's chosen tier.
try:
sdk_model = _normalize_model_name(tier_default)
except ValueError:
# Config default is *also* invalid for the active routing
# mode — this is a deployment-level misconfig that the
# ``model_validator`` should catch at startup. Re-raise the
# original LD error so the issue surfaces loudly rather than
# returning something misleading.
raise exc
logger.warning(
"[SDK] [%s] LD model %r rejected for tier=%s (%s); falling "
"back to tier default %s",
session_id[:12] if session_id else "?",
resolved,
tier_name,
exc,
sdk_model,
)
return sdk_model
logger.info(
"[SDK] [%s] Resolved model for tier=%s: %s",
session_id[:12] if session_id else "?",
tier_name,
sdk_model,
)
return sdk_model
if model == "standard":
# Reset to config default — respects subscription mode (None = CLI default).
sdk_model = _resolve_sdk_model()
logger.info(
"[SDK] [%s] Per-request model override: standard (%s)",
session_id[:12] if session_id else "?",
sdk_model or "subscription-default",
)
return sdk_model
return _resolve_sdk_model()
_MAX_TRANSIENT_BACKOFF_SECONDS = 30
@@ -3249,8 +3177,8 @@ async def stream_chat_completion_sdk(
mcp_server = create_copilot_mcp_server(use_e2b=use_e2b)
# Resolve model (request tier → LD per-user override → config default).
sdk_model = await _resolve_sdk_model_for_request(model, session_id, user_id)
# Resolve model (request tier → config default).
sdk_model = await _resolve_sdk_model_for_request(model, session_id)
# Track SDK-internal compaction (PreCompact hook → start, next msg → end)
compaction = CompactionTracker()

View File

@@ -4,7 +4,6 @@ import asyncio
import base64
import os
from dataclasses import dataclass
from types import SimpleNamespace
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
@@ -18,7 +17,6 @@ from .service import (
_normalize_model_name,
_prepare_file_attachments,
_resolve_sdk_model,
_resolve_sdk_model_for_request,
_safe_close_sdk_client,
)
@@ -518,213 +516,6 @@ class TestResolveSdkModel:
assert _resolve_sdk_model() == "claude-opus-4-6"
class TestResolveSdkModelForRequestLdFallback:
"""``_resolve_sdk_model_for_request`` must fail soft when the LD value
can't be normalised for the active routing mode — flagged as MAJOR by
CodeRabbit + HIGH by Sentry when it was a hard ValueError."""
@pytest.mark.asyncio
async def test_direct_anthropic_mode_rejects_kimi_ld_value_and_falls_back(
self, monkeypatch, _clean_config_env
):
"""LD serves ``moonshotai/kimi-k2.6`` but we're on direct-Anthropic
(no OpenRouter key). ``_normalize_model_name`` raises; the
resolver must log + return the config-default path instead of
500-ing the turn."""
cfg = cfg_mod.ChatConfig(
thinking_standard_model="anthropic/claude-sonnet-4-6",
claude_agent_model=None,
use_openrouter=False,
api_key=None,
base_url=None,
use_claude_code_subscription=False,
)
monkeypatch.setattr("backend.copilot.sdk.service.config", cfg)
with patch(
"backend.copilot.sdk.service._resolve_thinking_model_for_user",
new=AsyncMock(return_value="moonshotai/kimi-k2.6"),
):
resolved = await _resolve_sdk_model_for_request(
model="standard", session_id="sess-abc", user_id="user-1"
)
# Fallback == tier-specific config default (thinking_standard_model
# normalised to hyphen-form for direct-Anthropic mode).
assert resolved == "claude-sonnet-4-6"
@pytest.mark.asyncio
async def test_openrouter_mode_accepts_ld_kimi_value(
self, monkeypatch, _clean_config_env
):
"""On OpenRouter the Kimi slug is legitimate — no fallback,
value returned as-is."""
cfg = cfg_mod.ChatConfig(
thinking_standard_model="anthropic/claude-sonnet-4-6",
claude_agent_model=None,
use_openrouter=True,
api_key="or-key",
base_url="https://openrouter.ai/api/v1",
use_claude_code_subscription=False,
)
monkeypatch.setattr("backend.copilot.sdk.service.config", cfg)
with patch(
"backend.copilot.sdk.service._resolve_thinking_model_for_user",
new=AsyncMock(return_value="moonshotai/kimi-k2.6"),
):
resolved = await _resolve_sdk_model_for_request(
model="standard", session_id="sess-abc", user_id="user-1"
)
assert resolved == "moonshotai/kimi-k2.6"
@pytest.mark.asyncio
async def test_advanced_tier_fallback_uses_advanced_default_not_standard(
self, monkeypatch, _clean_config_env
):
"""An LD-rejected ADVANCED slug must fall back to the advanced
config default (Opus) — not the standard default (Sonnet).
Using ``_resolve_sdk_model()`` as the fallback silently
downgraded the user's chosen tier. Flagged MAJOR by CodeRabbit
+ HIGH by Sentry on the first fail-soft commit."""
cfg = cfg_mod.ChatConfig(
thinking_standard_model="anthropic/claude-sonnet-4-6",
thinking_advanced_model="anthropic/claude-opus-4.7",
claude_agent_model=None,
use_openrouter=False,
api_key=None,
base_url=None,
use_claude_code_subscription=False,
)
monkeypatch.setattr("backend.copilot.sdk.service.config", cfg)
with patch(
"backend.copilot.sdk.service._resolve_thinking_model_for_user",
new=AsyncMock(return_value="moonshotai/kimi-k2.6"),
):
resolved = await _resolve_sdk_model_for_request(
model="advanced", session_id="sess-adv", user_id="user-1"
)
# Direct-Anthropic normalises anthropic/claude-opus-4.7 → claude-opus-4-7
assert resolved == "claude-opus-4-7"
@pytest.mark.asyncio
async def test_standard_ld_override_wins_over_subscription(
self, monkeypatch, _clean_config_env
):
"""Bug reported in local test: subscription mode + LD serving Kimi
on ``copilot-thinking-standard-model`` returned ``None`` (CLI
picked subscription default Opus), silently ignoring the LD
override. An LD value different from the config default is an
explicit admin decision and must win."""
cfg = cfg_mod.ChatConfig(
thinking_standard_model="anthropic/claude-sonnet-4-6",
claude_agent_model=None,
use_openrouter=True,
api_key="or-key",
base_url="https://openrouter.ai/api/v1",
use_claude_code_subscription=True,
)
monkeypatch.setattr("backend.copilot.sdk.service.config", cfg)
with patch(
"backend.copilot.sdk.service._resolve_thinking_model_for_user",
new=AsyncMock(return_value="moonshotai/kimi-k2.6"),
):
resolved = await _resolve_sdk_model_for_request(
model="standard", session_id="sess-std-sub", user_id="user-1"
)
# Expect LD-served Kimi, NOT None (the old subscription-default bypass)
assert resolved == "moonshotai/kimi-k2.6"
@pytest.mark.asyncio
async def test_standard_subscription_survives_trailing_whitespace_in_env(
self, monkeypatch, _clean_config_env
):
"""``_resolve_thinking_model_for_user`` strips whitespace from the LD
side; the config tier default must be stripped too, otherwise a
stray trailing space in ``CHAT_THINKING_STANDARD_MODEL`` makes
``resolved == tier_default`` spuriously False and bypasses
subscription-default mode. Sentry HIGH on L856."""
cfg = cfg_mod.ChatConfig(
thinking_standard_model="anthropic/claude-sonnet-4-6 ", # trailing spaces
claude_agent_model=None,
use_openrouter=False,
api_key=None,
base_url=None,
use_claude_code_subscription=True,
)
monkeypatch.setattr("backend.copilot.sdk.service.config", cfg)
with patch(
"backend.copilot.sdk.service._resolve_thinking_model_for_user",
new=AsyncMock(return_value="anthropic/claude-sonnet-4-6"),
):
resolved = await _resolve_sdk_model_for_request(
model="standard", session_id="sess-ws", user_id="user-1"
)
assert resolved is None, (
"LD value semantically matches the whitespace-padded config "
"default — subscription mode must still win and return None"
)
@pytest.mark.asyncio
async def test_standard_subscription_default_honoured_when_ld_matches_config(
self, monkeypatch, _clean_config_env
):
"""When LD serves the SAME value as the config default (i.e. the
flag is effectively unset / no override), subscription mode still
wins and we return ``None`` so the CLI uses the subscription
default model."""
cfg = cfg_mod.ChatConfig(
thinking_standard_model="anthropic/claude-sonnet-4-6",
claude_agent_model=None,
use_openrouter=False,
api_key=None,
base_url=None,
use_claude_code_subscription=True,
)
monkeypatch.setattr("backend.copilot.sdk.service.config", cfg)
with patch(
"backend.copilot.sdk.service._resolve_thinking_model_for_user",
new=AsyncMock(return_value="anthropic/claude-sonnet-4-6"),
):
resolved = await _resolve_sdk_model_for_request(
model="standard", session_id="sess-std-nop", user_id="user-1"
)
assert resolved is None
@pytest.mark.asyncio
async def test_advanced_tier_consults_ld_under_subscription(
self, monkeypatch, _clean_config_env
):
"""Subscription mode bypasses LD only on the standard tier —
the advanced tier always consults LD because the user explicitly
asked for the premium path. A subscription + advanced request
with LD-served Opus must return Opus (not ``None``)."""
cfg = cfg_mod.ChatConfig(
thinking_standard_model="anthropic/claude-sonnet-4-6",
thinking_advanced_model="anthropic/claude-opus-4.7",
claude_agent_model=None,
use_openrouter=True,
api_key="or-key",
base_url="https://openrouter.ai/api/v1",
use_claude_code_subscription=True,
)
monkeypatch.setattr("backend.copilot.sdk.service.config", cfg)
with patch(
"backend.copilot.sdk.service._resolve_thinking_model_for_user",
new=AsyncMock(return_value="anthropic/claude-opus-4.7"),
):
resolved = await _resolve_sdk_model_for_request(
model="advanced", session_id="sess-adv-sub", user_id="user-1"
)
assert resolved == "anthropic/claude-opus-4.7"
# ---------------------------------------------------------------------------
# _is_sdk_disconnect_error — classify client disconnect cleanup errors
# ---------------------------------------------------------------------------
@@ -915,225 +706,3 @@ class TestIdleTimeoutConstant:
def test_idle_timeout_is_10_min(self):
assert _IDLE_TIMEOUT_SECONDS == 10 * 60
# ---------------------------------------------------------------------------
# _RetryState.observed_model — Moonshot cost-override input
# ---------------------------------------------------------------------------
class TestRetryStateObservedModel:
"""Regression guards for the ``observed_model`` field added to
``_RetryState``. The Moonshot cost override reads this — when a
fallback model activates mid-attempt, the requested primary
(``state.options.model``) no longer matches what actually ran."""
def _make_state(self, *, options_model: str | None = "primary/model"):
"""Build a minimally-valid ``_RetryState``. All the heavy
collaborators are ``MagicMock()`` — the field we care about is
a plain Optional[str], so the surrounding scaffolding just needs
to let the dataclass instantiate."""
from .service import _RetryState, _TokenUsage
options = MagicMock()
options.model = options_model
return _RetryState(
options=options,
query_message="",
was_compacted=False,
use_resume=False,
resume_file=None,
transcript_msg_count=0,
adapter=MagicMock(),
transcript_builder=MagicMock(),
usage=_TokenUsage(),
)
def test_default_is_none(self):
state = self._make_state()
assert state.observed_model is None
def test_assigned_from_assistant_message_model(self):
"""Simulates the population path in ``_run_stream_attempt``:
``observed`` is pulled off the ``AssistantMessage.model`` attr
and assigned onto ``state.observed_model`` when it's a non-empty
string."""
state = self._make_state()
# Simulates the inline assignment the generator does on each
# AssistantMessage — a non-empty string lands on state.
assistant_like = SimpleNamespace(model="anthropic/claude-sonnet-4-6")
observed = getattr(assistant_like, "model", None)
if isinstance(observed, str) and observed:
state.observed_model = observed
assert state.observed_model == "anthropic/claude-sonnet-4-6"
def test_empty_string_model_is_not_assigned(self):
"""Guard against overwriting a real observed value with an
empty-string model (the generator's ``and observed`` check)."""
state = self._make_state()
state.observed_model = "moonshotai/kimi-k2.6" # seeded from a prior msg
assistant_like = SimpleNamespace(model="")
observed = getattr(assistant_like, "model", None)
if isinstance(observed, str) and observed:
state.observed_model = observed
assert state.observed_model == "moonshotai/kimi-k2.6"
def test_missing_model_attr_leaves_observed_untouched(self):
state = self._make_state()
state.observed_model = "moonshotai/kimi-k2.6"
# AssistantMessage may not carry ``.model`` on older SDK rels.
assistant_like = SimpleNamespace() # no ``.model`` attr
observed = getattr(assistant_like, "model", None)
if isinstance(observed, str) and observed:
state.observed_model = observed
assert state.observed_model == "moonshotai/kimi-k2.6"
# ---------------------------------------------------------------------------
# Moonshot cost-override gate — decision logic at the call site
# ---------------------------------------------------------------------------
class TestMoonshotCostOverrideGate:
"""Regression guards for the decision logic in
``_run_stream_attempt`` that picks between the CLI-reported cost
and the Moonshot rate-card override. The code:
active_model = state.observed_model or getattr(state.options, "model", None)
if _is_moonshot_model(active_model):
state.usage.cost_usd = _override_cost_for_moonshot(...)
else:
state.usage.cost_usd = sdk_msg.total_cost_usd
is critical-path billing logic — make sure observed_model wins over
the requested primary, and Anthropic turns pass through untouched."""
def _decide_cost(
self,
*,
observed_model: str | None,
options_model: str | None,
sdk_reported_usd: float,
prompt_tokens: int = 0,
completion_tokens: int = 0,
) -> float:
"""Mirror of the real decision block — lets us assert the gate
without constructing the whole 1000-line generator."""
from .service import _is_moonshot_model, _override_cost_for_moonshot
active_model = observed_model or options_model
if _is_moonshot_model(active_model):
return _override_cost_for_moonshot(
model=active_model,
sdk_reported_usd=sdk_reported_usd,
prompt_tokens=prompt_tokens,
completion_tokens=completion_tokens,
cache_read_tokens=0,
cache_creation_tokens=0,
)
return sdk_reported_usd
def test_anthropic_turn_passes_sdk_cost_through(self):
"""Anthropic — the CLI's pricing table is authoritative, so
``state.usage.cost_usd`` is set to ``sdk_msg.total_cost_usd``
unchanged."""
cost = self._decide_cost(
observed_model="anthropic/claude-sonnet-4-6",
options_model="anthropic/claude-sonnet-4-6",
sdk_reported_usd=0.123,
)
assert cost == 0.123
def test_moonshot_turn_uses_rate_card_override(self):
"""Moonshot — the CLI would silently bill at Sonnet rates, so
the override recomputes from the Moonshot rate card."""
cost = self._decide_cost(
observed_model="moonshotai/kimi-k2.6",
options_model="moonshotai/kimi-k2.6",
sdk_reported_usd=0.089862, # CLI's Sonnet-priced estimate.
prompt_tokens=29564,
completion_tokens=78,
)
expected = (29564 * 0.60 + 78 * 2.80) / 1_000_000
assert cost == pytest.approx(expected, rel=1e-9)
# Sanity: ~5x cheaper than the CLI's Sonnet-priced number.
assert cost < 0.089862 / 4
def test_observed_model_wins_over_options_primary(self):
"""The whole point of ``observed_model``: a Moonshot-primary
request that fell back to Anthropic must NOT get Moonshot
pricing applied. The gate follows the observed model, not the
requested primary."""
cost = self._decide_cost(
observed_model="anthropic/claude-sonnet-4-6",
options_model="moonshotai/kimi-k2.6", # what we ASKED for
sdk_reported_usd=0.123,
prompt_tokens=1000,
completion_tokens=100,
)
# Observed == Anthropic → CLI-reported cost passes through unchanged.
assert cost == 0.123
def test_anthropic_to_moonshot_fallback_uses_override(self):
"""The inverse: an Anthropic-primary request that fell back to
Moonshot must get the Moonshot override applied — the CLI is
still billing at Sonnet rates for the fallback response."""
cost = self._decide_cost(
observed_model="moonshotai/kimi-k2.6",
options_model="anthropic/claude-sonnet-4-6",
sdk_reported_usd=0.089862,
prompt_tokens=29564,
completion_tokens=78,
)
expected = (29564 * 0.60 + 78 * 2.80) / 1_000_000
assert cost == pytest.approx(expected, rel=1e-9)
def test_no_observed_falls_back_to_options_model(self):
"""First AssistantMessage hasn't arrived yet (or the SDK didn't
emit ``.model``) — the gate falls back to the requested primary."""
cost = self._decide_cost(
observed_model=None,
options_model="moonshotai/kimi-k2.6",
sdk_reported_usd=0.089862,
prompt_tokens=100,
completion_tokens=10,
)
expected = (100 * 0.60 + 10 * 2.80) / 1_000_000
assert cost == pytest.approx(expected, rel=1e-9)
def test_both_none_passes_sdk_cost_through(self):
"""Subscription mode — ``options.model`` may be None and no
AssistantMessage has arrived yet. ``None`` is not a Moonshot
slug so the SDK number lands unchanged."""
cost = self._decide_cost(
observed_model=None,
options_model=None,
sdk_reported_usd=0.05,
)
assert cost == 0.05
# ---------------------------------------------------------------------------
# Moonshot helper re-exports — keep imports stable for call-site code
# ---------------------------------------------------------------------------
class TestMoonshotHelperReexports:
"""``sdk/service.py`` imports the Moonshot helpers under local
aliases (``_is_moonshot_model``, ``_override_cost_for_moonshot``).
Regression guard so a refactor doesn't silently break the import
path the hot-loop code relies on."""
def test_is_moonshot_model_aliased(self):
from backend.copilot.moonshot import is_moonshot_model as canonical
from .service import _is_moonshot_model
assert _is_moonshot_model is canonical
def test_override_cost_for_moonshot_aliased(self):
from backend.copilot.moonshot import override_cost_usd as canonical
from .service import _override_cost_for_moonshot
assert _override_cost_for_moonshot is canonical

View File

@@ -1,541 +0,0 @@
"""Unit tests for title-generation cost tracking helpers.
Covers the new code added in PR #12882:
* ``_title_usage_from_response`` — shape-robust OR ``usage.cost`` extraction
* ``_record_title_generation_cost`` — provider-label + zero-tokens gate
* ``_update_title_async`` — independent title / cost persistence try blocks
* ``_generate_session_title`` — tuple return + robustness against empty choices
Mocks ``persist_and_record_usage`` / ``update_session_title`` at the boundary
where the code under test imports them (``backend.copilot.service.*``).
"""
from __future__ import annotations
from types import SimpleNamespace
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from openai.types.chat import ChatCompletion
from openai.types.chat.chat_completion import Choice
from openai.types.chat.chat_completion_message import ChatCompletionMessage
from openai.types.completion_usage import CompletionUsage
from backend.copilot.service import (
_generate_session_title,
_record_title_generation_cost,
_title_usage_from_response,
_update_title_async,
)
def _build_completion(
*,
content: str | None = "Hello Title",
usage: CompletionUsage | None = None,
choices: list[Choice] | None = None,
) -> ChatCompletion:
if choices is None:
msg = ChatCompletionMessage(role="assistant", content=content)
choices = [Choice(index=0, message=msg, finish_reason="stop")]
return ChatCompletion(
id="cmpl-1",
choices=choices,
created=0,
model="anthropic/claude-haiku",
object="chat.completion",
usage=usage,
)
def _usage_with_cost(cost: object | None) -> CompletionUsage:
"""Return a CompletionUsage whose ``model_extra`` carries ``cost``.
Uses ``model_validate`` so OpenRouter's ``cost`` extension lands in
the pydantic ``model_extra`` dict the helper reads from.
"""
payload: dict[str, object] = {
"prompt_tokens": 12,
"completion_tokens": 3,
"total_tokens": 15,
}
if cost is not None:
payload["cost"] = cost
return CompletionUsage.model_validate(payload)
class TestTitleUsageFromResponse:
"""``_title_usage_from_response`` returns sensible zeros/Nones when
optional fields are absent or of unexpected shape."""
def test_usage_none_returns_all_zero(self):
resp = _build_completion(usage=None)
prompt, completion, cost = _title_usage_from_response(resp)
assert prompt == 0
assert completion == 0
assert cost is None
def test_missing_cost_field_returns_none_cost(self):
resp = _build_completion(usage=_usage_with_cost(None))
prompt, completion, cost = _title_usage_from_response(resp)
assert prompt == 12
assert completion == 3
assert cost is None
def test_cost_as_int_is_coerced_to_float(self):
resp = _build_completion(usage=_usage_with_cost(2))
_, _, cost = _title_usage_from_response(resp)
assert isinstance(cost, float)
assert cost == 2.0
def test_cost_as_float_is_returned_as_is(self):
resp = _build_completion(usage=_usage_with_cost(0.000123))
_, _, cost = _title_usage_from_response(resp)
assert cost == pytest.approx(0.000123)
def test_cost_as_non_numeric_string_returns_none(self):
resp = _build_completion(usage=_usage_with_cost("free"))
_, _, cost = _title_usage_from_response(resp)
assert cost is None
def test_empty_model_extra_returns_none_cost(self):
# ``model_extra`` is empty for non-OR routes where pydantic didn't
# receive any extras — prompt/completion still flow through.
usage = CompletionUsage(prompt_tokens=5, completion_tokens=2, total_tokens=7)
resp = _build_completion(usage=usage)
prompt, completion, cost = _title_usage_from_response(resp)
assert (prompt, completion, cost) == (5, 2, None)
def test_zero_prompt_and_completion_tokens(self):
usage = CompletionUsage(prompt_tokens=0, completion_tokens=0, total_tokens=0)
resp = _build_completion(usage=usage)
prompt, completion, cost = _title_usage_from_response(resp)
assert (prompt, completion, cost) == (0, 0, None)
class TestRecordTitleGenerationCost:
"""``_record_title_generation_cost`` persists cost + picks the right
provider label and skips the DB roundtrip when nothing's meaningful
to record."""
@pytest.mark.asyncio
async def test_openrouter_base_url_uses_open_router_provider(self):
resp = _build_completion(usage=_usage_with_cost(0.0002))
persist = AsyncMock(return_value=0)
with (
patch(
"backend.copilot.service.persist_and_record_usage",
new=persist,
),
patch(
"backend.copilot.service.config",
MagicMock(
base_url="https://openrouter.ai/api/v1",
title_model="anthropic/claude-haiku",
),
),
):
await _record_title_generation_cost(
response=resp, user_id="u", session_id="s"
)
persist.assert_awaited_once()
kwargs = persist.await_args.kwargs
assert kwargs["provider"] == "open_router"
assert kwargs["model"] == "anthropic/claude-haiku"
assert kwargs["prompt_tokens"] == 12
assert kwargs["completion_tokens"] == 3
assert kwargs["cost_usd"] == pytest.approx(0.0002)
assert kwargs["log_prefix"] == "[title]"
assert kwargs["session"] is None
@pytest.mark.asyncio
async def test_non_openrouter_base_url_uses_openai_provider(self):
resp = _build_completion(usage=_usage_with_cost(0.0002))
persist = AsyncMock(return_value=0)
with (
patch(
"backend.copilot.service.persist_and_record_usage",
new=persist,
),
patch(
"backend.copilot.service.config",
MagicMock(
base_url="https://api.openai.com/v1",
title_model="gpt-4o-mini",
),
),
):
await _record_title_generation_cost(
response=resp, user_id="u", session_id="s"
)
persist.assert_awaited_once()
assert persist.await_args.kwargs["provider"] == "openai"
@pytest.mark.asyncio
async def test_empty_base_url_uses_openai_provider(self):
resp = _build_completion(usage=_usage_with_cost(0.0001))
persist = AsyncMock(return_value=0)
with (
patch(
"backend.copilot.service.persist_and_record_usage",
new=persist,
),
patch(
"backend.copilot.service.config",
MagicMock(base_url=None, title_model="gpt-4o-mini"),
),
):
await _record_title_generation_cost(
response=resp, user_id=None, session_id=None
)
persist.assert_awaited_once()
assert persist.await_args.kwargs["provider"] == "openai"
@pytest.mark.asyncio
async def test_zero_tokens_zero_cost_skips_persist(self):
"""No cost, no tokens — the early return avoids a worthless
``PlatformCostLog`` row."""
usage = CompletionUsage(prompt_tokens=0, completion_tokens=0, total_tokens=0)
resp = _build_completion(usage=usage)
persist = AsyncMock(return_value=0)
with (
patch(
"backend.copilot.service.persist_and_record_usage",
new=persist,
),
patch(
"backend.copilot.service.config",
MagicMock(
base_url="https://openrouter.ai/api/v1",
title_model="x",
),
),
):
await _record_title_generation_cost(
response=resp, user_id="u", session_id="s"
)
persist.assert_not_awaited()
@pytest.mark.asyncio
async def test_usage_none_skips_persist(self):
"""``usage`` absent on the response == provider didn't report —
still short-circuits to avoid writing a zero-valued row."""
resp = _build_completion(usage=None)
persist = AsyncMock(return_value=0)
with (
patch(
"backend.copilot.service.persist_and_record_usage",
new=persist,
),
patch(
"backend.copilot.service.config",
MagicMock(
base_url="https://openrouter.ai/api/v1",
title_model="x",
),
),
):
await _record_title_generation_cost(
response=resp, user_id="u", session_id="s"
)
persist.assert_not_awaited()
@pytest.mark.asyncio
async def test_tokens_without_cost_still_records(self):
"""Tokens present but ``cost`` missing (non-OR route) still
records a row so token counts are captured — ``cost_usd=None``
is accepted by ``persist_and_record_usage``."""
usage = CompletionUsage(prompt_tokens=8, completion_tokens=2, total_tokens=10)
resp = _build_completion(usage=usage)
persist = AsyncMock(return_value=0)
with (
patch(
"backend.copilot.service.persist_and_record_usage",
new=persist,
),
patch(
"backend.copilot.service.config",
MagicMock(base_url=None, title_model="m"),
),
):
await _record_title_generation_cost(
response=resp, user_id="u", session_id="s"
)
persist.assert_awaited_once()
assert persist.await_args.kwargs["cost_usd"] is None
assert persist.await_args.kwargs["prompt_tokens"] == 8
class TestUpdateTitleAsync:
"""``_update_title_async`` runs title persistence and cost recording
as independent best-effort steps — a failure in one does NOT
cancel the other."""
@pytest.mark.asyncio
async def test_title_success_cost_success(self):
resp = _build_completion(usage=_usage_with_cost(0.0001))
gen = AsyncMock(return_value=("My Title", resp))
update = AsyncMock(return_value=True)
record = AsyncMock()
with (
patch(
"backend.copilot.service._generate_session_title",
new=gen,
),
patch(
"backend.copilot.service.update_session_title",
new=update,
),
patch(
"backend.copilot.service._record_title_generation_cost",
new=record,
),
):
await _update_title_async("sess-1", "hello", user_id="u1")
update.assert_awaited_once_with("sess-1", "u1", "My Title", only_if_empty=True)
record.assert_awaited_once()
assert record.await_args.kwargs["response"] is resp
assert record.await_args.kwargs["user_id"] == "u1"
assert record.await_args.kwargs["session_id"] == "sess-1"
@pytest.mark.asyncio
async def test_title_persist_fails_but_cost_still_recorded(self):
resp = _build_completion(usage=_usage_with_cost(0.0001))
gen = AsyncMock(return_value=("Title", resp))
update = AsyncMock(side_effect=RuntimeError("prisma boom"))
record = AsyncMock()
with (
patch(
"backend.copilot.service._generate_session_title",
new=gen,
),
patch(
"backend.copilot.service.update_session_title",
new=update,
),
patch(
"backend.copilot.service._record_title_generation_cost",
new=record,
),
):
# Must NOT raise — persist failure is swallowed.
await _update_title_async("sess-2", "msg", user_id="u")
update.assert_awaited_once()
record.assert_awaited_once()
@pytest.mark.asyncio
async def test_cost_record_fails_but_title_was_persisted(self):
resp = _build_completion(usage=_usage_with_cost(0.0001))
gen = AsyncMock(return_value=("Title", resp))
update = AsyncMock(return_value=True)
record = AsyncMock(side_effect=RuntimeError("cost record boom"))
with (
patch(
"backend.copilot.service._generate_session_title",
new=gen,
),
patch(
"backend.copilot.service.update_session_title",
new=update,
),
patch(
"backend.copilot.service._record_title_generation_cost",
new=record,
),
):
# Must NOT raise — cost-recording failure is swallowed.
await _update_title_async("sess-3", "msg", user_id="u")
update.assert_awaited_once()
record.assert_awaited_once()
@pytest.mark.asyncio
async def test_no_user_id_skips_title_persist_but_records_cost(self):
"""Anonymous sessions skip the user-scoped title write, but we
still paid for the LLM call — cost recording runs regardless."""
resp = _build_completion(usage=_usage_with_cost(0.0001))
gen = AsyncMock(return_value=("Title", resp))
update = AsyncMock()
record = AsyncMock()
with (
patch(
"backend.copilot.service._generate_session_title",
new=gen,
),
patch(
"backend.copilot.service.update_session_title",
new=update,
),
patch(
"backend.copilot.service._record_title_generation_cost",
new=record,
),
):
await _update_title_async("sess-4", "msg", user_id=None)
update.assert_not_awaited()
record.assert_awaited_once()
@pytest.mark.asyncio
async def test_generation_returns_none_response_skips_cost(self):
"""``_generate_session_title`` swallows exceptions and returns
``(None, None)`` — no response means no cost to record."""
gen = AsyncMock(return_value=(None, None))
update = AsyncMock()
record = AsyncMock()
with (
patch(
"backend.copilot.service._generate_session_title",
new=gen,
),
patch(
"backend.copilot.service.update_session_title",
new=update,
),
patch(
"backend.copilot.service._record_title_generation_cost",
new=record,
),
):
await _update_title_async("sess-5", "msg", user_id="u")
update.assert_not_awaited()
record.assert_not_awaited()
@pytest.mark.asyncio
async def test_empty_title_with_response_still_records_cost(self):
"""Title came back empty but we still paid for the LLM call —
cost recording runs even though the title write is skipped."""
resp = _build_completion(usage=_usage_with_cost(0.0001))
gen = AsyncMock(return_value=(None, resp))
update = AsyncMock()
record = AsyncMock()
with (
patch(
"backend.copilot.service._generate_session_title",
new=gen,
),
patch(
"backend.copilot.service.update_session_title",
new=update,
),
patch(
"backend.copilot.service._record_title_generation_cost",
new=record,
),
):
await _update_title_async("sess-6", "msg", user_id="u")
update.assert_not_awaited()
record.assert_awaited_once()
class TestGenerateSessionTitle:
"""``_generate_session_title`` returns ``(title, response)`` — the
caller owns both the persist and the cost-record decisions."""
@pytest.mark.asyncio
async def test_valid_response_returns_cleaned_title_and_response(self):
# Code strips whitespace, then strips ``"'`` — whitespace inside
# the quotes survives on purpose (titles like ``My Agent`` read
# better than ``MyAgent``). Test keeps the outer quotes + inner
# whitespace distinct so the ordering is pinned.
resp = _build_completion(content='"Clean Me" ')
client = MagicMock()
client.chat.completions.create = AsyncMock(return_value=resp)
with patch(
"backend.copilot.service._get_openai_client",
return_value=client,
):
title, response = await _generate_session_title(
"first message", user_id="u", session_id="s"
)
assert title == "Clean Me"
assert response is resp
@pytest.mark.asyncio
async def test_long_title_truncated_with_ellipsis(self):
"""Titles >50 chars get truncated to 47 + '...'."""
long_title = "A" * 80
resp = _build_completion(content=long_title)
client = MagicMock()
client.chat.completions.create = AsyncMock(return_value=resp)
with patch(
"backend.copilot.service._get_openai_client",
return_value=client,
):
title, _ = await _generate_session_title("x", user_id=None)
assert title is not None
assert len(title) == 50
assert title.endswith("...")
@pytest.mark.asyncio
async def test_empty_choices_returns_none_title_with_response(self):
"""No ``choices`` on the response (shouldn't happen per SDK
typing) must not raise IndexError — response is preserved so the
caller can still record the paid-for cost."""
resp = _build_completion(choices=[])
client = MagicMock()
client.chat.completions.create = AsyncMock(return_value=resp)
with patch(
"backend.copilot.service._get_openai_client",
return_value=client,
):
title, response = await _generate_session_title("x")
assert title is None
assert response is resp
@pytest.mark.asyncio
async def test_missing_message_returns_none_title(self):
"""A choice whose ``.message`` is absent produces a None title
but the response still lands on the caller."""
fake_choice = SimpleNamespace(message=None)
fake_response = SimpleNamespace(choices=[fake_choice])
client = MagicMock()
client.chat.completions.create = AsyncMock(return_value=fake_response)
with patch(
"backend.copilot.service._get_openai_client",
return_value=client,
):
title, response = await _generate_session_title("x")
assert title is None
assert response is fake_response
@pytest.mark.asyncio
async def test_llm_call_raises_returns_none_none(self):
"""Network / API errors on the create call are swallowed;
``(None, None)`` ensures the caller skips both title and cost
without crashing the background task."""
client = MagicMock()
client.chat.completions.create = AsyncMock(
side_effect=RuntimeError("connection reset")
)
with patch(
"backend.copilot.service._get_openai_client",
return_value=client,
):
title, response = await _generate_session_title("x")
assert title is None
assert response is None
@pytest.mark.asyncio
async def test_create_receives_usage_include_extra_body(self):
"""PR adds ``usage: {'include': True}`` so OpenRouter embeds the
real billed cost into the final usage chunk."""
resp = _build_completion(content="Title")
client = MagicMock()
client.chat.completions.create = AsyncMock(return_value=resp)
with patch(
"backend.copilot.service._get_openai_client",
return_value=client,
):
await _generate_session_title(
"hello world", user_id="user-abc", session_id="sess-abc"
)
client.chat.completions.create.assert_awaited_once()
extra_body = client.chat.completions.create.await_args.kwargs["extra_body"]
assert extra_body["usage"] == {"include": True}
assert extra_body["user"] == "user-abc"
assert extra_body["session_id"] == "sess-abc"

View File

@@ -1026,8 +1026,8 @@ async def get_active_session(
# Check if session is stale (running beyond tool timeout + buffer).
# Auto-complete it to prevent infinite polling loops.
# A turn can legitimately run up to COPILOT_CONSUMER_TIMEOUT_SECONDS, so we
# add a 5-minute buffer to avoid false positives during legitimate operations.
# Synchronous tools can run up to COPILOT_CONSUMER_TIMEOUT_SECONDS (1 hour),
# so we add a 5-minute buffer to avoid false positives during legitimate operations.
created_at_str = meta.get("created_at")
if created_at_str:
try:

View File

@@ -11,6 +11,7 @@ from backend.copilot.tracking import track_tool_called
from .add_understanding import AddUnderstandingTool
from .agent_browser import BrowserActTool, BrowserNavigateTool, BrowserScreenshotTool
from .agent_output import AgentOutputTool
from .ask_question import AskQuestionTool
from .base import BaseTool
from .bash_exec import BashExecTool
from .connect_integration import ConnectIntegrationTool
@@ -63,6 +64,7 @@ logger = logging.getLogger(__name__)
# Single source of truth for all tools
TOOL_REGISTRY: dict[str, BaseTool] = {
"add_understanding": AddUnderstandingTool(),
"ask_question": AskQuestionTool(),
"create_agent": CreateAgentTool(),
"customize_agent": CustomizeAgentTool(),
"edit_agent": EditAgentTool(),

View File

@@ -14,21 +14,6 @@ HOST = os.getenv("REDIS_HOST", "localhost")
PORT = int(os.getenv("REDIS_PORT", "6379"))
PASSWORD = os.getenv("REDIS_PASSWORD", None)
# Default socket timeouts so a wedged Redis endpoint can't hang callers
# indefinitely — long-running code paths (cluster_lock refresh in particular)
# rely on these to fail-fast instead of blocking on no-response TCP. Override
# via env if a specific deployment needs a different budget.
#
# 30s matches the convention in ``backend.data.rabbitmq`` and leaves ~6x
# headroom over the largest ``xread(block=5000)`` wait in stream_registry.
# The connect timeout is shorter (5s) because initial connects should be
# fast; a slow connect usually means the endpoint is genuinely unreachable.
SOCKET_TIMEOUT = float(os.getenv("REDIS_SOCKET_TIMEOUT", "30"))
SOCKET_CONNECT_TIMEOUT = float(os.getenv("REDIS_SOCKET_CONNECT_TIMEOUT", "5"))
# How often redis-py sends a PING on idle connections to detect half-open
# sockets; cheap and avoids waiting for the OS TCP keepalive (~2h default).
HEALTH_CHECK_INTERVAL = int(os.getenv("REDIS_HEALTH_CHECK_INTERVAL", "30"))
logger = logging.getLogger(__name__)
@@ -39,10 +24,6 @@ def connect() -> Redis:
port=PORT,
password=PASSWORD,
decode_responses=True,
socket_timeout=SOCKET_TIMEOUT,
socket_connect_timeout=SOCKET_CONNECT_TIMEOUT,
socket_keepalive=True,
health_check_interval=HEALTH_CHECK_INTERVAL,
)
c.ping()
return c
@@ -65,10 +46,6 @@ async def connect_async() -> AsyncRedis:
port=PORT,
password=PASSWORD,
decode_responses=True,
socket_timeout=SOCKET_TIMEOUT,
socket_connect_timeout=SOCKET_CONNECT_TIMEOUT,
socket_keepalive=True,
health_check_interval=HEALTH_CHECK_INTERVAL,
)
await c.ping()
return c

View File

@@ -48,16 +48,6 @@ class Flag(str, Enum):
STRIPE_PRICE_BUSINESS = "stripe-price-id-business"
GRAPHITI_MEMORY = "graphiti-memory"
# Copilot model routing — string-valued, returns the model identifier
# (e.g. ``"anthropic/claude-sonnet-4-6"`` or ``"moonshotai/kimi-k2.6"``)
# to use for each cell of the (mode, tier) matrix. Falls back to the
# ``CHAT_*_MODEL`` env/config default when the flag is unset or LD is
# unavailable. Evaluated per user_id so cohorts can be targeted.
COPILOT_FAST_STANDARD_MODEL = "copilot-fast-standard-model"
COPILOT_FAST_ADVANCED_MODEL = "copilot-fast-advanced-model"
COPILOT_THINKING_STANDARD_MODEL = "copilot-thinking-standard-model"
COPILOT_THINKING_ADVANCED_MODEL = "copilot-thinking-advanced-model"
def is_configured() -> bool:
"""Check if LaunchDarkly is configured with an SDK key."""

Binary file not shown.

After

Width:  |  Height:  |  Size: 111 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 83 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 83 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 83 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 56 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 56 KiB