Compare commits

...

30 Commits

Author SHA1 Message Date
Zamil Majdy
e8309856ea test: add E2E screenshots for PR #12737 (post-merge verify) 2026-04-18 23:42:28 +07:00
Zamil Majdy
9f8c2889e4 test: E2E screenshots for PR #12737 2026-04-18 17:30:27 +07:00
Zamil Majdy
189c01ead8 test: refresh E2E screenshots (non-empty) for PR #12737 2026-04-18 14:32:23 +07:00
Zamil Majdy
e86d8322a0 test: E2E screenshots for PR #12737 2026-04-18 14:23:16 +07:00
Zamil Majdy
3ea37947a9 test: add E2E screenshots for PR #12737 2026-04-17 14:55:15 +07:00
Zamil Majdy
f018ad5857 test: add E2E screenshots for PR #12737 2026-04-16 17:38:53 +07:00
majdyz
53b41b1f1d test: add visual proof screenshots for PR #12737 Queue+auto-continue fix 2026-04-15 07:54:58 +07:00
majdyz
54d5706321 test: add E2E screenshots for PR #12737 2026-04-15 00:43:44 +07:00
majdyz
97c4eeab99 test: add E2E screenshots for PR #12737 2026-04-11 08:49:47 +07:00
majdyz
6b390d6677 fix(backend/copilot): apply session_msg_ceiling to no-resume compression fallback
The no-resume fallback in _build_query_message used raw msg_count (> 1) to
detect multi-message history and session.messages[:-1] for the compression
slice. After a turn-start drain appends pending messages, msg_count is inflated
and the fallback fires on what should be a fresh first turn, placing the current
user message into the history context and delivering a confusing split prompt to
the model.

Apply session_msg_ceiling to both branches:
- elif condition: effective_count > 1 instead of msg_count > 1
- compression slice: session.messages[:effective_count - 1] instead of [:-1]

With _pre_drain_msg_count=1 on a first turn with drained pending messages,
effective_count=1 so the fallback is correctly skipped and current_message
(which already contains both the original and pending text) is returned as-is.

Adds regression test covering the spurious-fallback scenario.
2026-04-11 08:45:54 +07:00
majdyz
1d05b06e43 fix(backend/copilot): prevent pending message duplication in stale-transcript gap
When use_resume=True and the transcript is stale, _build_query_message computes
a gap slice from session.messages[transcript_msg_count:-1].  Pending messages
drained at turn start are appended to session.messages AND concatenated into
current_message, so without the ceiling they appear in both gap_context and
current_message.

Capture _pre_drain_msg_count before drain_pending_messages() and pass it as
session_msg_ceiling to _build_query_message.  The gap slice is now bounded at
the pre-drain count, preventing pending messages from leaking into the gap.

Adds two regression tests in query_builder_test.py.
2026-04-11 08:25:14 +07:00
majdyz
c58176365f fix(backend/copilot): use atomic Lua EVAL for pending call-frequency counter
Replace separate INCR + EXPIRE with a single Lua EVAL so the rate-limit
key can never be orphaned without a TTL. If the process died between the
two commands the key would persist indefinitely, permanently locking out
the user after hitting the 30-push limit.

Fixes sentry bug report on routes.py:1153.
2026-04-11 08:01:15 +07:00
majdyz
a7d06854e3 feat(copilot): add per-user call-frequency rate limit to pending endpoint
The token-budget check guards against over-spending but does not prevent
rapid-fire pushes from a client with a large budget.  Add a Redis
INCR + EXPIRE fixed-window counter (30 calls per 60-second window per
user) to cap call frequency independently of token consumption.

Returns HTTP 429 with "Too many pending messages" when exceeded.
Fails open (Redis unavailable → allows request).

Adds test for the new 429 path.

Addresses autogpt-pr-reviewer "Should Fix: per-request rate limit".
2026-04-11 00:42:25 +07:00
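The cap amounts to a fixed 60-second window per user. An in-memory simulation of the INCR-with-TTL semantics (illustrative only — the real counter lives in Redis, keyed per user):

```python
import time

_PENDING_CALL_LIMIT = 30
_PENDING_CALL_WINDOW_SECONDS = 60


class CallCounter:
    def __init__(self, now=time.monotonic):
        self._now = now
        # user_id -> (window expiry, count), mimicking a key with a TTL
        self._windows: dict[str, tuple[float, int]] = {}

    def allow(self, user_id: str) -> bool:
        """INCR-with-TTL semantics: the first hit in a window sets the expiry."""
        now = self._now()
        expiry, count = self._windows.get(user_id, (0.0, 0))
        if now >= expiry:  # window expired: the Redis key would have TTL'd out
            expiry, count = now + _PENDING_CALL_WINDOW_SECONDS, 0
        count += 1
        self._windows[user_id] = (expiry, count)
        return count <= _PENDING_CALL_LIMIT
```

The 31st call within a window is the one that maps to the HTTP 429.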
majdyz
9bfcdf3f11 test(copilot): add combined-fields test for format_pending_as_user_message
Verify that content + context (url + content) + file_ids all appear in
the formatted output when all fields are present simultaneously.

Addresses autogpt-pr-reviewer 'format_pending_as_user_message never
tested with all fields simultaneously'.
2026-04-11 00:35:27 +07:00
majdyz
18c75beb7a nit(copilot): name pub/sub notify payload constant
Replace magic string "1" in redis.publish() with named constant
_NOTIFY_PAYLOAD for self-documentation.

Addresses autogpt-pr-reviewer nit.
2026-04-11 00:33:49 +07:00
majdyz
9da0dd111f refactor(copilot): extract shared file-ID sanitization helper
Extract `_resolve_workspace_files(user_id, file_ids)` helper from the
duplicated UUID-filter + workspace-DB-lookup logic in both
`stream_chat_post` and `queue_pending_message`.

Both endpoints now call the single helper; callers map the returned
`list[UserWorkspaceFile]` to IDs or file-description strings as before.

Also removes the redundant `if user_id:` guard from `stream_chat_post`'s
file-ID block — `Security(auth.get_user_id)` guarantees a non-empty string.

Addresses autogpt-pr-reviewer "Should Fix: Duplicated file-ID sanitization"
and coderabbitai nit on the if user_id guard.
2026-04-11 00:31:03 +07:00
majdyz
3ef24b3234 refactor(copilot): narrow exception handling and type context field
- Replace broad `except Exception` with `except (json.JSONDecodeError,
  ValidationError, TypeError, ValueError)` in drain_pending_messages so
  unexpected non-data errors propagate instead of being silently swallowed
- Introduce `PendingMessageContext` Pydantic model to replace the raw
  `dict[str, str]` for the context field, making the url/content contract
  explicit and enabling typed attribute access instead of .get() calls
- Update routes.py to construct PendingMessageContext from the validated
  request dict before passing to PendingMessage
- Update tests to use PendingMessageContext directly

Addresses coderabbitai review comments.
2026-04-11 00:27:15 +07:00
majdyz
d10d14ae74 test(copilot): add coverage for pending-message endpoint and URL test
- Add 11 tests for QueuePendingMessageRequest validation and the
  POST /sessions/{id}/messages/pending endpoint covering:
  - 202 happy path
  - 422 on empty/oversized message, context.url > 2KB, context.content > 32KB, >20 file_ids
  - 404 on unknown session
  - 429 on rate limit exceeded
  - file_ids scoped to caller's workspace
- Fix CodeQL false-positive: replace broad url-in-content assertion
  with exact [Page URL: url] substring check in pending_messages_test
2026-04-11 00:10:20 +07:00
majdyz
5e8345e5ee fix(copilot): fix CodeQL false-positive in pending_messages_test
Replace broad `url in content` assertion with exact `[Page URL: url]`
substring check so CodeQL does not flag it as Incomplete URL Substring
Sanitization.
2026-04-11 00:06:24 +07:00
majdyz
a7d97dacf3 fix(copilot): address review comments on pending-messages PR
- Use _pre_drain_msg_count for transcript load gate (len > 1 check)
  to avoid spurious transcript load on first turn with pending messages
- Use _pre_drain_msg_count for Graphiti warm context gate to prevent
  warm context skip when pending messages are drained at first turn
- Add context.url/content length validators to QueuePendingMessageRequest
  to prevent LLM context-window stuffing (2K url, 32K content caps)
- Rename underscore-prefixed active variables (_pm, _content, _pt)
  to conventional names (pm, content, pt) per Python convention
2026-04-11 00:00:07 +07:00
majdyz
39e89b50a7 fix(copilot): address remaining CI failures on pending-messages
1. SDK pyright: the inner ``_fetch_transcript`` closure captured
   ``session`` which pyright couldn't narrow to non-None (the outer
   scope casts it, but the narrowing doesn't propagate into the
   nested async function).  Added an explicit ``assert session is not
   None`` at the top of the closure.
2. Lint: re-formatted ``platform_cost_test.py`` — some pre-existing
   whitespace drift from an upstream merge was tripping Black on CI.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-10 16:41:55 +00:00
majdyz
f8f7df7b0a fix(copilot): address CI failures on pending-messages PR
1. SDK retry tests failing with "Event loop is closed" — the
   drain-at-start call in stream_chat_completion_sdk was reaching the
   real ``drain_pending_messages`` (which hits Redis) instead of being
   mocked.  Added a ``drain_pending_messages`` stub returning ``[]`` to
   the shared ``_make_sdk_patches`` helper so all retry-integration
   tests skip the drain path.

2. API types check failing — the new
   ``POST /sessions/{id}/messages/pending`` endpoint wasn't reflected
   in the frontend's ``openapi.json``.  Regenerated via
   ``poetry run export-api-schema --output ../frontend/src/app/api/openapi.json``
   and ``pnpm prettier --write``.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-10 16:34:20 +00:00
majdyz
1d0202a882 Merge branch 'feat/copilot-pending-messages' of github.com:Significant-Gravitas/AutoGPT into feat/copilot-pending-messages 2026-04-10 23:29:57 +07:00
majdyz
a4dbcf4247 fix(backend/copilot): address round-3 review — dedup, persist, guards
- Replace maybe_append_user_message with direct session.messages.append
  for pending drain in both baseline mid-loop and SDK drain-at-start:
  pending messages are atomically popped from Redis and are never
  stale-cache duplicates, so the dedup is wrong and causes
  openai_messages/transcript to diverge from the DB record
- Add immediate upsert_chat_session after SDK drain-at-start so a
  crash between drain and finally doesn't lose messages already removed
  from Redis
- Capture _pre_drain_msg_count before the baseline drain-at-start:
  use it for is_first_turn (prevents pending messages from flipping the
  flag to False on an actual first turn) and for _load_prior_transcript
  (prevents the stale-transcript check from firing on every turn that
  drains pending messages, which would block transcript upload forever)
- Remove redundant if user_id: guards in queue_pending_message — user_id
  is guaranteed non-empty by Security(auth.get_user_id); the guards made
  the rate-limit check silently optional
2026-04-10 23:29:44 +07:00
majdyz
51465fbb02 docs(pending_messages): fix two stale comments in pending_messages.py
Round 4 review nits:

- ``_PUSH_LUA`` block comment mentioned "returns 0 from our earlier
  LLEN" which was a leftover from an earlier design that had a
  separate LLEN check. The atomicity guarantee doesn't depend on it.
  Reworded to describe Redis EVAL serialisation instead.
- ``clear_pending_messages`` docstring said "called at the end of a
  turn" but the finally-block call sites were removed in round 2
  when the atomic drain-at-start became the primary consumer. The
  function is now only an operator/debug escape hatch. Docstring
  updated to match.

No behavioural change.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-10 16:15:02 +00:00
majdyz
ded048bdfb Merge remote-tracking branch 'origin/dev' into feat/copilot-pending-messages 2026-04-10 23:14:22 +07:00
majdyz
80e580f387 fix(baseline): mirror drained pending messages into transcript_builder
Round 3 follow-up: the drain-at-start in ``stream_chat_completion_baseline``
persisted pending messages to ``session.messages`` but never called
``transcript_builder.append_user`` for them.  A mid-turn transcript
upload would be missing the drained text, which could produce a
malformed assistant-after-assistant structure on the next turn.

The drain block runs BEFORE ``transcript_builder`` is instantiated
(which happens after prompt/transcript async setup), so we can't call
append_user in the drain block itself.  Instead, we remember the
drained list and mirror it into the transcript right after the
single-message ``transcript_builder.append_user(content=message)``
call near the prompt-build site.

Also cleaned up the stray adjacent-string concatenation in the log
line (``"...turn start " "for session %s"`` → single string).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-10 16:10:34 +00:00
majdyz
f140e73150 fix(copilot): address round 2 review on pending-messages feature
Critical: SDK path was double-injecting.  The endpoint persisted the
message to ``session.messages`` AND the executor drained it from Redis
and concatenated into ``current_message`` — the LLM saw each queued
message twice (once via the compacted history / gap context that
``_build_query_message`` pulls from ``session.messages``, once via
the new query).  Baseline avoided this via ``maybe_append_user_message``
dedup but SDK had no equivalent guard.

### Fix: Redis is the single source of truth

- Endpoint no longer persists to ``session.messages``.  It only
  pushes to Redis and returns.
- Baseline drain-at-start calls ``maybe_append_user_message`` (dedup
  is a safety net, not the primary guard).
- SDK drain-at-start calls ``maybe_append_user_message`` too, so the
  durable transcript records the queued messages.  The concatenation
  into ``current_message`` stays so the SDK CLI sees the content in
  the first user message of the new turn.

### Baseline max-iterations silent-loss — Fixed

``tool_call_loop`` yields ``finished_naturally=False`` when
``iteration == max_iterations`` then returns.  Previously the drain
only skipped ``finished_naturally=True``, so messages drained on the
max-iterations final yield were appended to ``openai_messages`` and
silently lost (the loop was already exiting).  Now the drain also
skips when ``loop_result.iterations >= _MAX_TOOL_ROUNDS``.

### API response cleanup

- ``QueuePendingMessageResponse``: dropped ``queued`` (always True) and
  ``detail`` (human-readable, clients shouldn't parse).  Kept
  ``buffer_length``, ``max_buffer_length``, and ``turn_in_flight``.

### Tests

- Removed dead ``_FakePipeline`` class (the code switched to Lua EVAL
  in round 1 so the pipeline fake was unused).
- Added ``test_drain_decodes_bytes_payloads`` so the ``bytes → str``
  decode branch in ``drain_pending_messages`` is actually exercised
  (real redis-py returns bytes when ``decode_responses=False``).
- Updated ``_FakeRedis.lists`` type hint to ``list[str | bytes]``.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-10 15:57:57 +00:00
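The two skip conditions from the baseline fix above reduce to a small predicate. Names and the round cap are illustrative, not the real `_MAX_TOOL_ROUNDS` value:

```python
_MAX_TOOL_ROUNDS = 10  # illustrative cap


def should_drain_mid_loop(finished_naturally: bool, iterations: int) -> bool:
    """Drain pending messages only when the loop will make another LLM call.

    Skips both the natural-finish yield and the max-iterations final
    yield — draining on either would append a user message the loop is
    about to exit past, silently losing it.
    """
    if finished_naturally:
        return False
    if iterations >= _MAX_TOOL_ROUNDS:
        return False
    return True
```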
majdyz
cafe49f295 fix(copilot): address round 1 review on pending-messages feature
Critical fix — the SDK mid-stream injection was structurally broken.
``ClaudeSDKClient.receive_response()`` explicitly returns after the
first ``ResultMessage``, so re-issuing ``client.query()`` and setting
``acc.stream_completed = False`` could never restart the iteration —
the next ``__anext__`` raised ``StopAsyncIteration`` and the injected
turn's response was never consumed.  Replaced the broken mid-stream
path with a turn-start drain that works for both baseline and SDK.

### Changes

**Atomic push via Lua EVAL** (``pending_messages.py``)
- Replace the ``RPUSH`` + ``LTRIM`` + ``EXPIRE`` + ``LLEN`` pipeline
  (which was ``transaction=False`` and racy against concurrent
  ``LPOP``) with a single Lua script so the push is atomic.
- Drop the unused ``enqueued_at`` field.
- Add 16k ``max_length`` cap on ``PendingMessage.content``.

**Baseline path** (``baseline/service.py``)
- Drain at turn start (atomic ``LPOP``): any message queued while the
  session was idle or between turns is picked up before the first
  LLM call.
- Mid-loop drain now skips the final ``tool_call_loop`` yield
  (``finished_naturally=True``) — draining there would append a user
  message the loop is about to exit past, silently losing it.
- Inject via ``format_pending_as_user_message`` so file IDs + context
  are preserved in both ``openai_messages`` and the persisted session
  transcript (previously the DB copy lost file/context metadata).
- Remove the ``finally`` ``clear_pending_messages`` — atomic drain at
  turn start means any late push belongs to the next turn; clearing
  here would racily clobber it.

**SDK path** (``sdk/service.py``)
- Remove the broken mid-stream injection block entirely.
- Drain at turn start (same atomic ``LPOP``) and merge the drained
  messages into ``current_message`` before ``_build_query_message``,
  so the SDK CLI sees them as part of the initial user message.
- Remove the ``finally`` ``clear_pending_messages``.
- Delete the unused ``_combine_pending_messages`` helper.

**Endpoint** (``api/features/chat/routes.py``)
- Enforce ``check_rate_limit`` / ``get_global_rate_limits`` — was
  bypassing per-user daily/weekly token limits that ``/stream``
  enforces.
- ``QueuePendingMessageRequest`` gets ``extra="forbid"`` and
  ``message: max_length=16_000``.
- Push-first, persist-second: if the Redis push fails we raise 5xx;
  previously the session DB got an orphan user message with no
  corresponding queued entry and a retry would duplicate it.
- Log a warning when sanitised file IDs drop unknown entries.
- Persisted message content now uses ``format_pending_as_user_message``
  so the session copy matches what the model actually sees on drain.
- Response returns ``buffer_length``, ``max_buffer_length``, and
  ``turn_in_flight`` so the frontend can show accurate feedback about
  whether the message will hit the current turn or the next one.

**Tests** (``pending_messages_test.py``)
- ``_FakeRedis.eval`` emulates the Lua push script so the existing
  push/drain/cap tests keep working under the new atomic path.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-10 15:37:40 +00:00
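What the atomic push must guarantee can be shown with an in-memory stand-in: append, trim the oldest entries past the cap, and report the new length as one step, with drain popping everything — so a concurrent drain can never interleave with a half-finished push. The real code does this in a single Redis Lua EVAL; the TTL refresh is elided here:

```python
MAX_PENDING_MESSAGES = 10


class PendingBuffer:
    def __init__(self) -> None:
        self._items: list[str] = []

    def push(self, payload: str) -> int:
        """Atomic RPUSH + trim-to-cap, returning the new buffer length."""
        self._items.append(payload)
        if len(self._items) > MAX_PENDING_MESSAGES:
            # trim oldest on overflow, matching LTRIM key -cap -1
            self._items = self._items[-MAX_PENDING_MESSAGES:]
        return len(self._items)

    def drain(self) -> list[str]:
        """Atomic pop of everything: the turn-start drain."""
        items, self._items = self._items, []
        return items
```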
majdyz
c6a31cb501 feat(copilot): inject user messages mid-turn via pending buffer
When a user sends a follow-up message while a copilot turn is still
streaming, we now queue it into a per-session Redis buffer and let the
executor currently processing the turn drain it between tool-call
rounds — the model sees the new message before its next LLM call.
Previously such messages were blocked at the RabbitMQ/cluster-lock
layer and only processed after the current turn completed.

### New module
`backend/copilot/pending_messages.py`
- Redis list buffer keyed by ``copilot:pending:{session_id}``
- Pub/sub notify channel as a wake-up hint for future blocking-wait use
- Cap of ``MAX_PENDING_MESSAGES=10`` — trims oldest on overflow
- 1h TTL matches ``stream_ttl`` default
- Helpers: ``push_pending_message``, ``drain_pending_messages``,
  ``peek_pending_count``, ``clear_pending_messages``,
  ``format_pending_as_user_message``

### New endpoint
`POST /sessions/{session_id}/messages/pending`
- Returns 202 + current buffer length
- Persists the message to the DB so it's in the transcript immediately
- Sanitises file IDs against the caller's workspace
- Does NOT start a new turn (unlike ``stream``)

### Baseline path (simple — in-process injection)
`backend/copilot/baseline/service.py`
- Between iterations of ``tool_call_loop``, drain pending and append to
  the shared ``openai_messages`` list so the loop picks them up on the
  next LLM call
- Persist session via ``upsert_chat_session`` after injection
- Finally-block safety net clears the buffer on early exit

### SDK path (in-process injection via live client.query)
`backend/copilot/sdk/service.py`
- When the SDK loop detects ``acc.stream_completed``, before breaking,
  drain pending and send them via the existing open ``client.query()``
  as a new user message; reset ``stream_completed`` to ``False`` and
  ``continue`` the async-for loop so we keep consuming CLI messages
- Combines multiple drained messages into a single ``query()`` call via
  ``_combine_pending_messages`` to preserve ordering
- Finally-block safety net clears the buffer on early exit
- This works because the Claude Agent SDK's ``ClaudeSDKClient`` is a
  long-lived connection: ``query()`` writes a new user message to the
  CLI's stdin and the same ``receive_response()`` stream picks up the
  next turn's events, so we keep session continuity without releasing
  the cluster lock or restarting the subprocess

### Tests
`backend/copilot/pending_messages_test.py`
- FakeRedis + FakePipeline so tests don't need a live Redis
- Covers push/drain, ordering, buffer cap (MAX_PENDING_MESSAGES),
  clear, publish hook, malformed-payload handling, and the format
  helper (plain / with context / with file_ids)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-10 15:15:52 +00:00
29 changed files with 1427 additions and 39 deletions

View File

@@ -4,7 +4,7 @@ import asyncio
import logging
import re
from collections.abc import AsyncGenerator
-from typing import Annotated
+from typing import Annotated, Any, cast
from uuid import uuid4
from autogpt_libs import auth
@@ -29,6 +29,12 @@ from backend.copilot.model import (
get_user_sessions,
update_session_title,
)
from backend.copilot.pending_messages import (
MAX_PENDING_MESSAGES,
PendingMessage,
PendingMessageContext,
push_pending_message,
)
from backend.copilot.rate_limit import (
CoPilotUsageStatus,
RateLimitExceeded,
@@ -84,6 +90,27 @@ _UUID_RE = re.compile(
r"^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$", re.I
)
# Call-frequency cap for the pending-message endpoint. The token-budget
# check in queue_pending_message guards against overspend, but does not
# prevent rapid-fire pushes from a client with a large budget. This cap
# (per user, per 60-second window) limits the rate a caller can hammer the
# endpoint independently of token consumption.
_PENDING_CALL_LIMIT = 30 # pushes per minute per user
_PENDING_CALL_WINDOW_SECONDS = 60
_PENDING_CALL_KEY_PREFIX = "copilot:pending:calls:"
# Lua script for atomic INCR + conditional EXPIRE.
# Using a single EVAL ensures the counter never persists without a TTL —
# a bare INCR followed by a separate EXPIRE can leave the key without
# an expiry if the process crashes between the two commands.
_CALL_INCR_LUA = """
local count = redis.call('INCR', KEYS[1])
if count == 1 then
redis.call('EXPIRE', KEYS[1], tonumber(ARGV[1]))
end
return count
"""
async def _validate_and_get_session(
session_id: str,
@@ -96,6 +123,29 @@ async def _validate_and_get_session(
return session
async def _resolve_workspace_files(
user_id: str,
file_ids: list[str],
) -> list[UserWorkspaceFile]:
"""Filter *file_ids* to UUID-valid entries that exist in the caller's workspace.
Returns the matching ``UserWorkspaceFile`` records (empty list if none pass).
Used by both the stream and pending-message endpoints to prevent callers from
referencing other users' files.
"""
valid_ids = [fid for fid in file_ids if _UUID_RE.match(fid)]
if not valid_ids:
return []
workspace = await get_or_create_workspace(user_id)
return await UserWorkspaceFile.prisma().find_many(
where={
"id": {"in": valid_ids},
"workspaceId": workspace.id,
"isDeleted": False,
}
)
router = APIRouter(
tags=["chat"],
)
@@ -119,6 +169,64 @@ class StreamChatRequest(BaseModel):
)
class QueuePendingMessageRequest(BaseModel):
"""Request model for queueing a message into an in-flight turn.
Unlike ``StreamChatRequest`` this endpoint does **not** start a new
turn — the message is appended to a per-session pending buffer that
the executor currently processing the turn will drain between tool
rounds.
"""
model_config = ConfigDict(extra="forbid")
message: str = Field(min_length=1, max_length=16_000)
context: dict[str, str] | None = Field(
default=None,
description="Optional page context: expected keys are 'url' and 'content'.",
)
file_ids: list[str] | None = Field(default=None, max_length=20)
@field_validator("context")
@classmethod
def _validate_context_length(
cls, v: dict[str, str] | None
) -> dict[str, str] | None:
if v is None:
return v
# Cap context values to prevent LLM context-window stuffing via
# large page payloads (url: 2 KB, content: 32 KB).
_URL_LIMIT = 2_000
_CONTENT_LIMIT = 32_000
url = v.get("url", "")
if len(url) > _URL_LIMIT:
raise ValueError(
f"context.url exceeds maximum length of {_URL_LIMIT} characters"
)
content = v.get("content", "")
if len(content) > _CONTENT_LIMIT:
raise ValueError(
f"context.content exceeds maximum length of {_CONTENT_LIMIT} characters"
)
return v
class QueuePendingMessageResponse(BaseModel):
"""Response for the pending-message endpoint.
- ``buffer_length``: how many messages are now in the session's
pending buffer (after this push)
- ``max_buffer_length``: the per-session cap (server-side constant)
- ``turn_in_flight``: ``True`` if a copilot turn was running when
we checked — purely informational for UX feedback. Even when
``False`` the message is still queued: the next turn drains it.
"""
buffer_length: int
max_buffer_length: int
turn_in_flight: bool
class CreateSessionRequest(BaseModel):
"""Request model for creating a new chat session.
@@ -786,33 +894,21 @@ async def stream_chat_post(
# Also sanitise file_ids so only validated, workspace-scoped IDs are
# forwarded downstream (e.g. to the executor via enqueue_copilot_turn).
sanitized_file_ids: list[str] | None = None
-        if request.file_ids and user_id:
-            # Filter to valid UUIDs only to prevent DB abuse
-            valid_ids = [fid for fid in request.file_ids if _UUID_RE.match(fid)]
-            if valid_ids:
-                workspace = await get_or_create_workspace(user_id)
-                # Batch query instead of N+1
-                files = await UserWorkspaceFile.prisma().find_many(
-                    where={
-                        "id": {"in": valid_ids},
-                        "workspaceId": workspace.id,
-                        "isDeleted": False,
-                    }
-                )
-                # Only keep IDs that actually exist in the user's workspace
-                sanitized_file_ids = [wf.id for wf in files] or None
-                file_lines: list[str] = [
-                    f"- {wf.name} ({wf.mimeType}, {round(wf.sizeBytes / 1024, 1)} KB), file_id={wf.id}"
-                    for wf in files
-                ]
-                if file_lines:
-                    files_block = (
-                        "\n\n[Attached files]\n"
-                        + "\n".join(file_lines)
-                        + "\nUse read_workspace_file with the file_id to access file contents."
-                    )
-                    request.message += files_block
+        if request.file_ids:
+            files = await _resolve_workspace_files(user_id, request.file_ids)
+            # Only keep IDs that actually exist in the user's workspace
+            sanitized_file_ids = [wf.id for wf in files] or None
+            file_lines: list[str] = [
+                f"- {wf.name} ({wf.mimeType}, {round(wf.sizeBytes / 1024, 1)} KB), file_id={wf.id}"
+                for wf in files
+            ]
+            if file_lines:
+                files_block = (
+                    "\n\n[Attached files]\n"
+                    + "\n".join(file_lines)
+                    + "\nUse read_workspace_file with the file_id to access file contents."
+                )
+                request.message += files_block
# Atomically append user message to session BEFORE creating task to avoid
# race condition where GET_SESSION sees task as "running" but message isn't
@@ -1012,6 +1108,129 @@ async def stream_chat_post(
)
@router.post(
"/sessions/{session_id}/messages/pending",
response_model=QueuePendingMessageResponse,
status_code=202,
)
async def queue_pending_message(
session_id: str,
request: QueuePendingMessageRequest,
user_id: str = Security(auth.get_user_id),
):
"""Queue a new user message into an in-flight copilot turn.
When a user sends a follow-up message while a turn is still
streaming, we don't want to block them or start a separate turn —
this endpoint appends the message to a per-session pending buffer.
The executor currently running the turn (baseline path) drains the
buffer between tool-call rounds and appends the message to the
conversation before the next LLM call. On the SDK path the buffer
is drained at the *start* of the next turn (the long-lived
``ClaudeSDKClient.receive_response`` iterator returns after a
``ResultMessage`` so there is no safe point to inject mid-stream
into an existing connection).
Returns 202. Enforces the same per-user daily/weekly token rate
limit as the regular ``/stream`` endpoint so a client can't bypass
it by batching messages through here.
"""
await _validate_and_get_session(session_id, user_id)
# Pre-turn rate-limit check — mirrors stream_chat_post. Without
# this, a client could bypass per-turn token limits by batching
# their extra context through this endpoint while a cheap stream
# is in flight.
# user_id is guaranteed non-empty by Security(auth.get_user_id) — no guard needed.
try:
daily_limit, weekly_limit, _tier = await get_global_rate_limits(
user_id, config.daily_token_limit, config.weekly_token_limit
)
await check_rate_limit(
user_id=user_id,
daily_token_limit=daily_limit,
weekly_token_limit=weekly_limit,
)
except RateLimitExceeded as e:
raise HTTPException(status_code=429, detail=str(e)) from e
# Call-frequency cap: prevent rapid-fire pushes that would bypass the
# token-budget check (which only fires per-turn, not per-push).
# Uses an atomic Lua EVAL (INCR + EXPIRE) so the key can never be
# orphaned without a TTL; fails open if Redis is down.
try:
_redis = await get_redis_async()
_call_key = f"{_PENDING_CALL_KEY_PREFIX}{user_id}"
_call_count = int(
await cast(
"Any",
_redis.eval(
_CALL_INCR_LUA,
1,
_call_key,
str(_PENDING_CALL_WINDOW_SECONDS),
),
)
)
if _call_count > _PENDING_CALL_LIMIT:
raise HTTPException(
status_code=429,
detail=f"Too many pending messages: limit is {_PENDING_CALL_LIMIT} per {_PENDING_CALL_WINDOW_SECONDS}s",
)
except HTTPException:
raise
except Exception:
pass # Redis failure is non-fatal; fail open
track_user_message(
user_id=user_id,
session_id=session_id,
message_length=len(request.message),
)
# Sanitise file IDs to the user's own workspace so injection doesn't
# surface other users' files. _resolve_workspace_files handles UUID
# filtering and the workspace-scoped DB lookup.
sanitized_file_ids: list[str] = []
if request.file_ids:
valid_id_count = sum(1 for fid in request.file_ids if _UUID_RE.match(fid))
files = await _resolve_workspace_files(user_id, request.file_ids)
sanitized_file_ids = [wf.id for wf in files]
if len(sanitized_file_ids) != valid_id_count:
logger.warning(
"queue_pending_message: dropped %d file id(s) not in "
"caller's workspace (session=%s)",
valid_id_count - len(sanitized_file_ids),
session_id,
)
# Redis is the single source of truth for pending messages. We do
# NOT persist to ``session.messages`` here — the drain-at-start
# path in the baseline/SDK executor is the sole writer for pending
# content. Persisting both here AND in the drain would cause
# double injection (executor sees the message in ``session.messages``
# *and* drains it from Redis) unless we also dedupe. The dedup in
# ``maybe_append_user_message`` only checks trailing same-role
# repeats, so relying on it is fragile. Keeping the endpoint
# Redis-only avoids the whole consistency-bug class.
pending = PendingMessage(
content=request.message,
file_ids=sanitized_file_ids,
context=PendingMessageContext(**request.context) if request.context else None,
)
buffer_length = await push_pending_message(session_id, pending)
# Check whether a turn is currently running for UX feedback.
active_session = await stream_registry.get_session(session_id)
turn_in_flight = bool(active_session and active_session.status == "running")
return QueuePendingMessageResponse(
buffer_length=buffer_length,
max_buffer_length=MAX_PENDING_MESSAGES,
turn_in_flight=turn_in_flight,
)
@router.get(
"/sessions/{session_id}/stream",
)

View File

@@ -579,3 +579,300 @@ class TestStreamChatRequestModeValidation:
req = StreamChatRequest(message="hi")
assert req.mode is None
# ─── QueuePendingMessageRequest validation ────────────────────────────
class TestQueuePendingMessageRequest:
"""Unit tests for QueuePendingMessageRequest field validation."""
def test_accepts_valid_message(self) -> None:
from backend.api.features.chat.routes import QueuePendingMessageRequest
req = QueuePendingMessageRequest(message="hello")
assert req.message == "hello"
def test_rejects_empty_message(self) -> None:
import pydantic
from backend.api.features.chat.routes import QueuePendingMessageRequest
with pytest.raises(pydantic.ValidationError):
QueuePendingMessageRequest(message="")
def test_rejects_message_over_limit(self) -> None:
import pydantic
from backend.api.features.chat.routes import QueuePendingMessageRequest
with pytest.raises(pydantic.ValidationError):
QueuePendingMessageRequest(message="x" * 16_001)
def test_accepts_valid_context(self) -> None:
from backend.api.features.chat.routes import QueuePendingMessageRequest
req = QueuePendingMessageRequest(
message="hi",
context={"url": "https://example.com", "content": "page text"},
)
assert req.context is not None
assert req.context["url"] == "https://example.com"
def test_rejects_context_url_over_limit(self) -> None:
import pydantic
from backend.api.features.chat.routes import QueuePendingMessageRequest
with pytest.raises(pydantic.ValidationError, match="url"):
QueuePendingMessageRequest(
message="hi",
context={"url": "https://example.com/" + "x" * 2_000},
)
def test_rejects_context_content_over_limit(self) -> None:
import pydantic
from backend.api.features.chat.routes import QueuePendingMessageRequest
with pytest.raises(pydantic.ValidationError, match="content"):
QueuePendingMessageRequest(
message="hi",
context={"content": "x" * 32_001},
)
def test_rejects_extra_fields(self) -> None:
"""extra='forbid' should reject unknown fields."""
import pydantic
from backend.api.features.chat.routes import QueuePendingMessageRequest
with pytest.raises(pydantic.ValidationError):
QueuePendingMessageRequest(message="hi", unknown_field="bad") # type: ignore[call-arg]
def test_accepts_up_to_20_file_ids(self) -> None:
from backend.api.features.chat.routes import QueuePendingMessageRequest
req = QueuePendingMessageRequest(
message="hi",
file_ids=[f"00000000-0000-0000-0000-{i:012d}" for i in range(20)],
)
assert req.file_ids is not None
assert len(req.file_ids) == 20
def test_rejects_more_than_20_file_ids(self) -> None:
import pydantic
from backend.api.features.chat.routes import QueuePendingMessageRequest
with pytest.raises(pydantic.ValidationError):
QueuePendingMessageRequest(
message="hi",
file_ids=[f"00000000-0000-0000-0000-{i:012d}" for i in range(21)],
)
# ─── queue_pending_message endpoint ──────────────────────────────────
def _mock_pending_internals(
mocker: pytest_mock.MockerFixture,
*,
session_exists: bool = True,
call_count: int = 1,
):
"""Mock all async dependencies for the pending-message endpoint."""
if session_exists:
mock_session = mocker.MagicMock()
mock_session.id = "sess-1"
mocker.patch(
"backend.api.features.chat.routes._validate_and_get_session",
new_callable=AsyncMock,
return_value=mock_session,
)
else:
mocker.patch(
"backend.api.features.chat.routes._validate_and_get_session",
side_effect=fastapi.HTTPException(
status_code=404, detail="Session not found."
),
)
mocker.patch(
"backend.api.features.chat.routes.get_global_rate_limits",
new_callable=AsyncMock,
return_value=(0, 0, None),
)
mocker.patch(
"backend.api.features.chat.routes.check_rate_limit",
new_callable=AsyncMock,
return_value=None,
)
# Mock Redis for per-user call-frequency rate limit (atomic Lua EVAL)
mock_redis = mocker.MagicMock()
mock_redis.eval = mocker.AsyncMock(return_value=call_count)
mocker.patch(
"backend.api.features.chat.routes.get_redis_async",
new_callable=AsyncMock,
return_value=mock_redis,
)
mocker.patch(
"backend.api.features.chat.routes.track_user_message",
return_value=None,
)
mocker.patch(
"backend.api.features.chat.routes.push_pending_message",
new_callable=AsyncMock,
return_value=1,
)
mock_registry = mocker.MagicMock()
mock_registry.get_session = mocker.AsyncMock(return_value=None)
mocker.patch(
"backend.api.features.chat.routes.stream_registry",
mock_registry,
)
def test_queue_pending_message_returns_202(mocker: pytest_mock.MockerFixture) -> None:
"""Happy path: valid message returns 202 with buffer_length."""
_mock_pending_internals(mocker)
response = client.post(
"/sessions/sess-1/messages/pending",
json={"message": "follow-up"},
)
assert response.status_code == 202
data = response.json()
assert data["buffer_length"] == 1
assert data["turn_in_flight"] is False
def test_queue_pending_message_empty_body_returns_422() -> None:
"""Empty message must be rejected by Pydantic before hitting any route logic."""
response = client.post(
"/sessions/sess-1/messages/pending",
json={"message": ""},
)
assert response.status_code == 422
def test_queue_pending_message_missing_message_returns_422() -> None:
"""Missing 'message' field returns 422."""
response = client.post(
"/sessions/sess-1/messages/pending",
json={},
)
assert response.status_code == 422
def test_queue_pending_message_session_not_found_returns_404(
mocker: pytest_mock.MockerFixture,
) -> None:
"""If the session doesn't exist or belong to the user, returns 404."""
_mock_pending_internals(mocker, session_exists=False)
response = client.post(
"/sessions/bad-sess/messages/pending",
json={"message": "hi"},
)
assert response.status_code == 404
def test_queue_pending_message_rate_limited_returns_429(
mocker: pytest_mock.MockerFixture,
) -> None:
"""When rate limit is exceeded, endpoint returns 429."""
from backend.copilot.rate_limit import RateLimitExceeded
_mock_pending_internals(mocker)
mocker.patch(
"backend.api.features.chat.routes.check_rate_limit",
side_effect=RateLimitExceeded("daily", datetime.now(UTC) + timedelta(hours=1)),
)
response = client.post(
"/sessions/sess-1/messages/pending",
json={"message": "hi"},
)
assert response.status_code == 429
def test_queue_pending_message_call_frequency_limit_returns_429(
mocker: pytest_mock.MockerFixture,
) -> None:
"""When per-user call frequency limit is exceeded, endpoint returns 429."""
from backend.api.features.chat.routes import _PENDING_CALL_LIMIT
_mock_pending_internals(mocker, call_count=_PENDING_CALL_LIMIT + 1)
response = client.post(
"/sessions/sess-1/messages/pending",
json={"message": "hi"},
)
assert response.status_code == 429
assert "Too many pending messages" in response.json()["detail"]
def test_queue_pending_message_context_url_too_long_returns_422() -> None:
"""context.url over 2 KB is rejected."""
response = client.post(
"/sessions/sess-1/messages/pending",
json={
"message": "hi",
"context": {"url": "https://example.com/" + "x" * 2_000},
},
)
assert response.status_code == 422
def test_queue_pending_message_context_content_too_long_returns_422() -> None:
"""context.content over 32 KB is rejected."""
response = client.post(
"/sessions/sess-1/messages/pending",
json={
"message": "hi",
"context": {"content": "x" * 32_001},
},
)
assert response.status_code == 422
def test_queue_pending_message_too_many_file_ids_returns_422() -> None:
"""More than 20 file_ids should be rejected."""
response = client.post(
"/sessions/sess-1/messages/pending",
json={
"message": "hi",
"file_ids": [f"00000000-0000-0000-0000-{i:012d}" for i in range(21)],
},
)
assert response.status_code == 422
def test_queue_pending_message_file_ids_scoped_to_workspace(
mocker: pytest_mock.MockerFixture,
) -> None:
"""File IDs must be sanitized to the user's workspace before push."""
_mock_pending_internals(mocker)
mocker.patch(
"backend.api.features.chat.routes.get_or_create_workspace",
new_callable=AsyncMock,
return_value=type("W", (), {"id": "ws-1"})(),
)
mock_prisma = mocker.MagicMock()
mock_prisma.find_many = mocker.AsyncMock(return_value=[])
mocker.patch(
"prisma.models.UserWorkspaceFile.prisma",
return_value=mock_prisma,
)
fid = "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee"
client.post(
"/sessions/sess-1/messages/pending",
json={"message": "hi", "file_ids": [fid, "not-a-uuid"]},
)
call_kwargs = mock_prisma.find_many.call_args[1]
assert call_kwargs["where"]["id"]["in"] == [fid]
assert call_kwargs["where"]["workspaceId"] == "ws-1"
assert call_kwargs["where"]["isDeleted"] is False

@@ -36,6 +36,10 @@ from backend.copilot.model import (
maybe_append_user_message,
upsert_chat_session,
)
from backend.copilot.pending_messages import (
drain_pending_messages,
format_pending_as_user_message,
)
from backend.copilot.prompting import get_baseline_supplement, get_graphiti_supplement
from backend.copilot.response_model import (
StreamBaseResponse,
@@ -930,6 +934,29 @@ async def stream_chat_completion_baseline(
message_length=len(message or ""),
)
# Capture count *before* the pending drain so is_first_turn and the
# transcript staleness check are not skewed by queued messages.
_pre_drain_msg_count = len(session.messages)
# Drain any messages the user queued via POST /messages/pending
# while this session was idle (or during a previous turn whose
# mid-loop drains missed them). Atomic LPOP guarantees that a
# concurrent push lands *after* the drain and stays queued for the
# next turn instead of being lost.
drained_at_start = await drain_pending_messages(session_id)
if drained_at_start:
logger.info(
"[Baseline] Draining %d pending message(s) at turn start for session %s",
len(drained_at_start),
session_id,
)
for pm in drained_at_start:
content = format_pending_as_user_message(pm)["content"]
# Append directly — pending messages are atomically popped from
# Redis and are never stale-cache duplicates, so the
# maybe_append_user_message dedup is wrong here.
session.messages.append(ChatMessage(role="user", content=content))
session = await upsert_chat_session(session)
# Select model based on the per-request mode. 'fast' downgrades to
@@ -959,7 +986,9 @@ async def stream_chat_completion_baseline(
# Build system prompt only on the first turn to avoid mid-conversation
# changes from concurrent chats updating business understanding.
is_first_turn = len(session.messages) <= 1
# Use the pre-drain count so queued pending messages don't incorrectly
# flip is_first_turn to False on an actual first turn.
is_first_turn = _pre_drain_msg_count <= 1
# Gate context fetch on both first turn AND user message so that assistant-
# role calls (e.g. tool-result submissions) on the first turn don't trigger
# a needless DB lookup for user understanding.
@@ -970,14 +999,18 @@ async def stream_chat_completion_baseline(
prompt_task = _build_cacheable_system_prompt(None)
# Run download + prompt build concurrently — both are independent I/O
# on the request critical path.
if user_id and len(session.messages) > 1:
# on the request critical path. Use the pre-drain count so pending
# messages drained at turn start don't spuriously trigger a transcript
# load on an actual first turn.
if user_id and _pre_drain_msg_count > 1:
transcript_covers_prefix, (base_system_prompt, understanding) = (
await asyncio.gather(
_load_prior_transcript(
user_id=user_id,
session_id=session_id,
session_msg_count=len(session.messages),
# Use pre-drain count so pending messages don't falsely
# mark the stored transcript as stale and prevent upload.
session_msg_count=_pre_drain_msg_count,
transcript_builder=transcript_builder,
),
prompt_task,
@@ -989,6 +1022,16 @@ async def stream_chat_completion_baseline(
# Append user message to transcript after context injection below so the
# transcript receives the prefixed message when user context is available.
# Mirror any messages drained at turn start (see above) into the
# transcript — otherwise the loaded prior transcript would be
# missing them and a mid-turn upload could leave a malformed
# assistant-after-assistant structure on the next turn.
if drained_at_start:
for pm in drained_at_start:
transcript_builder.append_user(
content=format_pending_as_user_message(pm)["content"]
)
# Generate title for new sessions
if is_user_message and not session.title:
user_messages = [m for m in session.messages if m.role == "user"]
@@ -1009,8 +1052,10 @@ async def stream_chat_completion_baseline(
graphiti_supplement = get_graphiti_supplement() if graphiti_enabled else ""
system_prompt = base_system_prompt + get_baseline_supplement() + graphiti_supplement
# Warm context: pre-load relevant facts from Graphiti on first turn
if graphiti_enabled and user_id and len(session.messages) <= 1:
# Warm context: pre-load relevant facts from Graphiti on first turn.
# Use the pre-drain count so pending messages drained at turn start
# don't prevent warm context injection on an actual first turn.
if graphiti_enabled and user_id and _pre_drain_msg_count <= 1:
from backend.copilot.graphiti.context import fetch_warm_context
warm_ctx = await fetch_warm_context(user_id, message or "")
@@ -1203,6 +1248,64 @@ async def stream_chat_completion_baseline(
yield evt
state.pending_events.clear()
# Inject any messages the user queued while the turn was
# running. ``tool_call_loop`` mutates ``openai_messages``
# in-place, so appending here means the model sees the new
# messages on its next LLM call.
#
# IMPORTANT: skip when the loop has already finished (no
# more LLM calls are coming). ``tool_call_loop`` yields
# a final ``ToolCallLoopResult`` on both paths:
# - natural finish: ``finished_naturally=True``
# - hit max_iterations: ``finished_naturally=False``
# and ``iterations >= max_iterations``
# In either case the loop is about to return on the next
# ``async for`` step, so draining here would silently
# lose the message (the user sees 202 but the model never
# reads the text). Those messages stay in the buffer and
# get picked up at the start of the next turn.
if loop_result is None:
continue
is_final_yield = (
loop_result.finished_naturally
or loop_result.iterations >= _MAX_TOOL_ROUNDS
)
if is_final_yield:
continue
pending = await drain_pending_messages(session_id)
if pending:
for pm in pending:
# ``format_pending_as_user_message`` embeds file
# attachments and context URL/page content into the
# content string so the in-session transcript is
# a faithful copy of what the model actually saw.
formatted = format_pending_as_user_message(pm)
content_for_db = formatted["content"]
# Append directly — pending messages are atomically popped
# from Redis and are never stale-cache duplicates, so the
# maybe_append_user_message dedup is wrong here and would
# cause openai_messages/transcript to diverge from session.
session.messages.append(
ChatMessage(role="user", content=content_for_db)
)
openai_messages.append(formatted)
transcript_builder.append_user(content=content_for_db)
try:
await upsert_chat_session(session)
except Exception as persist_err:
logger.warning(
"[Baseline] Failed to persist pending messages for "
"session %s: %s",
session_id,
persist_err,
)
logger.info(
"[Baseline] Injected %d pending message(s) into "
"session %s mid-turn",
len(pending),
session_id,
)
if loop_result and not loop_result.finished_naturally:
limit_msg = (
f"Exceeded {_MAX_TOOL_ROUNDS} tool-call rounds "
@@ -1243,6 +1346,11 @@ async def stream_chat_completion_baseline(
yield StreamError(errorText=error_msg, code="baseline_error")
# Still persist whatever we got
finally:
# Pending messages are drained atomically at turn start and
# between tool rounds, so there's nothing to clear in finally.
# Any message pushed after the final drain window stays in the
# buffer and gets picked up at the start of the next turn.
# Set cost attributes on OTEL span before closing
if _trace_ctx is not None:
try:
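The skip-on-final-yield guard described in the mid-turn injection hunk above can be sketched standalone (``LoopResult`` and ``MAX_TOOL_ROUNDS`` are illustrative stand-ins for the module's types):

```python
from dataclasses import dataclass
from typing import Optional

MAX_TOOL_ROUNDS = 10  # stand-in for _MAX_TOOL_ROUNDS


@dataclass
class LoopResult:
    finished_naturally: bool
    iterations: int


def may_inject(loop_result: Optional[LoopResult]) -> bool:
    # No round-boundary result yet → nothing to key off, hold the drain.
    if loop_result is None:
        return False
    # Final yield (natural finish OR max rounds hit): no further LLM
    # call is coming, so draining here would silently lose the message.
    is_final_yield = (
        loop_result.finished_naturally
        or loop_result.iterations >= MAX_TOOL_ROUNDS
    )
    return not is_final_yield


print(may_inject(LoopResult(finished_naturally=False, iterations=3)))  # → True
```

Messages that arrive during a final yield stay in the Redis buffer and are picked up by the turn-start drain of the next turn.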

@@ -0,0 +1,222 @@
"""Pending-message buffer for in-flight copilot turns.
When a user sends a new message while a copilot turn is already executing,
instead of blocking the frontend (or queueing a brand-new turn after the
current one finishes), we want the new message to be *injected into the
running turn* — appended between tool-call rounds so the model sees it
before its next LLM call.
This module provides the cross-process buffer that makes that possible:
- **Producer** (chat API route): pushes a pending message to Redis and
publishes a notification on a pub/sub channel.
- **Consumer** (executor running the turn): on each tool-call round,
drains the buffer and appends the pending messages to the conversation.
The Redis list is the durable store; the pub/sub channel is a fast
wake-up hint for long-idle consumers (not used by default, but available
for future blocking-wait semantics).
A hard cap of ``MAX_PENDING_MESSAGES`` per session prevents abuse. The
buffer is trimmed to the latest ``MAX_PENDING_MESSAGES`` on every push.
"""
import json
import logging
from typing import Any, cast
from pydantic import BaseModel, Field, ValidationError
from backend.data.redis_client import get_redis_async
logger = logging.getLogger(__name__)
# Per-session cap. Higher values risk a runaway consumer; lower values
# risk dropping user input under heavy typing. 10 was chosen as a
# reasonable ceiling — a user typing faster than the copilot can drain
# between tool rounds is already an unusual usage pattern.
MAX_PENDING_MESSAGES = 10
# Redis key + TTL. The buffer is ephemeral: if a turn completes or the
# executor dies, the pending messages should either have been drained
# already or are safe to drop (the user can resend).
_PENDING_KEY_PREFIX = "copilot:pending:"
_PENDING_CHANNEL_PREFIX = "copilot:pending:notify:"
_PENDING_TTL_SECONDS = 3600 # 1 hour — matches stream_ttl default
# Payload sent on the pub/sub notify channel. Subscribers treat any
# message as a wake-up hint; the value itself is not meaningful.
_NOTIFY_PAYLOAD = "1"
class PendingMessageContext(BaseModel):
"""Structured page context attached to a pending message."""
url: str | None = None
content: str | None = None
class PendingMessage(BaseModel):
"""A user message queued for injection into an in-flight turn."""
content: str = Field(min_length=1, max_length=16_000)
file_ids: list[str] = Field(default_factory=list)
context: PendingMessageContext | None = None
def _buffer_key(session_id: str) -> str:
return f"{_PENDING_KEY_PREFIX}{session_id}"
def _notify_channel(session_id: str) -> str:
return f"{_PENDING_CHANNEL_PREFIX}{session_id}"
# Lua script: push-then-trim-then-expire-then-length, atomically.
# Redis serializes EVAL commands, so a concurrent ``LPOP`` drain
# observes either the pre-push or post-push state of the list — never
# a partial state where the RPUSH has landed but LTRIM hasn't run.
_PUSH_LUA = """
redis.call('RPUSH', KEYS[1], ARGV[1])
redis.call('LTRIM', KEYS[1], -tonumber(ARGV[2]), -1)
redis.call('EXPIRE', KEYS[1], tonumber(ARGV[3]))
return redis.call('LLEN', KEYS[1])
"""
async def push_pending_message(
session_id: str,
message: PendingMessage,
) -> int:
"""Append a pending message to the session's buffer atomically.
Returns the new buffer length. Enforces ``MAX_PENDING_MESSAGES`` by
trimming from the left (oldest) — the newest message always wins if
the user has been typing faster than the copilot can drain.
The push + trim + expire + llen are wrapped in a single Lua EVAL so
concurrent LPOP drains from the executor never observe a partial
state.
"""
redis = await get_redis_async()
key = _buffer_key(session_id)
payload = message.model_dump_json()
new_length = int(
await cast(
"Any",
redis.eval(
_PUSH_LUA,
1,
key,
payload,
str(MAX_PENDING_MESSAGES),
str(_PENDING_TTL_SECONDS),
),
)
)
# Fire-and-forget notify. Subscribers use this as a wake-up hint;
# the buffer itself is authoritative so a lost notify is harmless.
try:
await redis.publish(_notify_channel(session_id), _NOTIFY_PAYLOAD)
except Exception as e: # pragma: no cover
logger.warning("pending_messages: publish failed for %s: %s", session_id, e)
logger.info(
"pending_messages: pushed message to session=%s (buffer_len=%d)",
session_id,
new_length,
)
return new_length
async def drain_pending_messages(session_id: str) -> list[PendingMessage]:
"""Atomically pop all pending messages for *session_id*.
Returns them in enqueue order (oldest first). Uses ``LPOP`` with a
count so the read+delete is a single Redis round trip. If the list
is empty or missing, returns ``[]``.
"""
redis = await get_redis_async()
key = _buffer_key(session_id)
# Redis LPOP with count (Redis 6.2+) returns None for missing key,
# empty list if we somehow race an empty key, or the popped items.
# redis-py's async lpop overload with a count collapses the return
# type in pyright; cast the awaitable so strict type-check stays
# clean without changing runtime behaviour.
lpop_result = await cast(
"Any",
redis.lpop(key, MAX_PENDING_MESSAGES),
)
if not lpop_result:
return []
raw_popped: list[Any] = list(lpop_result)
# redis-py may return bytes or str depending on decode_responses.
decoded: list[str] = [
item.decode("utf-8") if isinstance(item, bytes) else str(item)
for item in raw_popped
]
messages: list[PendingMessage] = []
for payload in decoded:
try:
messages.append(PendingMessage(**json.loads(payload)))
except (json.JSONDecodeError, ValidationError, TypeError, ValueError) as e:
logger.warning(
"pending_messages: dropping malformed entry for %s: %s",
session_id,
e,
)
if messages:
logger.info(
"pending_messages: drained %d messages for session=%s",
len(messages),
session_id,
)
return messages
async def peek_pending_count(session_id: str) -> int:
"""Return the current buffer length without consuming it."""
redis = await get_redis_async()
length = await cast("Any", redis.llen(_buffer_key(session_id)))
return int(length)
async def clear_pending_messages(session_id: str) -> None:
"""Drop the session's pending buffer.
Not called by the normal turn flow — the atomic ``LPOP`` drain at
turn start is the primary consumer, and any push that arrives
after the drain window belongs to the next turn by definition.
Retained as an operator/debug escape hatch for manually clearing a
stuck session and as a fixture in the unit tests.
"""
redis = await get_redis_async()
await redis.delete(_buffer_key(session_id))
def format_pending_as_user_message(message: PendingMessage) -> dict[str, Any]:
"""Shape a ``PendingMessage`` into the OpenAI-format user message dict.
Used by the baseline tool-call loop when injecting the buffered
message into the conversation. Context/file metadata (if any) is
embedded into the content so the model sees everything in one block.
"""
parts: list[str] = [message.content]
if message.context:
if message.context.url:
parts.append(f"\n\n[Page URL: {message.context.url}]")
if message.context.content:
parts.append(f"\n\n[Page content]\n{message.context.content}")
if message.file_ids:
parts.append(
"\n\n[Attached files]\n"
+ "\n".join(f"- file_id={fid}" for fid in message.file_ids)
+ "\nUse read_workspace_file with the file_id to access file contents."
)
return {"role": "user", "content": "".join(parts)}
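The ``RPUSH`` + ``LTRIM(-N, -1)`` cap semantics of ``_PUSH_LUA`` can be emulated in pure Python to show the keep-newest-N invariant the buffer relies on (a sketch, not the Redis implementation):

```python
MAX_PENDING = 10  # mirrors MAX_PENDING_MESSAGES


def push(buf: list[str], payload: str, cap: int = MAX_PENDING) -> int:
    # RPUSH: append to the right.
    buf.append(payload)
    # LTRIM(-cap, -1): keep only the newest `cap` entries; the oldest
    # are trimmed from the left, so the newest message always wins.
    del buf[:-cap]
    # LLEN: report the new length.
    return len(buf)


buf: list[str] = []
for i in range(13):
    push(buf, f"m{i}")
print(buf[0], buf[-1], len(buf))  # → m3 m12 10
```

This matches the ordering the unit tests assert below: pushing MAX + 3 messages drops the oldest three.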

@@ -0,0 +1,246 @@
"""Tests for the copilot pending-messages buffer.
Uses a fake async Redis client so the tests don't require a real Redis
instance (the backend test suite's DB/Redis fixtures are heavyweight
and pull in the full app startup).
"""
import json
from typing import Any
import pytest
from backend.copilot import pending_messages as pm_module
from backend.copilot.pending_messages import (
MAX_PENDING_MESSAGES,
PendingMessage,
PendingMessageContext,
clear_pending_messages,
drain_pending_messages,
format_pending_as_user_message,
peek_pending_count,
push_pending_message,
)
# ── Fake Redis ──────────────────────────────────────────────────────
class _FakeRedis:
def __init__(self) -> None:
# Values are ``str | bytes`` because real redis-py returns
# bytes when ``decode_responses=False``; the drain path must
# handle both and our tests exercise both.
self.lists: dict[str, list[str | bytes]] = {}
self.published: list[tuple[str, str]] = []
async def eval(self, script: str, num_keys: int, *args: Any) -> Any:
"""Emulate the push Lua script.
The real Lua script runs atomically in Redis; the fake
implementation just runs the equivalent list operations in
order and returns the final LLEN. That's enough to exercise
the cap + ordering invariants the tests care about.
"""
key = args[0]
payload = args[1]
max_len = int(args[2])
# ARGV[3] is TTL — fake doesn't enforce expiry
lst = self.lists.setdefault(key, [])
lst.append(payload)
if len(lst) > max_len:
# RPUSH + LTRIM(-N, -1) = keep only last N
self.lists[key] = lst[-max_len:]
return len(self.lists[key])
async def publish(self, channel: str, payload: str) -> int:
self.published.append((channel, payload))
return 1
async def lpop(self, key: str, count: int) -> list[str | bytes] | None:
lst = self.lists.get(key)
if not lst:
return None
popped = lst[:count]
self.lists[key] = lst[count:]
return popped
async def llen(self, key: str) -> int:
return len(self.lists.get(key, []))
async def delete(self, key: str) -> int:
if key in self.lists:
del self.lists[key]
return 1
return 0
@pytest.fixture()
def fake_redis(monkeypatch: pytest.MonkeyPatch) -> _FakeRedis:
redis = _FakeRedis()
async def _get_redis_async() -> _FakeRedis:
return redis
monkeypatch.setattr(pm_module, "get_redis_async", _get_redis_async)
return redis
# ── Basic push / drain ──────────────────────────────────────────────
@pytest.mark.asyncio
async def test_push_and_drain_single_message(fake_redis: _FakeRedis) -> None:
length = await push_pending_message("sess1", PendingMessage(content="hello"))
assert length == 1
assert await peek_pending_count("sess1") == 1
drained = await drain_pending_messages("sess1")
assert len(drained) == 1
assert drained[0].content == "hello"
assert await peek_pending_count("sess1") == 0
@pytest.mark.asyncio
async def test_push_and_drain_preserves_order(fake_redis: _FakeRedis) -> None:
for i in range(3):
await push_pending_message("sess2", PendingMessage(content=f"msg {i}"))
drained = await drain_pending_messages("sess2")
assert [m.content for m in drained] == ["msg 0", "msg 1", "msg 2"]
@pytest.mark.asyncio
async def test_drain_empty_returns_empty_list(fake_redis: _FakeRedis) -> None:
assert await drain_pending_messages("nope") == []
# ── Buffer cap ──────────────────────────────────────────────────────
@pytest.mark.asyncio
async def test_cap_drops_oldest_when_exceeded(fake_redis: _FakeRedis) -> None:
# Push MAX_PENDING_MESSAGES + 3 messages
for i in range(MAX_PENDING_MESSAGES + 3):
await push_pending_message("sess3", PendingMessage(content=f"m{i}"))
# Buffer should be clamped to MAX
assert await peek_pending_count("sess3") == MAX_PENDING_MESSAGES
drained = await drain_pending_messages("sess3")
assert len(drained) == MAX_PENDING_MESSAGES
# Oldest 3 dropped — we should only see m3..m(MAX+2)
assert drained[0].content == "m3"
assert drained[-1].content == f"m{MAX_PENDING_MESSAGES + 2}"
# ── Clear ───────────────────────────────────────────────────────────
@pytest.mark.asyncio
async def test_clear_removes_buffer(fake_redis: _FakeRedis) -> None:
await push_pending_message("sess4", PendingMessage(content="x"))
await push_pending_message("sess4", PendingMessage(content="y"))
await clear_pending_messages("sess4")
assert await peek_pending_count("sess4") == 0
@pytest.mark.asyncio
async def test_clear_is_idempotent(fake_redis: _FakeRedis) -> None:
# Clearing an already-empty buffer should not raise
await clear_pending_messages("sess_empty")
await clear_pending_messages("sess_empty")
# ── Publish hook ────────────────────────────────────────────────────
@pytest.mark.asyncio
async def test_push_publishes_notification(fake_redis: _FakeRedis) -> None:
await push_pending_message("sess5", PendingMessage(content="hi"))
assert ("copilot:pending:notify:sess5", "1") in fake_redis.published
# ── Format helper ───────────────────────────────────────────────────
def test_format_pending_plain_text() -> None:
msg = PendingMessage(content="just text")
out = format_pending_as_user_message(msg)
assert out == {"role": "user", "content": "just text"}
def test_format_pending_with_context_url() -> None:
msg = PendingMessage(
content="see this page",
context=PendingMessageContext(url="https://example.com"),
)
out = format_pending_as_user_message(msg)
content = out["content"]
assert out["role"] == "user"
assert "see this page" in content
# The URL should appear verbatim in the [Page URL: ...] block.
assert "[Page URL: https://example.com]" in content
def test_format_pending_with_file_ids() -> None:
msg = PendingMessage(content="look here", file_ids=["a", "b"])
out = format_pending_as_user_message(msg)
assert "file_id=a" in out["content"]
assert "file_id=b" in out["content"]
def test_format_pending_with_all_fields() -> None:
"""All fields (content + context url/content + file_ids) should all appear."""
msg = PendingMessage(
content="summarise this",
context=PendingMessageContext(
url="https://example.com/page",
content="headline text",
),
file_ids=["f1", "f2"],
)
out = format_pending_as_user_message(msg)
body = out["content"]
assert out["role"] == "user"
assert "summarise this" in body
assert "[Page URL: https://example.com/page]" in body
assert "[Page content]\nheadline text" in body
assert "file_id=f1" in body
assert "file_id=f2" in body
# ── Malformed payload handling ──────────────────────────────────────
@pytest.mark.asyncio
async def test_drain_skips_malformed_entries(
fake_redis: _FakeRedis,
) -> None:
# Seed the fake with a mix of valid and malformed payloads
fake_redis.lists["copilot:pending:bad"] = [
json.dumps({"content": "valid"}),
"{not valid json",
json.dumps({"content": "also valid", "file_ids": ["a"]}),
]
drained = await drain_pending_messages("bad")
assert len(drained) == 2
assert drained[0].content == "valid"
assert drained[1].content == "also valid"
@pytest.mark.asyncio
async def test_drain_decodes_bytes_payloads(
fake_redis: _FakeRedis,
) -> None:
"""Real redis-py returns ``bytes`` when ``decode_responses=False``.
Seed the fake with bytes values to exercise the ``decode("utf-8")``
branch in ``drain_pending_messages`` so a regression there doesn't
slip past CI.
"""
fake_redis.lists["copilot:pending:bytes_sess"] = [
json.dumps({"content": "from bytes"}).encode("utf-8"),
]
drained = await drain_pending_messages("bytes_sess")
assert len(drained) == 1
assert drained[0].content == "from bytes"

@@ -226,6 +226,111 @@ async def test_build_query_no_resume_multi_message(monkeypatch):
assert was_compacted is False # mock returns False
@pytest.mark.asyncio
async def test_build_query_session_msg_ceiling_prevents_pending_duplication():
"""session_msg_ceiling stops pending messages from leaking into the gap.
Scenario: transcript covers 2 messages, session has 2 historical + 1 current
+ 2 pending drained at turn start. Without the ceiling the gap slice would
include the pending messages even though current_message already carries
them → duplication.
With session_msg_ceiling=3 (the pre-drain count) the gap slice is empty and
only current_message carries the pending content.
"""
# session.messages after drain: [hist1, hist2, current_msg, pending1, pending2]
session = _make_session(
[
ChatMessage(role="user", content="hist1"),
ChatMessage(role="assistant", content="hist2"),
ChatMessage(role="user", content="current msg with pending1 pending2"),
ChatMessage(role="user", content="pending1"),
ChatMessage(role="user", content="pending2"),
]
)
# transcript covers hist1+hist2 (2 messages); pre-drain count was 3 (includes current_msg)
    result, was_compacted = await _build_query_message(
        "current msg with pending1 pending2",
        session,
        use_resume=True,
        transcript_msg_count=2,
        session_id="test-session",
        session_msg_ceiling=3,  # len(session.messages) before drain
    )
    # Gap should be empty (transcript_msg_count == ceiling - 1), so no history prepended
    assert result == "current msg with pending1 pending2"
    assert was_compacted is False
    # Pending messages must NOT appear in gap context
    assert "pending1" not in result.split("current msg")[0]


@pytest.mark.asyncio
async def test_build_query_session_msg_ceiling_preserves_real_gap():
    """session_msg_ceiling still surfaces a genuine stale-transcript gap.

    Scenario: transcript covers 2 messages, session has 4 historical + 1 current
    + 2 pending. Ceiling = 5 (pre-drain). Real gap = messages 2-3 (hist3, hist4).
    """
    session = _make_session(
        [
            ChatMessage(role="user", content="hist1"),
            ChatMessage(role="assistant", content="hist2"),
            ChatMessage(role="user", content="hist3"),
            ChatMessage(role="assistant", content="hist4"),
            ChatMessage(role="user", content="current"),
            ChatMessage(role="user", content="pending1"),
            ChatMessage(role="user", content="pending2"),
        ]
    )
    result, was_compacted = await _build_query_message(
        "current",
        session,
        use_resume=True,
        transcript_msg_count=2,
        session_id="test-session",
        session_msg_ceiling=5,  # pre-drain: [hist1..hist4, current]
    )
    # Gap = session.messages[2:4] = [hist3, hist4]
    assert "<conversation_history>" in result
    assert "hist3" in result
    assert "hist4" in result
    assert "Now, the user says:\ncurrent" in result
    # Pending messages must NOT appear in gap
    assert "pending1" not in result
    assert "pending2" not in result


@pytest.mark.asyncio
async def test_build_query_session_msg_ceiling_suppresses_spurious_no_resume_fallback():
    """session_msg_ceiling prevents the no-resume compression fallback from
    firing on the first turn of a session when pending messages inflate msg_count.

    Scenario: fresh session (1 message) + 1 pending message drained at turn start.
    Without the ceiling: msg_count=2 > 1 → fallback triggers → pending message
    leaked into history → wrong context sent to model.
    With session_msg_ceiling=1 (pre-drain count): effective_count=1, 1 > 1 is False
    → fallback does not trigger → current_message returned as-is.
    """
    # session.messages after drain: [current_msg, pending_msg]
    session = _make_session(
        [
            ChatMessage(role="user", content="What is 2 plus 2?"),
            ChatMessage(role="user", content="What is 7 plus 7?"),  # pending
        ]
    )
    result, was_compacted = await _build_query_message(
        "What is 2 plus 2?\n\nWhat is 7 plus 7?",
        session,
        use_resume=False,
        transcript_msg_count=0,
        session_id="test-session",
        session_msg_ceiling=1,  # pre-drain: only 1 message existed
    )
    # Should return current_message directly without wrapping in history context
    assert result == "What is 2 plus 2?\n\nWhat is 7 plus 7?"
    assert was_compacted is False
    # Pending question must NOT appear in a spurious history section
    assert "<conversation_history>" not in result


@pytest.mark.asyncio
async def test_build_query_no_resume_multi_message_compacted(monkeypatch):
    """When compression actually compacts, was_compacted should be True."""


@@ -1031,6 +1031,12 @@ def _make_sdk_patches(
         ),
         (f"{_SVC}.upload_transcript", dict(new_callable=AsyncMock)),
         (f"{_SVC}.get_user_tier", dict(new_callable=AsyncMock, return_value=None)),
+        # Stub pending-message drain so retry tests don't hit Redis.
+        # Returns an empty list → no mid-turn injection happens.
+        (
+            f"{_SVC}.drain_pending_messages",
+            dict(new_callable=AsyncMock, return_value=[]),
+        ),
     ]


@@ -34,6 +34,10 @@ from opentelemetry import trace as otel_trace
 from pydantic import BaseModel

 from backend.copilot.context import get_workspace_manager
+from backend.copilot.pending_messages import (
+    drain_pending_messages,
+    format_pending_as_user_message,
+)
 from backend.copilot.permissions import apply_tool_permissions
 from backend.copilot.rate_limit import get_user_tier
 from backend.copilot.transcript import (
@@ -955,17 +959,33 @@ async def _build_query_message(
     use_resume: bool,
     transcript_msg_count: int,
     session_id: str,
+    *,
+    session_msg_ceiling: int | None = None,
 ) -> tuple[str, bool]:
     """Build the query message with appropriate context.

+    Args:
+        session_msg_ceiling: If provided, treat ``session.messages`` as if it
+            only has this many entries when computing the gap slice. Pass
+            ``len(session.messages)`` captured *before* appending any pending
+            messages so that mid-turn drains do not skew the gap calculation
+            and cause pending messages to be duplicated in both the gap context
+            and ``current_message``.
+
     Returns:
         Tuple of (query_message, was_compacted).
     """
     msg_count = len(session.messages)
+    # Use the ceiling if supplied (prevents pending-message duplication when
+    # messages were appended to session.messages after the drain but before
+    # this function is called).
+    effective_count = (
+        session_msg_ceiling if session_msg_ceiling is not None else msg_count
+    )
     if use_resume and transcript_msg_count > 0:
-        if transcript_msg_count < msg_count - 1:
-            gap = session.messages[transcript_msg_count:-1]
+        if transcript_msg_count < effective_count - 1:
+            gap = session.messages[transcript_msg_count : effective_count - 1]
             compressed, was_compressed = await _compress_messages(gap)
             gap_context = _format_conversation_context(compressed)
             if gap_context:
@@ -981,12 +1001,14 @@ async def _build_query_message(
                 f"{gap_context}\n\nNow, the user says:\n{current_message}",
                 was_compressed,
             )
-    elif not use_resume and msg_count > 1:
+    elif not use_resume and effective_count > 1:
         logger.warning(
             f"[SDK] Using compression fallback for session "
-            f"{session_id} ({msg_count} messages) — no transcript for --resume"
+            f"{session_id} ({effective_count} messages) — no transcript for --resume"
         )
-        compressed, was_compressed = await _compress_messages(session.messages[:-1])
+        compressed, was_compressed = await _compress_messages(
+            session.messages[: effective_count - 1]
+        )
         history_context = _format_conversation_context(compressed)
         if history_context:
             return (
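The effect of the ceiling on the gap slice is easiest to see in isolation. A minimal sketch (hypothetical `gap_slice` helper over plain strings, not the real `_build_query_message`, which operates on `ChatMessage` objects):

```python
# Hypothetical standalone sketch of the gap-slice arithmetic above.
def gap_slice(messages, transcript_msg_count, session_msg_ceiling=None):
    # session_msg_ceiling caps how much of `messages` counts as history
    # (it is the pre-drain length, so drained pending messages are excluded).
    effective_count = (
        session_msg_ceiling if session_msg_ceiling is not None else len(messages)
    )
    if transcript_msg_count < effective_count - 1:
        return messages[transcript_msg_count : effective_count - 1]
    return []


# Pre-drain the session had 3 messages; two pending messages were then appended.
history = ["hist1", "hist2", "current", "pending1", "pending2"]
print(gap_slice(history, transcript_msg_count=2))
# → ['current', 'pending1']  (no ceiling: current + pending leak into the gap)
print(gap_slice(history, transcript_msg_count=2, session_msg_ceiling=3))
# → []  (ceiling applied: empty gap, nothing duplicated)
```

With the ceiling the gap condition `2 < 3 - 1` is false, so the fresh-turn case correctly yields no history context.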
@@ -2042,6 +2064,7 @@ async def stream_chat_completion_sdk(
     async def _fetch_transcript():
         """Download transcript for --resume if applicable."""
+        assert session is not None  # narrowed at line 1898
         if not (
             config.claude_agent_use_resume and user_id and len(session.messages) > 1
         ):
@@ -2277,6 +2300,61 @@ async def stream_chat_completion_sdk(
     if last_user:
         current_message = last_user[-1].content or ""

+    # Capture the message count *before* draining so _build_query_message
+    # can compute the gap slice without including the newly-drained pending
+    # messages. Pending messages are both appended to session.messages AND
+    # concatenated into current_message; without the ceiling the gap slice
+    # would extend into the pending messages and duplicate them in the
+    # model's input context (gap_context + current_message both containing
+    # them).
+    _pre_drain_msg_count = len(session.messages)
+
+    # Drain any messages the user queued via POST /messages/pending
+    # while the previous turn was running (or since the session was
+    # idle). Messages are drained ATOMICALLY — one LPOP with count
+    # removes them all at once, so a concurrent push lands *after*
+    # the drain and stays queued for the next turn instead of being
+    # lost between LPOP and clear. File IDs and context are
+    # preserved via format_pending_as_user_message.
+    #
+    # The drained content is concatenated into ``current_message``
+    # so the SDK CLI sees it in the new user message, AND appended
+    # directly to ``session.messages`` (no dedup — pending messages are
+    # atomically popped from Redis and are never stale-cache duplicates)
+    # so the durable transcript records it too. The session is persisted
+    # immediately after the drain so a crash doesn't lose the messages.
+    # The endpoint deliberately does NOT persist to session.messages —
+    # Redis is the single source of truth until this drain runs.
+    pending_at_start = await drain_pending_messages(session_id)
+    if pending_at_start:
+        logger.info(
+            "%s Draining %d pending message(s) at turn start",
+            log_prefix,
+            len(pending_at_start),
+        )
+        pending_texts: list[str] = [
+            format_pending_as_user_message(pm)["content"] for pm in pending_at_start
+        ]
+        for pt in pending_texts:
+            # Append directly — pending messages are atomically popped from
+            # Redis and are never stale-cache duplicates, so the
+            # maybe_append_user_message dedup is wrong here.
+            session.messages.append(ChatMessage(role="user", content=pt))
+        if current_message.strip():
+            current_message = current_message + "\n\n" + "\n\n".join(pending_texts)
+        else:
+            current_message = "\n\n".join(pending_texts)
+        # Persist immediately so a crash between here and the finally block
+        # doesn't lose messages that were already drained from Redis.
+        try:
+            session = await upsert_chat_session(session)
+        except Exception as _persist_err:
+            logger.warning(
+                "%s Failed to persist drained pending messages: %s",
+                log_prefix,
+                _persist_err,
+            )
+
     if not current_message.strip():
         yield StreamError(
             errorText="Message cannot be empty.",
@@ -2290,6 +2368,7 @@ async def stream_chat_completion_sdk(
         use_resume,
         transcript_msg_count,
         session_id,
+        session_msg_ceiling=_pre_drain_msg_count,
     )
@@ -2427,6 +2506,7 @@ async def stream_chat_completion_sdk(
         state.use_resume,
         state.transcript_msg_count,
         session_id,
+        session_msg_ceiling=_pre_drain_msg_count,
     )
     if attachments.hint:
         state.query_message = f"{state.query_message}\n\n{attachments.hint}"
@@ -2756,6 +2836,11 @@ async def stream_chat_completion_sdk(
         raise
     finally:
+        # Pending messages are drained atomically at the start of each
+        # turn (see drain_pending_messages call above), so there's
+        # nothing to clean up here — any message pushed after that
+        # point belongs to the next turn.
+
         # --- Close OTEL context (with cost attributes) ---
         if _otel_ctx is not None:
             try:
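The drain-and-concatenate step above can be sketched with a plain list standing in for the Redis buffer (hypothetical `drain_into_turn` helper; the real `drain_pending_messages` pops the Redis list with a single counted LPOP and the caller also appends each drained message to `session.messages`):

```python
# Sketch of the "pop everything at once, then fold into the turn's text"
# shape. `buffer` stands in for the per-session Redis pending list.
def drain_into_turn(current_message: str, buffer: list[str]) -> str:
    drained, buffer[:] = list(buffer), []  # pop-all in one step
    if not drained:
        return current_message
    joined = "\n\n".join(drained)
    # Mirrors the concatenation rule above: append when a message already
    # exists, otherwise the pending text becomes the whole message.
    return f"{current_message}\n\n{joined}" if current_message.strip() else joined


buffer = ["What is 7 plus 7?"]
print(drain_into_turn("What is 2 plus 2?", buffer))  # both questions, joined
print(buffer)  # → []  (a push after the drain belongs to the next turn)
```

Because the pop empties the buffer in one step, anything pushed afterwards stays queued for the following turn rather than being lost between a read and a clear.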


@@ -35,7 +35,6 @@ class TestUsdToMicrodollars:
         assert usd_to_microdollars(1.0) == 1_000_000


 class TestMaskEmail:
     def test_typical_email(self):
         assert _mask_email("user@example.com") == "us***@example.com"


@@ -1605,6 +1605,56 @@
        }
      }
    },
    "/api/chat/sessions/{session_id}/messages/pending": {
      "post": {
        "tags": ["v2", "chat", "chat"],
        "summary": "Queue Pending Message",
        "description": "Queue a new user message into an in-flight copilot turn.\n\nWhen a user sends a follow-up message while a turn is still\nstreaming, we don't want to block them or start a separate turn —\nthis endpoint appends the message to a per-session pending buffer.\nThe executor currently running the turn (baseline path) drains the\nbuffer between tool-call rounds and appends the message to the\nconversation before the next LLM call. On the SDK path the buffer\nis drained at the *start* of the next turn (the long-lived\n``ClaudeSDKClient.receive_response`` iterator returns after a\n``ResultMessage`` so there is no safe point to inject mid-stream\ninto an existing connection).\n\nReturns 202. Enforces the same per-user daily/weekly token rate\nlimit as the regular ``/stream`` endpoint so a client can't bypass\nit by batching messages through here.",
        "operationId": "postV2QueuePendingMessage",
        "security": [{ "HTTPBearerJWT": [] }],
        "parameters": [
          {
            "name": "session_id",
            "in": "path",
            "required": true,
            "schema": { "type": "string", "title": "Session Id" }
          }
        ],
        "requestBody": {
          "required": true,
          "content": {
            "application/json": {
              "schema": {
                "$ref": "#/components/schemas/QueuePendingMessageRequest"
              }
            }
          }
        },
        "responses": {
          "202": {
            "description": "Successful Response",
            "content": {
              "application/json": {
                "schema": {
                  "$ref": "#/components/schemas/QueuePendingMessageResponse"
                }
              }
            }
          },
          "401": {
            "$ref": "#/components/responses/HTTP401NotAuthenticatedError"
          },
          "422": {
            "description": "Validation Error",
            "content": {
              "application/json": {
                "schema": { "$ref": "#/components/schemas/HTTPValidationError" }
              }
            }
          }
        }
      }
    },
    "/api/chat/sessions/{session_id}/stream": {
      "get": {
        "tags": ["v2", "chat", "chat"],
@@ -12668,6 +12718,57 @@
      "required": ["providers", "pagination"],
      "title": "ProviderResponse"
    },
    "QueuePendingMessageRequest": {
      "properties": {
        "message": {
          "type": "string",
          "maxLength": 16000,
          "minLength": 1,
          "title": "Message"
        },
        "context": {
          "anyOf": [
            {
              "additionalProperties": { "type": "string" },
              "type": "object"
            },
            { "type": "null" }
          ],
          "title": "Context",
          "description": "Optional page context: expected keys are 'url' and 'content'."
        },
        "file_ids": {
          "anyOf": [
            {
              "items": { "type": "string" },
              "type": "array",
              "maxItems": 20
            },
            { "type": "null" }
          ],
          "title": "File Ids"
        }
      },
      "additionalProperties": false,
      "type": "object",
      "required": ["message"],
      "title": "QueuePendingMessageRequest",
      "description": "Request model for queueing a message into an in-flight turn.\n\nUnlike ``StreamChatRequest`` this endpoint does **not** start a new\nturn — the message is appended to a per-session pending buffer that\nthe executor currently processing the turn will drain between tool\nrounds."
    },
    "QueuePendingMessageResponse": {
      "properties": {
        "buffer_length": { "type": "integer", "title": "Buffer Length" },
        "max_buffer_length": {
          "type": "integer",
          "title": "Max Buffer Length"
        },
        "turn_in_flight": { "type": "boolean", "title": "Turn In Flight" }
      },
      "type": "object",
      "required": ["buffer_length", "max_buffer_length", "turn_in_flight"],
      "title": "QueuePendingMessageResponse",
      "description": "Response for the pending-message endpoint.\n\n- ``buffer_length``: how many messages are now in the session's\n  pending buffer (after this push)\n- ``max_buffer_length``: the per-session cap (server-side constant)\n- ``turn_in_flight``: ``True`` if a copilot turn was running when\n  we checked — purely informational for UX feedback. Even when\n  ``False`` the message is still queued: the next turn drains it."
    },
    "RateLimitResetResponse": {
      "properties": {
        "success": { "type": "boolean", "title": "Success" },
(19 binary screenshot files added, 46 KiB to 114 KiB each; not shown)