Compare commits

...

16 Commits

Author SHA1 Message Date
Zamil Majdy
95c6907ccd fix(frontend): remove test screenshots from repo
Remove binary test screenshots that bloat the repo. Test evidence
should be in the PR description or CI artifacts, not committed.
2026-04-01 18:03:00 +02:00
Zamil Majdy
f4bc3c2012 test: add test screenshots for PR #12598 stream timeout verification 2026-04-01 17:59:17 +02:00
Zamil Majdy
f265ef8ac3 fix(frontend): use type-safe any cast for createSessionMutation call
The generated mutation type differs between local (void) and CI
(requires CreateSessionRequest) due to export-api-schema regeneration.
Use an explicit any cast to handle both generated type variants.
2026-04-01 17:59:17 +02:00
Zamil Majdy
c79e6ff30a fix(frontend): clear stream timeout on stop and fix pre-existing TS errors
Clear the stream timeout timer immediately when the user clicks stop,
preventing a brief window where the timeout could fire after the user
already cancelled the stream. Also fix pre-existing TypeScript errors
in admin rate-limit components (missing user_email on generated type)
and useChatSession (createSessionMutation arg mismatch).
2026-04-01 17:59:17 +02:00
Zamil Majdy
7db8bf161a style(frontend): remove eslint-disable by referencing rawMessages in effect body
Reference rawMessages.length in the stream timeout effect so the
exhaustive-deps rule is satisfied without an eslint suppressor comment.
2026-04-01 17:59:17 +02:00
Zamil Majdy
84650d0f4d fix(frontend): improve stream timeout toast description
Deduplicate "Connection lost" between title and description — the
description now tells the user what to do next.
2026-04-01 17:59:17 +02:00
Zamil Majdy
0467cb2e49 fix(frontend): add stream timeout to copilot chat
When an SSE stream dies silently (no disconnect event), the UI stays
stuck in "Reasoning..." indefinitely. Add a 60-second inactivity
timeout that auto-cancels the stream and shows an error toast,
prompting the user to retry.
2026-04-01 17:59:17 +02:00
Zamil Majdy
24d0c35ed3 fix(backend/copilot): prompt-too-long retry, compaction churn, model-aware compression, and truncated tool call recovery (#12625)
## Why

CoPilot has several context management issues that degrade long
sessions:
1. "Prompt is too long" errors crash the session instead of triggering
retry/compaction
2. Stale thinking blocks bloat transcripts, causing unnecessary
compaction every turn
3. Compression target is hardcoded regardless of model context window
size
4. Truncated tool calls (empty `{}` args from max_tokens) kill the
session instead of guiding the model to self-correct

## What

**Fix 1: Prompt-too-long retry bypass (SENTRY-1207)**
The SDK surfaces "prompt too long" via `AssistantMessage.error` and
`ResultMessage.result` — neither triggered the retry/compaction loop
(only Python exceptions did). Now both paths are intercepted and
re-raised.

**Fix 2: Strip stale thinking blocks before upload**
Thinking/redacted_thinking blocks in non-last assistant entries are
10-50K tokens each but only needed for API signature verification in the
*last* message. Stripping before upload reduces transcript size and
prevents per-turn compaction.

**Fix 3: Model-aware compression target**
`compress_context()` now computes `target_tokens` from the model's
context window (e.g. 140K for Opus 200K) instead of a hardcoded 120K
default. Larger models retain more history; smaller models compress more
aggressively.

**Fix 4: Self-correcting truncated tool calls**
When the model's response exceeds max_tokens, tool call inputs get
silently truncated to `{}`. Previously this tripped a circuit breaker
after 3 attempts. Now the MCP wrapper detects empty args and returns
guidance: "write in chunks with `cat >>`, pass via
`@@agptfile:filename`". The model can self-correct instead of the
session dying.

## How

- **service.py**: `_is_prompt_too_long` checks in both
`AssistantMessage.error` and `ResultMessage` error handlers. Circuit
breaker limit raised from 3→5.
- **transcript.py**: `strip_stale_thinking_blocks()` reverse-scans for
last assistant `message.id`, strips thinking blocks from all others.
Called in `upload_transcript()`.
- **prompt.py**: `get_compression_target(model)` computes
`context_window - 60K overhead`. `compress_context()` uses it when
`target_tokens` is None.
- **tool_adapter.py**: `_truncating` wrapper intercepts empty args on
tools with required params, returns actionable guidance instead of
failing.

## Related

- Fixes SENTRY-1207
- Sessions: `d2f7cba3` (repeated compaction), `08b807d4` (prompt too
long), `130d527c` (truncated tool calls)
- Extends #12413, consolidates #12626

## Test plan

- [x] 6 unit tests for `strip_stale_thinking_blocks`
- [x] 1 integration test for ResultMessage prompt-too-long → compaction
retry
- [x] Pyright clean (0 errors), all pre-commit hooks pass
- [ ] E2E: Load transcripts from affected sessions and verify behavior
2026-04-01 15:10:57 +00:00
Zamil Majdy
8aae7751dc fix(backend/copilot): prevent duplicate block execution from pre-launch arg mismatch (#12632)
## Why

CoPilot sessions are duplicating Linear tickets and GitHub PRs.
Investigation of 5 production sessions (March 31st) found that 3/5
created duplicate Linear issues — each with consecutive IDs at the exact
same timestamp, but only one visible in Langfuse traces.

Production gcloud logs confirm: **279 arg mismatch warnings per day**,
**37 duplicate block execution pairs**, and all LinearCreateIssueBlock
failures in pairs.

Related: SECRT-2204

## What

Replace the speculative pre-launch mechanism with the SDK's native
parallel dispatch via `readOnlyHint` tool annotations. Remove ~580 lines
of pre-launch infrastructure code.

## How

### Root cause
The pre-launch mechanism had three compounding bugs:
1. **Arg mismatch**: The SDK CLI normalises args between the
`AssistantMessage` (used for pre-launch) and the MCP `tools/call`
dispatch, causing frequent mismatches (279/day in prod)
2. **FIFO desync on denial**: Security hooks can deny tool calls,
causing the CLI to skip the MCP dispatch — but the pre-launched task
stays in the FIFO queue, misaligning all subsequent matches
3. **Cancel race**: `task.cancel()` is best-effort in asyncio — if the
HTTP call to Linear/GitHub already completed, the side effect is
irreversible

### Fix
- **Removed** `pre_launch_tool_call()`, `cancel_pending_tool_tasks()`,
`_tool_task_queues` ContextVar, all FIFO queue logic, and all 4
`cancel_pending_tool_tasks()` calls in `service.py`
- **Added** `readOnlyHint=True` annotations on 15+ read-only tools
(`find_block`, `search_docs`, `list_workspace_files`, etc.) — the SDK
CLI natively dispatches these in parallel ([ref:
anthropics/claude-code#14353](https://github.com/anthropics/claude-code/issues/14353))
- Side-effect tools (`run_block`, `bash_exec`, `create_agent`, etc.)
have no annotation → CLI runs them sequentially → no duplicate execution
risk

### Net change: -578 lines, +105 lines
2026-04-01 13:42:54 +00:00
An Vy Le
725da7e887 dx(backend/copilot): clarify ambiguous agent goals using find_block before generation (#12601)
### Why / What / How

**Why:** When a user asks CoPilot to build an agent with an ambiguous
goal (output format, delivery channel, data source, or trigger
unspecified), the agent generator previously made assumptions and jumped
straight into JSON generation. This produced agents that didn't match
what the user actually wanted, requiring multiple correction cycles.

**What:** Adds a "Clarifying Before Building" section to the agent
generation guide. When the goal is ambiguous, CoPilot first calls
`find_block` to discover what the platform actually supports for the
ambiguous dimension, then asks the user one concrete question grounded
in real platform options (e.g. "The platform supports Gmail, Slack, and
Google Docs — which should the agent use for delivery?"). Only after the
user answers does the full agent generation workflow proceed.

**How:** The clarification instruction is added to
`agent_generation_guide.md` — the guide loaded on-demand via
`get_agent_building_guide` when the LLM is about to build an agent. This
avoids polluting the system prompt supplement (which loads for every
CoPilot conversation, not just agent building). No dedicated tool is
needed — the LLM asks naturally in conversation text after discovering
real platform options via `find_block`.

### Changes 🏗️

- `backend/copilot/sdk/agent_generation_guide.md`: Adds "Clarifying
Before Building" section before the workflow steps. Instructs the model
to call `find_block` for the ambiguous dimension, ask the user one
grounded question, wait for the answer, then proceed to generation.
- `backend/copilot/prompting_test.py`: New test file verifying the guide
contains the clarification section and references `find_block`.

### Checklist 📋

#### For code changes:
- [x] I have clearly listed my changes in the PR description
- [x] I have made a test plan
- [x] I have tested my changes according to the test plan:
- [ ] Ask CoPilot to "build an agent to send a report" (ambiguous
output) — verify it calls `find_block` for delivery options and asks one
grounded question before generating JSON
- [ ] Ask CoPilot to "build an agent to scrape prices from Amazon and
email me daily" (specific goal) — verify it skips clarification and
proceeds directly to agent generation
- [ ] Verify the clarification question lists real block options (e.g.
Gmail, Slack, Google Docs) rather than abstract options

---------

Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-authored-by: Zamil Majdy <zamil.majdy@agpt.co>
2026-04-01 13:32:12 +00:00
seer-by-sentry[bot]
bd9e9ec614 fix(frontend): remove LaunchDarkly local storage bootstrapping (#12606)
### Why / What / How

<!-- Why: Why does this PR exist? What problem does it solve, or what's
broken/missing without it? -->
This PR fixes
[BUILDER-7HD](https://sentry.io/organizations/significant-gravitas/issues/7374387984/).
The issue was that: LaunchDarkly SDK fails to construct streaming URL
due to non-string `_url` from malformed `localStorage` bootstrap data.
<!-- What: What does this PR change? Summarize the changes at a high
level. -->
Removed the `bootstrap: "localStorage"` option from the LaunchDarkly
provider configuration.
<!-- How: How does it work? Describe the approach, key implementation
details, or architecture decisions. -->
This change ensures that LaunchDarkly no longer attempts to load initial
feature flag values from local storage. Flag values will now always be
fetched directly from the LaunchDarkly service, preventing potential
issues with stale local storage data.

### Changes 🏗️

<!-- List the key changes. Keep it higher level than the diff but
specific enough to highlight what's new/modified. -->
- Removed the `bootstrap: "localStorage"` option from the LaunchDarkly
provider configuration.
- LaunchDarkly will now always fetch flag values directly from its
service, bypassing local storage.

### Checklist 📋

#### For code changes:
- [x] I have clearly listed my changes in the PR description
- [ ] I have made a test plan
- [ ] I have tested my changes according to the test plan:
  <!-- Put your test plan here: -->
- [ ] Verify that LaunchDarkly flags are loaded correctly without
issues.
- [ ] Ensure no errors related to `localStorage` or streaming URL
construction appear in the console.

<details>
  <summary>Example test plan</summary>
  
  - [ ] Create from scratch and execute an agent with at least 3 blocks
- [ ] Import an agent from file upload, and confirm it executes
correctly
  - [ ] Upload agent to marketplace
- [ ] Import an agent from marketplace and confirm it executes correctly
  - [ ] Edit an agent from monitor, and confirm it executes correctly
</details>

#### For configuration changes:

- [ ] `.env.default` is updated or already compatible with my changes
- [ ] `docker-compose.yml` is updated or already compatible with my
changes
- [ ] I have included a list of my configuration changes in the PR
description (under **Changes**)

<details>
  <summary>Examples of configuration changes</summary>

  - Changing ports
  - Adding new services that need to communicate with each other
  - Secrets or environment variable changes
  - New or infrastructure changes such as databases
</details>

---------

Co-authored-by: Zamil Majdy <zamil.majdy@agpt.co>
Co-authored-by: seer-by-sentry[bot] <157164994+seer-by-sentry[bot]@users.noreply.github.com>
2026-04-01 19:12:54 +07:00
Nicholas Tindle
88589764b5 dx(platform): normalize agent instructions for Claude and Codex (#12592)
### Why / What / How

Why: repo guidance was split between Claude-specific `CLAUDE.md` files
and Codex-specific `AGENTS.md` files, which duplicated instruction
content and made the same repository behave differently across agents.
The repo also had Claude skills under `.claude/skills` but no
Codex-visible repo skill path.

What: this PR bridges the repo's Claude skills into Codex and normalizes
shared instruction files so `AGENTS.md` becomes the canonical source
while each `CLAUDE.md` imports its sibling `AGENTS.md`.

How: add a repo-local `.agents/skills` symlink pointing to
`../.claude/skills`; move nested `CLAUDE.md` content into sibling
`AGENTS.md` files; replace each repo `CLAUDE.md` with a one-line
`@AGENTS.md` shim so Claude and Codex read the same scoped guidance
without duplicating text. The root `CLAUDE.md` now imports the root
`AGENTS.md` rather than symlinking to it.

Note: the instruction-file normalization commit was created with
`--no-verify` because the repo's frontend pre-commit `tsc` hook
currently fails on unrelated existing errors, largely missing
`autogpt_platform/frontend/src/app/api/__generated__/*` modules.

### Changes 🏗️

- Add `.agents/skills` as a repo-local symlink to `../.claude/skills` so
Codex discovers the existing Claude repo skills.
- Add a real root `CLAUDE.md` shim that imports the canonical root
`AGENTS.md`.
- Promote nested scoped instruction content into sibling `AGENTS.md`
files under `autogpt_platform/`, `autogpt_platform/backend/`,
`autogpt_platform/frontend/`, `autogpt_platform/frontend/src/tests/`,
and `docs/`.
- Replace the corresponding nested `CLAUDE.md` files with one-line
`@AGENTS.md` shims.
- Preserve the existing scoped instruction hierarchy while making the
shared content cross-compatible between Claude and Codex.

### Checklist 📋

#### For code changes:
- [x] I have clearly listed my changes in the PR description
- [x] I have made a test plan
- [x] I have tested my changes according to the test plan:
  - [x] Verified `.agents/skills` resolves to `../.claude/skills`
  - [x] Verified each repo `CLAUDE.md` now contains only `@AGENTS.md`
- [x] Verified the expected `AGENTS.md` files exist at the root and
nested scoped directories
- [x] Verified the branch contains only the intended agent-guidance
commits relative to `dev` and the working tree is clean

#### For configuration changes:

- [x] `.env.default` is updated or already compatible with my changes
- [x] `docker-compose.yml` is updated or already compatible with my
changes
- [x] I have included a list of my configuration changes in the PR
description (under **Changes**)

No runtime configuration changes are included in this PR.

<!-- CURSOR_SUMMARY -->
---

> [!NOTE]
> **Low Risk**
> Low risk: documentation/instruction-file reshuffle plus an
`.agents/skills` pointer; no runtime code paths are modified.
> 
> **Overview**
> Unifies agent guidance so **`AGENTS.md` becomes canonical** and all
corresponding `CLAUDE.md` files become 1-line shims (`@AGENTS.md`) at
the repo root, `autogpt_platform/`, backend, frontend, frontend tests,
and `docs/`.
> 
> Adds `.agents/skills` pointing to `../.claude/skills` so non-Claude
agents discover the same shared skills/instructions, eliminating
duplicated/agent-specific guidance content.
> 
> <sup>Written by [Cursor
Bugbot](https://cursor.com/dashboard?tab=bugbot) for commit
839483c3b6. This will update automatically
on new commits. Configure
[here](https://cursor.com/dashboard?tab=bugbot).</sup>
<!-- /CURSOR_SUMMARY -->
2026-04-01 09:08:51 +00:00
Zamil Majdy
c659f3b058 fix(copilot): fix dry-run simulation showing INCOMPLETE/error status (#12580)
## Summary
- **Backend**: Strip empty `error` pins from dry-run simulation outputs
that the simulator always includes (set to `""` meaning "no error").
This was causing the LLM to misinterpret successful simulations as
failures and report "INCOMPLETE" status to users
- **Backend**: Add explicit "Status: COMPLETED" to dry-run response
message to prevent LLM misinterpretation
- **Backend**: Update simulation prompt to exclude `error` from the
"MUST include" keys list, and instruct LLM to omit error unless
simulating a logical failure
- **Frontend**: Fix `isRunBlockErrorOutput()` type guard that was too
broad (`"error" in output` matched BlockOutputResponse objects, not just
ErrorResponse), causing dry-run results to be displayed as errors
- **Frontend**: Fix `parseOutput()` fallback matching to not classify
BlockOutputResponse as ErrorResponse
- **Frontend**: Filter out empty error pins from `BlockOutputCard`
display and accordion metadata output key counting
- **Frontend**: Clear stale execution results before dry-run/no-input
runs so the UI shows fresh output
- **Frontend**: Fix first-click simulate race condition by invalidating
execution details query after WebSocket subscription confirms

## Test plan
- [x] All 12 existing + 5 new dry-run tests pass (`poetry run pytest
backend/copilot/tools/test_dry_run.py -x -v`)
- [x] All 23 helpers tests pass (`poetry run pytest
backend/copilot/tools/helpers_test.py -x -v`)
- [x] All 13 run_block tests pass (`poetry run pytest
backend/copilot/tools/run_block_test.py -x -v`)
- [x] Backend linting passes (ruff check + format)
- [x] Frontend linting passes (next lint)
- [ ] Manual: trigger dry-run on a block with error output pin (e.g.
Komodo Image Generator) — should show "Simulated" status with clean
output, no misleading "error" section
- [ ] Manual: first click on Simulate button should immediately show
results (no race condition)

---------

Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-authored-by: Nicholas Tindle <nicholas.tindle@agpt.co>
Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
2026-03-31 21:03:00 +00:00
Zamil Majdy
80581a8364 fix(copilot): add tool call circuit breakers and intermediate persistence (#12604)
## Why

CoPilot session `d2f7cba3` took **82 minutes** and cost **$20.66** for a
single user message. Root causes:
1. Redis session meta key expired after 1h, making the session invisible
to the resume endpoint — causing empty page on reload
2. Redis stream key also expired during sub-agent gaps (task_progress
events produced no chunks)
3. No intermediate persistence — session messages only saved to DB after
the entire turn completes
4. Sub-agents retried similar WebSearch queries (addressed via prompt
guidance)

## What

### Redis TTL fixes (root cause of empty session on reload)
- `publish_chunk()` now periodically refreshes **both** the session meta
key AND stream key TTL (every 60s).
- `task_progress` SDK events now emit `StreamHeartbeat` chunks, ensuring
`publish_chunk` is called even during long sub-agent gaps where no real
chunks are produced.
- Without this fix, turns exceeding the 1h `stream_ttl` lose their
"running" status and stream data, making `get_active_session()` return
False.

### Intermediate DB persistence
- Session messages flushed to DB every **30 seconds** or **10 new
messages** during the stream loop.
- Uses `asyncio.shield(upsert_chat_session())` matching the existing
`finally` block pattern.

### Orphaned message cleanup on rollback
- On stream attempt rollback, orphaned messages persisted by
intermediate flushes are now cleaned up from the DB via
`delete_messages_from_sequence`.
- Prevents stale messages from resurfacing on page reload after a failed
retry.

### Prompt guidance
- Added web search best practices to code supplement (search efficiency,
sub-agent scope separation).

### Approach: root cause fixes, not capability limits
- **No tool call caps** — artificial limits on WebSearch or total tool
calls would reduce autopilot capability without addressing why searches
were redundant.
- **Task tool remains enabled** — sub-agent delegation via Task is a
core capability. The existing `max_subtasks` concurrency guard is
sufficient.
- The real fixes (TTL refresh, persistence, prompt guidance) address the
underlying bugs and behavioral issues.

## How

### Files changed
- `stream_registry.py` — Redis meta + stream key TTL refresh in
`publish_chunk()`, module-level keepalive tracker
- `response_adapter.py` — `task_progress` SystemMessage →
StreamHeartbeat emission
- `service.py` — Intermediate DB persistence in `_run_stream_attempt`
stream loop, orphan cleanup on rollback
- `db.py` — `delete_messages_from_sequence` for rollback cleanup
- `prompting.py` — Web search best practices

### GCP log evidence
```
# Meta key expired during 82-min turn:
09:49 — GET_SESSION: active_session=False, msg_count=1  ← meta gone
10:18 — Session persisted in finally with 189 messages   ← turn completed

# T13 (1h45min) same bug reproduced live:
16:20 — task_progress events still arriving, but active_session=False

# Actual cost:
Turn usage: cache_read=347916, cache_create=212472, output=12375, cost_usd=20.66
```

### Test plan
- [x] task_progress emits StreamHeartbeat
- [x] Task background blocked, foreground allowed, slot release on
completion/failure
- [x] CI green (lint, type-check, tests, e2e, CodeQL)

---------

Co-authored-by: Zamil Majdy <majdy.zamil@gmail.com>
2026-03-31 21:01:56 +00:00
lif
3c046eb291 fix(frontend): show all agent outputs instead of only the last one (#12504)
Fixes #9175

### Changes 🏗️

The Agent Outputs panel only displayed the last execution result per
output node, discarding all prior outputs during a run.

**Root cause:** In `AgentOutputs.tsx`, the `outputs` useMemo extracted
only the last element from `nodeExecutionResults`:
```tsx
const latestResult = executionResults[executionResults.length - 1];
```

**Fix:** Changed `.map()` to `.flatMap()` over output nodes, iterating
through all `executionResults` for each node. Each execution result now
gets its own renderer lookup and metadata entry, so the panel shows
every output produced during the run.

### Checklist 📋

#### For code changes:
- [x] I have clearly listed my changes in the PR description
- [x] I have made a test plan
- [x] I have tested my changes according to the test plan:
  - [x] Verified TypeScript compiles without errors
- [x] Confirmed the flatMap logic correctly iterates all execution
results
  - [x] Verified existing filter for null renderers is preserved
- [x] Run an agent with multiple outputs and confirm all show in the
panel

---------

Signed-off-by: majiayu000 <1835304752@qq.com>
Co-authored-by: Zamil Majdy <zamil.majdy@agpt.co>
2026-03-31 20:31:12 +00:00
Zamil Majdy
3e25488b2d feat(copilot): add session-level dry_run flag to autopilot sessions (#12582)
## Summary
- Adds a session-level `dry_run` flag that forces ALL tool calls
(`run_block`, `run_agent`) in a copilot/autopilot session to use dry-run
simulation mode
- Stores the flag in a typed `ChatSessionMetadata` JSON model on the
`ChatSession` DB row, accessed via `session.dry_run` property
- Adds `dry_run` to the AutoPilot block Input schema so graph builders
can create dry-run autopilot nodes
- Refactors multiple copilot tools from `**kwargs` to explicit
parameters for type safety

## Changes
- **Prisma schema**: Added `metadata` JSON column to `ChatSession` model
with migration
- **Python models**: Added `ChatSessionMetadata` model with `dry_run`
field, added `metadata` field to `ChatSessionInfo` and `ChatSession`,
updated `from_db()`, `new()`, and `create_chat_session()`
- **Session propagation**: `set_execution_context(user_id, session)`
called from `baseline/service.py` so tool handlers can read
session-level flags via `session.dry_run`
- **Tool enforcement**: `run_block` and `run_agent` check
`session.dry_run` and force `dry_run=True` when set; `run_agent` blocks
scheduling in dry-run sessions
- **AutoPilot block**: Added `dry_run` input field, passes it when
creating sessions
- **Chat API**: Added `CreateSessionRequest` model with `dry_run` field
to `POST /sessions` endpoint; added `metadata` to session responses
- **Frontend**: Updated `useChatSession.ts` to pass body to the create
session mutation
- **Tool refactoring**: Multiple copilot tools refactored from
`**kwargs` to explicit named parameters (agent_browser, manage_folders,
workspace_files, connect_integration, agent_output, bash_exec, etc.) for
better type safety

## Test plan
- [x] Unit tests for `ChatSession.new()` with dry_run parameter
- [x] Unit tests for `RunBlockTool` session dry_run override
- [x] Unit tests for `RunAgentTool` session dry_run override
- [x] Unit tests for session dry_run blocks scheduling
- [x] Existing dry_run tests still pass (12/12)
- [x] Existing permissions tests still pass
- [x] All pre-commit hooks pass (ruff, isort, pyright, tsc)
- [ ] Manual: Create autopilot session with `dry_run=True`, verify
run_block/run_agent calls use simulation

---------

Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-31 16:27:36 +00:00
93 changed files with 3370 additions and 1683 deletions

1
.agents/skills Symbolic link
View File

@@ -0,0 +1 @@
../.claude/skills

View File

@@ -1,6 +1,6 @@
# AutoGPT Platform Contribution Guide
This guide provides context for Codex when updating the **autogpt_platform** folder.
This guide provides context for coding agents when updating the **autogpt_platform** folder.
## Directory overview

1
CLAUDE.md Normal file
View File

@@ -0,0 +1 @@
@AGENTS.md

120
autogpt_platform/AGENTS.md Normal file
View File

@@ -0,0 +1,120 @@
# AutoGPT Platform
This file provides guidance to coding agents when working with code in this repository.
## Repository Overview
AutoGPT Platform is a monorepo containing:
- **Backend** (`backend`): Python FastAPI server with async support
- **Frontend** (`frontend`): Next.js React application
- **Shared Libraries** (`autogpt_libs`): Common Python utilities
## Component Documentation
- **Backend**: See @backend/AGENTS.md for backend-specific commands, architecture, and development tasks
- **Frontend**: See @frontend/AGENTS.md for frontend-specific commands, architecture, and development patterns
## Key Concepts
1. **Agent Graphs**: Workflow definitions stored as JSON, executed by the backend
2. **Blocks**: Reusable components in `backend/backend/blocks/` that perform specific tasks
3. **Integrations**: OAuth and API connections stored per user
4. **Store**: Marketplace for sharing agent templates
5. **Virus Scanning**: ClamAV integration for file upload security
### Environment Configuration
#### Configuration Files
- **Backend**: `backend/.env.default` (defaults) → `backend/.env` (user overrides)
- **Frontend**: `frontend/.env.default` (defaults) → `frontend/.env` (user overrides)
- **Platform**: `.env.default` (Supabase/shared defaults) → `.env` (user overrides)
#### Docker Environment Loading Order
1. `.env.default` files provide base configuration (tracked in git)
2. `.env` files provide user-specific overrides (gitignored)
3. Docker Compose `environment:` sections provide service-specific overrides
4. Shell environment variables have highest precedence
#### Key Points
- All services use hardcoded defaults in docker-compose files (no `${VARIABLE}` substitutions)
- The `env_file` directive loads variables INTO containers at runtime
- Backend/Frontend services use YAML anchors for consistent configuration
- Supabase services (`db/docker/docker-compose.yml`) follow the same pattern
### Branching Strategy
- **`dev`** is the main development branch. All PRs should target `dev`.
- **`master`** is the production branch. Only used for production releases.
### Creating Pull Requests
- Create the PR against the `dev` branch of the repository.
- **Split PRs by concern** — each PR should have a single clear purpose. For example, "usage tracking" and "credit charging" should be separate PRs even if related. Combining multiple concerns makes it harder for reviewers to understand what belongs to what.
- Ensure the branch name is descriptive (e.g., `feature/add-new-block`)
- Use conventional commit messages (see below)
- **Structure the PR description with Why / What / How** — Why: the motivation (what problem it solves, what's broken/missing without it); What: high-level summary of changes; How: approach, key implementation details, or architecture decisions. Reviewers need all three to judge whether the approach fits the problem.
- Fill out the .github/PULL_REQUEST_TEMPLATE.md template as the PR description
- Always use `--body-file` to pass PR body — avoids shell interpretation of backticks and special characters:
```bash
PR_BODY=$(mktemp)
cat > "$PR_BODY" << 'PREOF'
## Summary
- use `backticks` freely here
PREOF
gh pr create --title "..." --body-file "$PR_BODY" --base dev
rm "$PR_BODY"
```
- Run the github pre-commit hooks to ensure code quality.
### Test-Driven Development (TDD)
When fixing a bug or adding a feature, follow a test-first approach:
1. **Write a failing test first** — create a test that reproduces the bug or validates the new behavior, marked with `@pytest.mark.xfail` (backend) or `.fixme` (Playwright). Run it to confirm it fails for the right reason.
2. **Implement the fix/feature** — write the minimal code to make the test pass.
3. **Remove the xfail marker** — once the test passes, remove the `xfail`/`.fixme` annotation and run the full test suite to confirm nothing else broke.
This ensures every change is covered by a test and that the test actually validates the intended behavior.
### Reviewing/Revising Pull Requests
Use `/pr-review` to review a PR or `/pr-address` to address comments.
When fetching comments manually:
- `gh api repos/Significant-Gravitas/AutoGPT/pulls/{N}/reviews --paginate` — top-level reviews
- `gh api repos/Significant-Gravitas/AutoGPT/pulls/{N}/comments --paginate` — inline review comments (always paginate to avoid missing comments beyond page 1)
- `gh api repos/Significant-Gravitas/AutoGPT/issues/{N}/comments` — PR conversation comments
### Conventional Commits
Use this format for commit messages and Pull Request titles:
**Conventional Commit Types:**
- `feat`: Introduces a new feature to the codebase
- `fix`: Patches a bug in the codebase
- `refactor`: Code change that neither fixes a bug nor adds a feature; also applies to removing features
- `ci`: Changes to CI configuration
- `docs`: Documentation-only changes
- `dx`: Improvements to the developer experience
**Recommended Base Scopes:**
- `platform`: Changes affecting both frontend and backend
- `frontend`
- `backend`
- `infra`
- `blocks`: Modifications/additions of individual blocks
**Subscope Examples:**
- `backend/executor`
- `backend/db`
- `frontend/builder` (includes changes to the block UI component)
- `infra/prod`
Use these scopes and subscopes for clarity and consistency in commit messages.

View File

@@ -1,120 +1 @@
# CLAUDE.md
This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.
## Repository Overview
AutoGPT Platform is a monorepo containing:
- **Backend** (`backend`): Python FastAPI server with async support
- **Frontend** (`frontend`): Next.js React application
- **Shared Libraries** (`autogpt_libs`): Common Python utilities
## Component Documentation
- **Backend**: See @backend/CLAUDE.md for backend-specific commands, architecture, and development tasks
- **Frontend**: See @frontend/CLAUDE.md for frontend-specific commands, architecture, and development patterns
## Key Concepts
1. **Agent Graphs**: Workflow definitions stored as JSON, executed by the backend
2. **Blocks**: Reusable components in `backend/backend/blocks/` that perform specific tasks
3. **Integrations**: OAuth and API connections stored per user
4. **Store**: Marketplace for sharing agent templates
5. **Virus Scanning**: ClamAV integration for file upload security
### Environment Configuration
#### Configuration Files
- **Backend**: `backend/.env.default` (defaults) → `backend/.env` (user overrides)
- **Frontend**: `frontend/.env.default` (defaults) → `frontend/.env` (user overrides)
- **Platform**: `.env.default` (Supabase/shared defaults) → `.env` (user overrides)
#### Docker Environment Loading Order
1. `.env.default` files provide base configuration (tracked in git)
2. `.env` files provide user-specific overrides (gitignored)
3. Docker Compose `environment:` sections provide service-specific overrides
4. Shell environment variables have highest precedence
#### Key Points
- All services use hardcoded defaults in docker-compose files (no `${VARIABLE}` substitutions)
- The `env_file` directive loads variables INTO containers at runtime
- Backend/Frontend services use YAML anchors for consistent configuration
- Supabase services (`db/docker/docker-compose.yml`) follow the same pattern
### Branching Strategy
- **`dev`** is the main development branch. All PRs should target `dev`.
- **`master`** is the production branch. Only used for production releases.
### Creating Pull Requests
- Create the PR against the `dev` branch of the repository.
- **Split PRs by concern** — each PR should have a single clear purpose. For example, "usage tracking" and "credit charging" should be separate PRs even if related. Combining multiple concerns makes it harder for reviewers to understand what belongs to what.
- Ensure the branch name is descriptive (e.g., `feature/add-new-block`)
- Use conventional commit messages (see below)
- **Structure the PR description with Why / What / How** — Why: the motivation (what problem it solves, what's broken/missing without it); What: high-level summary of changes; How: approach, key implementation details, or architecture decisions. Reviewers need all three to judge whether the approach fits the problem.
- Fill out the .github/PULL_REQUEST_TEMPLATE.md template as the PR description
- Always use `--body-file` to pass PR body — avoids shell interpretation of backticks and special characters:
```bash
PR_BODY=$(mktemp)
cat > "$PR_BODY" << 'PREOF'
## Summary
- use `backticks` freely here
PREOF
gh pr create --title "..." --body-file "$PR_BODY" --base dev
rm "$PR_BODY"
```
- Run the github pre-commit hooks to ensure code quality.
### Test-Driven Development (TDD)
When fixing a bug or adding a feature, follow a test-first approach:
1. **Write a failing test first** — create a test that reproduces the bug or validates the new behavior, marked with `@pytest.mark.xfail` (backend) or `.fixme` (Playwright). Run it to confirm it fails for the right reason.
2. **Implement the fix/feature** — write the minimal code to make the test pass.
3. **Remove the xfail marker** — once the test passes, remove the `xfail`/`.fixme` annotation and run the full test suite to confirm nothing else broke.
This ensures every change is covered by a test and that the test actually validates the intended behavior.
### Reviewing/Revising Pull Requests
Use `/pr-review` to review a PR or `/pr-address` to address comments.
When fetching comments manually:
- `gh api repos/Significant-Gravitas/AutoGPT/pulls/{N}/reviews --paginate` — top-level reviews
- `gh api repos/Significant-Gravitas/AutoGPT/pulls/{N}/comments --paginate` — inline review comments (always paginate to avoid missing comments beyond page 1)
- `gh api repos/Significant-Gravitas/AutoGPT/issues/{N}/comments` — PR conversation comments
### Conventional Commits
Use this format for commit messages and Pull Request titles:
**Conventional Commit Types:**
- `feat`: Introduces a new feature to the codebase
- `fix`: Patches a bug in the codebase
- `refactor`: Code change that neither fixes a bug nor adds a feature; also applies to removing features
- `ci`: Changes to CI configuration
- `docs`: Documentation-only changes
- `dx`: Improvements to the developer experience
**Recommended Base Scopes:**
- `platform`: Changes affecting both frontend and backend
- `frontend`
- `backend`
- `infra`
- `blocks`: Modifications/additions of individual blocks
**Subscope Examples:**
- `backend/executor`
- `backend/db`
- `frontend/builder` (includes changes to the block UI component)
- `infra/prod`
Use these scopes and subscopes for clarity and consistency in commit messages.
@AGENTS.md

View File

@@ -0,0 +1,227 @@
# Backend
This file provides guidance to coding agents when working with the backend.
## Essential Commands
To run something with Python package dependencies you MUST use `poetry run ...`.
```bash
# Install dependencies
poetry install
# Run database migrations
poetry run prisma migrate dev
# Start all services (database, redis, rabbitmq, clamav)
docker compose up -d
# Run the backend as a whole
poetry run app
# Run tests
poetry run test
# Run specific test
poetry run pytest path/to/test_file.py::test_function_name
# Run block tests (tests that validate all blocks work correctly)
poetry run pytest backend/blocks/test/test_block.py -xvs
# Run tests for a specific block (e.g., GetCurrentTimeBlock)
poetry run pytest 'backend/blocks/test/test_block.py::test_available_blocks[GetCurrentTimeBlock]' -xvs
# Lint and format
# prefer format if you want to just "fix" it and only get the errors that can't be autofixed
poetry run format # Black + isort
poetry run lint # ruff
```
More details can be found in @TESTING.md
### Creating/Updating Snapshots
When you first write a test or when the expected output changes:
```bash
poetry run pytest path/to/test.py --snapshot-update
```
⚠️ **Important**: Always review snapshot changes before committing! Use `git diff` to verify the changes are expected.
## Architecture
- **API Layer**: FastAPI with REST and WebSocket endpoints
- **Database**: PostgreSQL with Prisma ORM, includes pgvector for embeddings
- **Queue System**: RabbitMQ for async task processing
- **Execution Engine**: Separate executor service processes agent workflows
- **Authentication**: JWT-based with Supabase integration
- **Security**: Cache protection middleware prevents sensitive data caching in browsers/proxies
## Code Style
- **Top-level imports only** — no local/inner imports (lazy imports only for heavy optional deps like `openpyxl`)
- **Absolute imports** — use `from backend.module import ...` for cross-package imports. Single-dot relative (`from .sibling import ...`) is acceptable for sibling modules within the same package (e.g., blocks). Avoid double-dot relative imports (`from ..parent import ...`) — use the absolute path instead
- **No duck typing** — no `hasattr`/`getattr`/`isinstance` for type dispatch; use typed interfaces/unions/protocols
- **Pydantic models** over dataclass/namedtuple/dict for structured data
- **No linter suppressors** — no `# type: ignore`, `# noqa`, `# pyright: ignore`; fix the type/code
- **List comprehensions** over manual loop-and-append
- **Early return** — guard clauses first, avoid deep nesting
- **f-strings vs printf syntax in log statements** — Use `%s` for deferred interpolation in `debug` statements, f-strings elsewhere for readability: `logger.debug("Processing %s items", count)`, `logger.info(f"Processing {count} items")`
- **Sanitize error paths** — `os.path.basename()` in error messages to avoid leaking directory structure
- **TOCTOU awareness** — avoid check-then-act patterns for file access and credit charging
- **`Security()` vs `Depends()`** — use `Security()` for auth deps to get proper OpenAPI security spec
- **Redis pipelines** — `transaction=True` for atomicity on multi-step operations
- **`max(0, value)` guards** — for computed values that should never be negative
- **SSE protocol** — `data:` lines for frontend-parsed events (must match Zod schema), `: comment` lines for heartbeats/status
- **File length** — keep files under ~300 lines; if a file grows beyond this, split by responsibility (e.g. extract helpers, models, or a sub-module into a new file). Never keep appending to a long file.
- **Function length** — keep functions under ~40 lines; extract named helpers when a function grows longer. Long functions are a sign of mixed concerns, not complexity.
- **Top-down ordering** — define the main/public function or class first, then the helpers it uses below. A reader should encounter high-level logic before implementation details.
## Testing Approach
- Uses pytest with snapshot testing for API responses
- Test files are colocated with source files (`*_test.py`)
- Mock at boundaries — mock where the symbol is **used**, not where it's **defined**
- After refactoring, update mock targets to match new module paths
- Use `AsyncMock` for async functions (`from unittest.mock import AsyncMock`)
### Test-Driven Development (TDD)
When fixing a bug or adding a feature, write the test **before** the implementation:
```python
# 1. Write a failing test marked xfail
@pytest.mark.xfail(reason="Bug #1234: widget crashes on empty input")
def test_widget_handles_empty_input():
result = widget.process("")
assert result == Widget.EMPTY_RESULT
# 2. Run it — confirm it fails (XFAIL)
# poetry run pytest path/to/test.py::test_widget_handles_empty_input -xvs
# 3. Implement the fix
# 4. Remove xfail, run again — confirm it passes
def test_widget_handles_empty_input():
result = widget.process("")
assert result == Widget.EMPTY_RESULT
```
This catches regressions and proves the fix actually works. **Every bug fix should include a test that would have caught it.**
## Database Schema
Key models (defined in `schema.prisma`):
- `User`: Authentication and profile data
- `AgentGraph`: Workflow definitions with version control
- `AgentGraphExecution`: Execution history and results
- `AgentNode`: Individual nodes in a workflow
- `StoreListing`: Marketplace listings for sharing agents
## Environment Configuration
- **Backend**: `.env.default` (defaults) → `.env` (user overrides)
## Common Development Tasks
### Adding a new block
Follow the comprehensive [Block SDK Guide](@../../docs/platform/block-sdk-guide.md) which covers:
- Provider configuration with `ProviderBuilder`
- Block schema definition
- Authentication (API keys, OAuth, webhooks)
- Testing and validation
- File organization
Quick steps:
1. Create new file in `backend/blocks/`
2. Configure provider using `ProviderBuilder` in `_config.py`
3. Inherit from `Block` base class
4. Define input/output schemas using `BlockSchema`
5. Implement async `run` method
6. Generate unique block ID using `uuid.uuid4()`
7. Test with `poetry run pytest backend/blocks/test/test_block.py`
Note: when making many new blocks analyze the interfaces for each of these blocks and picture if they would go well together in a graph-based editor or would they struggle to connect productively?
ex: do the inputs and outputs tie well together?
If you get any pushback or hit complex block conditions check the new_blocks guide in the docs.
#### Handling files in blocks with `store_media_file()`
When blocks need to work with files (images, videos, documents), use `store_media_file()` from `backend.util.file`. The `return_format` parameter determines what you get back:
| Format | Use When | Returns |
|--------|----------|---------|
| `"for_local_processing"` | Processing with local tools (ffmpeg, MoviePy, PIL) | Local file path (e.g., `"image.png"`) |
| `"for_external_api"` | Sending content to external APIs (Replicate, OpenAI) | Data URI (e.g., `"data:image/png;base64,..."`) |
| `"for_block_output"` | Returning output from your block | Smart: `workspace://` in CoPilot, data URI in graphs |
**Examples:**
```python
# INPUT: Need to process file locally with ffmpeg
local_path = await store_media_file(
file=input_data.video,
execution_context=execution_context,
return_format="for_local_processing",
)
# local_path = "video.mp4" - use with Path/ffmpeg/etc
# INPUT: Need to send to external API like Replicate
image_b64 = await store_media_file(
file=input_data.image,
execution_context=execution_context,
return_format="for_external_api",
)
# image_b64 = "data:image/png;base64,iVBORw0..." - send to API
# OUTPUT: Returning result from block
result_url = await store_media_file(
file=generated_image_url,
execution_context=execution_context,
return_format="for_block_output",
)
yield "image_url", result_url
# In CoPilot: result_url = "workspace://abc123"
# In graphs: result_url = "data:image/png;base64,..."
```
**Key points:**
- `for_block_output` is the ONLY format that auto-adapts to execution context
- Always use `for_block_output` for block outputs unless you have a specific reason not to
- Never hardcode workspace checks - let `for_block_output` handle it
### Modifying the API
1. Update route in `backend/api/features/`
2. Add/update Pydantic models in same directory
3. Write tests alongside the route file
4. Run `poetry run test` to verify
## Workspace & Media Files
**Read [Workspace & Media Architecture](../../docs/platform/workspace-media-architecture.md) when:**
- Working on CoPilot file upload/download features
- Building blocks that handle `MediaFileType` inputs/outputs
- Modifying `WorkspaceManager` or `store_media_file()`
- Debugging file persistence or virus scanning issues
Covers: `WorkspaceManager` (persistent storage with session scoping), `store_media_file()` (media normalization pipeline), and responsibility boundaries for virus scanning and persistence.
## Security Implementation
### Cache Protection Middleware
- Located in `backend/api/middleware/security.py`
- Default behavior: Disables caching for ALL endpoints with `Cache-Control: no-store, no-cache, must-revalidate, private`
- Uses an allow list approach - only explicitly permitted paths can be cached
- Cacheable paths include: static assets (`static/*`, `_next/static/*`), health checks, public store pages, documentation
- Prevents sensitive data (auth tokens, API keys, user data) from being cached by browsers/proxies
- To allow caching for a new endpoint, add it to `CACHEABLE_PATHS` in the middleware
- Applied to both main API server and external API applications

View File

@@ -1,227 +1 @@
# CLAUDE.md - Backend
This file provides guidance to Claude Code when working with the backend.
## Essential Commands
To run something with Python package dependencies you MUST use `poetry run ...`.
```bash
# Install dependencies
poetry install
# Run database migrations
poetry run prisma migrate dev
# Start all services (database, redis, rabbitmq, clamav)
docker compose up -d
# Run the backend as a whole
poetry run app
# Run tests
poetry run test
# Run specific test
poetry run pytest path/to/test_file.py::test_function_name
# Run block tests (tests that validate all blocks work correctly)
poetry run pytest backend/blocks/test/test_block.py -xvs
# Run tests for a specific block (e.g., GetCurrentTimeBlock)
poetry run pytest 'backend/blocks/test/test_block.py::test_available_blocks[GetCurrentTimeBlock]' -xvs
# Lint and format
# prefer format if you want to just "fix" it and only get the errors that can't be autofixed
poetry run format # Black + isort
poetry run lint # ruff
```
More details can be found in @TESTING.md
### Creating/Updating Snapshots
When you first write a test or when the expected output changes:
```bash
poetry run pytest path/to/test.py --snapshot-update
```
⚠️ **Important**: Always review snapshot changes before committing! Use `git diff` to verify the changes are expected.
## Architecture
- **API Layer**: FastAPI with REST and WebSocket endpoints
- **Database**: PostgreSQL with Prisma ORM, includes pgvector for embeddings
- **Queue System**: RabbitMQ for async task processing
- **Execution Engine**: Separate executor service processes agent workflows
- **Authentication**: JWT-based with Supabase integration
- **Security**: Cache protection middleware prevents sensitive data caching in browsers/proxies
## Code Style
- **Top-level imports only** — no local/inner imports (lazy imports only for heavy optional deps like `openpyxl`)
- **Absolute imports** — use `from backend.module import ...` for cross-package imports. Single-dot relative (`from .sibling import ...`) is acceptable for sibling modules within the same package (e.g., blocks). Avoid double-dot relative imports (`from ..parent import ...`) — use the absolute path instead
- **No duck typing** — no `hasattr`/`getattr`/`isinstance` for type dispatch; use typed interfaces/unions/protocols
- **Pydantic models** over dataclass/namedtuple/dict for structured data
- **No linter suppressors** — no `# type: ignore`, `# noqa`, `# pyright: ignore`; fix the type/code
- **List comprehensions** over manual loop-and-append
- **Early return** — guard clauses first, avoid deep nesting
- **f-strings vs printf syntax in log statements** — Use `%s` for deferred interpolation in `debug` statements, f-strings elsewhere for readability: `logger.debug("Processing %s items", count)`, `logger.info(f"Processing {count} items")`
- **Sanitize error paths** — `os.path.basename()` in error messages to avoid leaking directory structure
- **TOCTOU awareness** — avoid check-then-act patterns for file access and credit charging
- **`Security()` vs `Depends()`** — use `Security()` for auth deps to get proper OpenAPI security spec
- **Redis pipelines** — `transaction=True` for atomicity on multi-step operations
- **`max(0, value)` guards** — for computed values that should never be negative
- **SSE protocol** — `data:` lines for frontend-parsed events (must match Zod schema), `: comment` lines for heartbeats/status
- **File length** — keep files under ~300 lines; if a file grows beyond this, split by responsibility (e.g. extract helpers, models, or a sub-module into a new file). Never keep appending to a long file.
- **Function length** — keep functions under ~40 lines; extract named helpers when a function grows longer. Long functions are a sign of mixed concerns, not complexity.
- **Top-down ordering** — define the main/public function or class first, then the helpers it uses below. A reader should encounter high-level logic before implementation details.
## Testing Approach
- Uses pytest with snapshot testing for API responses
- Test files are colocated with source files (`*_test.py`)
- Mock at boundaries — mock where the symbol is **used**, not where it's **defined**
- After refactoring, update mock targets to match new module paths
- Use `AsyncMock` for async functions (`from unittest.mock import AsyncMock`)
### Test-Driven Development (TDD)
When fixing a bug or adding a feature, write the test **before** the implementation:
```python
# 1. Write a failing test marked xfail
@pytest.mark.xfail(reason="Bug #1234: widget crashes on empty input")
def test_widget_handles_empty_input():
result = widget.process("")
assert result == Widget.EMPTY_RESULT
# 2. Run it — confirm it fails (XFAIL)
# poetry run pytest path/to/test.py::test_widget_handles_empty_input -xvs
# 3. Implement the fix
# 4. Remove xfail, run again — confirm it passes
def test_widget_handles_empty_input():
result = widget.process("")
assert result == Widget.EMPTY_RESULT
```
This catches regressions and proves the fix actually works. **Every bug fix should include a test that would have caught it.**
## Database Schema
Key models (defined in `schema.prisma`):
- `User`: Authentication and profile data
- `AgentGraph`: Workflow definitions with version control
- `AgentGraphExecution`: Execution history and results
- `AgentNode`: Individual nodes in a workflow
- `StoreListing`: Marketplace listings for sharing agents
## Environment Configuration
- **Backend**: `.env.default` (defaults) → `.env` (user overrides)
## Common Development Tasks
### Adding a new block
Follow the comprehensive [Block SDK Guide](@../../docs/content/platform/block-sdk-guide.md) which covers:
- Provider configuration with `ProviderBuilder`
- Block schema definition
- Authentication (API keys, OAuth, webhooks)
- Testing and validation
- File organization
Quick steps:
1. Create new file in `backend/blocks/`
2. Configure provider using `ProviderBuilder` in `_config.py`
3. Inherit from `Block` base class
4. Define input/output schemas using `BlockSchema`
5. Implement async `run` method
6. Generate unique block ID using `uuid.uuid4()`
7. Test with `poetry run pytest backend/blocks/test/test_block.py`
Note: when making many new blocks analyze the interfaces for each of these blocks and picture if they would go well together in a graph-based editor or would they struggle to connect productively?
ex: do the inputs and outputs tie well together?
If you get any pushback or hit complex block conditions check the new_blocks guide in the docs.
#### Handling files in blocks with `store_media_file()`
When blocks need to work with files (images, videos, documents), use `store_media_file()` from `backend.util.file`. The `return_format` parameter determines what you get back:
| Format | Use When | Returns |
|--------|----------|---------|
| `"for_local_processing"` | Processing with local tools (ffmpeg, MoviePy, PIL) | Local file path (e.g., `"image.png"`) |
| `"for_external_api"` | Sending content to external APIs (Replicate, OpenAI) | Data URI (e.g., `"data:image/png;base64,..."`) |
| `"for_block_output"` | Returning output from your block | Smart: `workspace://` in CoPilot, data URI in graphs |
**Examples:**
```python
# INPUT: Need to process file locally with ffmpeg
local_path = await store_media_file(
file=input_data.video,
execution_context=execution_context,
return_format="for_local_processing",
)
# local_path = "video.mp4" - use with Path/ffmpeg/etc
# INPUT: Need to send to external API like Replicate
image_b64 = await store_media_file(
file=input_data.image,
execution_context=execution_context,
return_format="for_external_api",
)
# image_b64 = "data:image/png;base64,iVBORw0..." - send to API
# OUTPUT: Returning result from block
result_url = await store_media_file(
file=generated_image_url,
execution_context=execution_context,
return_format="for_block_output",
)
yield "image_url", result_url
# In CoPilot: result_url = "workspace://abc123"
# In graphs: result_url = "data:image/png;base64,..."
```
**Key points:**
- `for_block_output` is the ONLY format that auto-adapts to execution context
- Always use `for_block_output` for block outputs unless you have a specific reason not to
- Never hardcode workspace checks - let `for_block_output` handle it
### Modifying the API
1. Update route in `backend/api/features/`
2. Add/update Pydantic models in same directory
3. Write tests alongside the route file
4. Run `poetry run test` to verify
## Workspace & Media Files
**Read [Workspace & Media Architecture](../../docs/platform/workspace-media-architecture.md) when:**
- Working on CoPilot file upload/download features
- Building blocks that handle `MediaFileType` inputs/outputs
- Modifying `WorkspaceManager` or `store_media_file()`
- Debugging file persistence or virus scanning issues
Covers: `WorkspaceManager` (persistent storage with session scoping), `store_media_file()` (media normalization pipeline), and responsibility boundaries for virus scanning and persistence.
## Security Implementation
### Cache Protection Middleware
- Located in `backend/api/middleware/security.py`
- Default behavior: Disables caching for ALL endpoints with `Cache-Control: no-store, no-cache, must-revalidate, private`
- Uses an allow list approach - only explicitly permitted paths can be cached
- Cacheable paths include: static assets (`static/*`, `_next/static/*`), health checks, public store pages, documentation
- Prevents sensitive data (auth tokens, API keys, user data) from being cached by browsers/proxies
- To allow caching for a new endpoint, add it to `CACHEABLE_PATHS` in the middleware
- Applied to both main API server and external API applications
@AGENTS.md

View File

@@ -72,7 +72,7 @@ class RunAgentRequest(BaseModel):
def _create_ephemeral_session(user_id: str) -> ChatSession:
"""Create an ephemeral session for stateless API requests."""
return ChatSession.new(user_id)
return ChatSession.new(user_id, dry_run=False)
@tools_router.post(

View File

@@ -11,7 +11,7 @@ from autogpt_libs import auth
from fastapi import APIRouter, HTTPException, Query, Response, Security
from fastapi.responses import StreamingResponse
from prisma.models import UserWorkspaceFile
from pydantic import BaseModel, Field, field_validator
from pydantic import BaseModel, ConfigDict, Field, field_validator
from backend.copilot import service as chat_service
from backend.copilot import stream_registry
@@ -20,6 +20,7 @@ from backend.copilot.executor.utils import enqueue_cancel_task, enqueue_copilot_
from backend.copilot.model import (
ChatMessage,
ChatSession,
ChatSessionMetadata,
append_and_save_message,
create_chat_session,
delete_chat_session,
@@ -112,12 +113,25 @@ class StreamChatRequest(BaseModel):
) # Workspace file IDs attached to this message
class CreateSessionRequest(BaseModel):
"""Request model for creating a new chat session.
``dry_run`` is a **top-level** field — do not nest it inside ``metadata``.
Extra/unknown fields are rejected (422) to prevent silent mis-use.
"""
model_config = ConfigDict(extra="forbid")
dry_run: bool = False
class CreateSessionResponse(BaseModel):
"""Response model containing information on a newly created chat session."""
id: str
created_at: str
user_id: str | None
metadata: ChatSessionMetadata = ChatSessionMetadata()
class ActiveStreamInfo(BaseModel):
@@ -138,6 +152,7 @@ class SessionDetailResponse(BaseModel):
active_stream: ActiveStreamInfo | None = None # Present if stream is still active
total_prompt_tokens: int = 0
total_completion_tokens: int = 0
metadata: ChatSessionMetadata = ChatSessionMetadata()
class SessionSummaryResponse(BaseModel):
@@ -248,6 +263,7 @@ async def list_sessions(
)
async def create_session(
user_id: Annotated[str, Security(auth.get_user_id)],
request: CreateSessionRequest | None = None,
) -> CreateSessionResponse:
"""
Create a new chat session.
@@ -256,22 +272,28 @@ async def create_session(
Args:
user_id: The authenticated user ID parsed from the JWT (required).
request: Optional request body. When provided, ``dry_run=True``
forces run_block and run_agent calls to use dry-run simulation.
Returns:
CreateSessionResponse: Details of the created session.
"""
dry_run = request.dry_run if request else False
logger.info(
f"Creating session with user_id: "
f"...{user_id[-8:] if len(user_id) > 8 else '<redacted>'}"
f"{', dry_run=True' if dry_run else ''}"
)
session = await create_chat_session(user_id)
session = await create_chat_session(user_id, dry_run=dry_run)
return CreateSessionResponse(
id=session.session_id,
created_at=session.started_at.isoformat(),
user_id=session.user_id,
metadata=session.metadata,
)
@@ -420,6 +442,7 @@ async def get_session(
active_stream=active_stream_info,
total_prompt_tokens=total_prompt,
total_completion_tokens=total_completion,
metadata=session.metadata,
)
@@ -1174,7 +1197,7 @@ async def health_check() -> dict:
)
# Create and retrieve session to verify full data layer
session = await create_chat_session(health_check_user_id)
session = await create_chat_session(health_check_user_id, dry_run=False)
await get_chat_session(session.session_id, health_check_user_id)
return {

View File

@@ -469,3 +469,60 @@ def test_suggested_prompts_empty_prompts(
assert response.status_code == 200
assert response.json() == {"themes": []}
# ─── Create session: dry_run contract ─────────────────────────────────
def _mock_create_chat_session(mocker: pytest_mock.MockerFixture):
"""Mock create_chat_session to return a fake session."""
from backend.copilot.model import ChatSession
async def _fake_create(user_id: str, *, dry_run: bool):
return ChatSession.new(user_id, dry_run=dry_run)
return mocker.patch(
"backend.api.features.chat.routes.create_chat_session",
new_callable=AsyncMock,
side_effect=_fake_create,
)
def test_create_session_dry_run_true(
mocker: pytest_mock.MockerFixture,
test_user_id: str,
) -> None:
"""Sending ``{"dry_run": true}`` sets metadata.dry_run to True."""
_mock_create_chat_session(mocker)
response = client.post("/sessions", json={"dry_run": True})
assert response.status_code == 200
assert response.json()["metadata"]["dry_run"] is True
def test_create_session_dry_run_default_false(
mocker: pytest_mock.MockerFixture,
test_user_id: str,
) -> None:
"""Empty body defaults dry_run to False."""
_mock_create_chat_session(mocker)
response = client.post("/sessions")
assert response.status_code == 200
assert response.json()["metadata"]["dry_run"] is False
def test_create_session_rejects_nested_metadata(
test_user_id: str,
) -> None:
"""Sending ``{"metadata": {"dry_run": true}}`` must return 422, not silently
default to ``dry_run=False``. This guards against the common mistake of
nesting dry_run inside metadata instead of providing it at the top level."""
response = client.post(
"/sessions",
json={"metadata": {"dry_run": True}},
)
assert response.status_code == 422

View File

@@ -146,6 +146,21 @@ class AutoPilotBlock(Block):
advanced=True,
)
dry_run: bool = SchemaField(
description=(
"When enabled, run_block and run_agent tool calls in this "
"autopilot session are forced to use dry-run simulation mode. "
"No real API calls, side effects, or credits are consumed "
"by those tools. Useful for testing agent wiring and "
"previewing outputs. "
"Only applies when creating a new session (session_id is empty). "
"When reusing an existing session_id, the session's original "
"dry_run setting is preserved."
),
default=False,
advanced=True,
)
# timeout_seconds removed: the SDK manages its own heartbeat-based
# timeouts internally; wrapping with asyncio.timeout corrupts the
# SDK's internal stream (see service.py CRITICAL comment).
@@ -232,11 +247,11 @@ class AutoPilotBlock(Block):
},
)
async def create_session(self, user_id: str) -> str:
async def create_session(self, user_id: str, *, dry_run: bool) -> str:
"""Create a new chat session and return its ID (mockable for tests)."""
from backend.copilot.model import create_chat_session # avoid circular import
session = await create_chat_session(user_id)
session = await create_chat_session(user_id, dry_run=dry_run)
return session.session_id
async def execute_copilot(
@@ -367,7 +382,9 @@ class AutoPilotBlock(Block):
# even if the downstream stream fails (avoids orphaned sessions).
sid = input_data.session_id
if not sid:
sid = await self.create_session(execution_context.user_id)
sid = await self.create_session(
execution_context.user_id, dry_run=input_data.dry_run
)
# NOTE: No asyncio.timeout() here — the SDK manages its own
# heartbeat-based timeouts internally. Wrapping with asyncio.timeout

View File

@@ -18,6 +18,7 @@ import orjson
from langfuse import propagate_attributes
from openai.types.chat import ChatCompletionMessageParam, ChatCompletionToolParam
from backend.copilot.context import set_execution_context
from backend.copilot.model import (
ChatMessage,
ChatSession,
@@ -457,6 +458,9 @@ async def stream_chat_completion_baseline(
tools = get_available_tools()
# Propagate execution context so tool handlers can read session-level flags.
set_execution_context(user_id, session)
yield StreamStart(messageId=message_id, sessionId=session_id)
# Propagate user/session context to Langfuse so all LLM calls within

View File

@@ -31,7 +31,7 @@ async def test_baseline_multi_turn(setup_test_user, test_user_id):
if not api_key:
return pytest.skip("OPEN_ROUTER_API_KEY is not set, skipping test")
session = await create_chat_session(test_user_id)
session = await create_chat_session(test_user_id, dry_run=False)
session = await upsert_chat_session(session)
# --- Turn 1: send a message with a unique keyword ---

View File

@@ -18,7 +18,13 @@ from prisma.types import (
from backend.data import db
from backend.util.json import SafeJson, sanitize_string
from .model import ChatMessage, ChatSession, ChatSessionInfo, invalidate_session_cache
from .model import (
ChatMessage,
ChatSession,
ChatSessionInfo,
ChatSessionMetadata,
invalidate_session_cache,
)
logger = logging.getLogger(__name__)
@@ -35,6 +41,7 @@ async def get_chat_session(session_id: str) -> ChatSession | None:
async def create_chat_session(
session_id: str,
user_id: str,
metadata: ChatSessionMetadata | None = None,
) -> ChatSessionInfo:
"""Create a new chat session in the database."""
data = ChatSessionCreateInput(
@@ -43,6 +50,7 @@ async def create_chat_session(
credentials=SafeJson({}),
successfulAgentRuns=SafeJson({}),
successfulAgentSchedules=SafeJson({}),
metadata=SafeJson((metadata or ChatSessionMetadata()).model_dump()),
)
prisma_session = await PrismaChatSession.prisma().create(data=data)
return ChatSessionInfo.from_db(prisma_session)
@@ -57,7 +65,12 @@ async def update_chat_session(
total_completion_tokens: int | None = None,
title: str | None = None,
) -> ChatSession | None:
"""Update a chat session's metadata."""
"""Update a chat session's mutable fields.
Note: ``metadata`` (which includes ``dry_run``) is intentionally omitted —
it is set once at creation time and treated as immutable for the lifetime
of the session.
"""
data: ChatSessionUpdateInput = {"updatedAt": datetime.now(UTC)}
if credentials is not None:

View File

@@ -46,6 +46,16 @@ def _get_session_cache_key(session_id: str) -> str:
# ===================== Chat data models ===================== #
class ChatSessionMetadata(BaseModel):
"""Typed metadata stored in the ``metadata`` JSON column of ChatSession.
Add new session-level flags here instead of adding DB columns —
no migration required for new fields as long as a default is provided.
"""
dry_run: bool = False
class ChatMessage(BaseModel):
role: str
content: str | None = None
@@ -90,6 +100,12 @@ class ChatSessionInfo(BaseModel):
updated_at: datetime
successful_agent_runs: dict[str, int] = {}
successful_agent_schedules: dict[str, int] = {}
metadata: ChatSessionMetadata = ChatSessionMetadata()
@property
def dry_run(self) -> bool:
"""Convenience accessor for ``metadata.dry_run``."""
return self.metadata.dry_run
@classmethod
def from_db(cls, prisma_session: PrismaChatSession) -> Self:
@@ -103,6 +119,10 @@ class ChatSessionInfo(BaseModel):
prisma_session.successfulAgentSchedules, default={}
)
# Parse typed metadata from the JSON column.
raw_metadata = _parse_json_field(prisma_session.metadata, default={})
metadata = ChatSessionMetadata.model_validate(raw_metadata)
# Calculate usage from token counts.
# NOTE: Per-turn cache_read_tokens / cache_creation_tokens breakdown
# is lost after persistence — the DB only stores aggregate prompt and
@@ -128,6 +148,7 @@ class ChatSessionInfo(BaseModel):
updated_at=prisma_session.updatedAt,
successful_agent_runs=successful_agent_runs,
successful_agent_schedules=successful_agent_schedules,
metadata=metadata,
)
@@ -135,7 +156,7 @@ class ChatSession(ChatSessionInfo):
messages: list[ChatMessage]
@classmethod
def new(cls, user_id: str) -> Self:
def new(cls, user_id: str, *, dry_run: bool) -> Self:
return cls(
session_id=str(uuid.uuid4()),
user_id=user_id,
@@ -145,6 +166,7 @@ class ChatSession(ChatSessionInfo):
credentials={},
started_at=datetime.now(UTC),
updated_at=datetime.now(UTC),
metadata=ChatSessionMetadata(dry_run=dry_run),
)
@classmethod
@@ -532,6 +554,7 @@ async def _save_session_to_db(
await db.create_chat_session(
session_id=session.session_id,
user_id=session.user_id,
metadata=session.metadata,
)
existing_message_count = 0
@@ -609,21 +632,27 @@ async def append_and_save_message(session_id: str, message: ChatMessage) -> Chat
return session
async def create_chat_session(user_id: str) -> ChatSession:
async def create_chat_session(user_id: str, *, dry_run: bool) -> ChatSession:
"""Create a new chat session and persist it.
Args:
user_id: The authenticated user ID.
dry_run: When True, run_block and run_agent tool calls in this
session are forced to use dry-run simulation mode.
Raises:
DatabaseError: If the database write fails. We fail fast to ensure
callers never receive a non-persisted session that only exists
in cache (which would be lost when the cache expires).
"""
session = ChatSession.new(user_id)
session = ChatSession.new(user_id, dry_run=dry_run)
# Create in database first - fail fast if this fails
try:
await chat_db().create_chat_session(
session_id=session.session_id,
user_id=user_id,
metadata=session.metadata,
)
except Exception as e:
logger.error(f"Failed to create session {session.session_id} in database: {e}")

View File

@@ -46,7 +46,7 @@ messages = [
@pytest.mark.asyncio(loop_scope="session")
async def test_chatsession_serialization_deserialization():
s = ChatSession.new(user_id="abc123")
s = ChatSession.new(user_id="abc123", dry_run=False)
s.messages = messages
s.usage = [Usage(prompt_tokens=100, completion_tokens=200, total_tokens=300)]
serialized = s.model_dump_json()
@@ -57,7 +57,7 @@ async def test_chatsession_serialization_deserialization():
@pytest.mark.asyncio(loop_scope="session")
async def test_chatsession_redis_storage(setup_test_user, test_user_id):
s = ChatSession.new(user_id=test_user_id)
s = ChatSession.new(user_id=test_user_id, dry_run=False)
s.messages = messages
s = await upsert_chat_session(s)
@@ -75,7 +75,7 @@ async def test_chatsession_redis_storage_user_id_mismatch(
setup_test_user, test_user_id
):
s = ChatSession.new(user_id=test_user_id)
s = ChatSession.new(user_id=test_user_id, dry_run=False)
s.messages = messages
s = await upsert_chat_session(s)
@@ -90,7 +90,7 @@ async def test_chatsession_db_storage(setup_test_user, test_user_id):
from backend.data.redis_client import get_redis_async
# Create session with messages including assistant message
s = ChatSession.new(user_id=test_user_id)
s = ChatSession.new(user_id=test_user_id, dry_run=False)
s.messages = messages # Contains user, assistant, and tool messages
assert s.session_id is not None, "Session id is not set"
# Upsert to save to both cache and DB
@@ -241,7 +241,7 @@ _raw_tc2 = {
def test_add_tool_call_appends_to_existing_assistant():
"""When the last assistant is from the current turn, tool_call is added to it."""
session = ChatSession.new(user_id="u")
session = ChatSession.new(user_id="u", dry_run=False)
session.messages = [
ChatMessage(role="user", content="hi"),
ChatMessage(role="assistant", content="working on it"),
@@ -254,7 +254,7 @@ def test_add_tool_call_appends_to_existing_assistant():
def test_add_tool_call_creates_assistant_when_none_exists():
"""When there's no current-turn assistant, a new one is created."""
session = ChatSession.new(user_id="u")
session = ChatSession.new(user_id="u", dry_run=False)
session.messages = [
ChatMessage(role="user", content="hi"),
]
@@ -267,7 +267,7 @@ def test_add_tool_call_creates_assistant_when_none_exists():
def test_add_tool_call_does_not_cross_user_boundary():
"""A user message acts as a boundary — previous assistant is not modified."""
session = ChatSession.new(user_id="u")
session = ChatSession.new(user_id="u", dry_run=False)
session.messages = [
ChatMessage(role="assistant", content="old turn"),
ChatMessage(role="user", content="new message"),
@@ -282,7 +282,7 @@ def test_add_tool_call_does_not_cross_user_boundary():
def test_add_tool_call_multiple_times():
"""Multiple long-running tool calls accumulate on the same assistant."""
session = ChatSession.new(user_id="u")
session = ChatSession.new(user_id="u", dry_run=False)
session.messages = [
ChatMessage(role="user", content="hi"),
ChatMessage(role="assistant", content="doing stuff"),
@@ -300,7 +300,7 @@ def test_add_tool_call_multiple_times():
def test_to_openai_messages_merges_split_assistants():
"""End-to-end: session with split assistants produces valid OpenAI messages."""
session = ChatSession.new(user_id="u")
session = ChatSession.new(user_id="u", dry_run=False)
session.messages = [
ChatMessage(role="user", content="build agent"),
ChatMessage(role="assistant", content="Let me build that"),
@@ -352,7 +352,7 @@ async def test_concurrent_saves_collision_detection(setup_test_user, test_user_i
import asyncio
# Create a session with initial messages
session = ChatSession.new(user_id=test_user_id)
session = ChatSession.new(user_id=test_user_id, dry_run=False)
for i in range(3):
session.messages.append(
ChatMessage(

View File

@@ -107,6 +107,13 @@ Do not re-fetch or re-generate data you already have from prior tool calls.
After building the file, reference it with `@@agptfile:` in other tools:
`@@agptfile:/home/user/report.md`
### Web search best practices
- If 3 similar web searches don't return the specific data you need, conclude
it isn't publicly available and work with what you have.
- Prefer fewer, well-targeted searches over many variations of the same query.
- When spawning sub-agents for research, ensure each has a distinct
non-overlapping scope to avoid redundant searches.
### Sub-agent tasks
- When using the Task tool, NEVER set `run_in_background` to true.
All tasks must run in the foreground.

View File

@@ -0,0 +1,21 @@
"""Tests for agent generation guide — verifies clarification section."""
from pathlib import Path
class TestAgentGenerationGuideContainsClarifySection:
"""The agent generation guide must include the clarification section."""
def test_guide_includes_clarify_before_building(self):
guide_path = Path(__file__).parent / "sdk" / "agent_generation_guide.md"
content = guide_path.read_text(encoding="utf-8")
assert "Clarifying Before Building" in content
def test_guide_mentions_find_block_for_clarification(self):
guide_path = Path(__file__).parent / "sdk" / "agent_generation_guide.md"
content = guide_path.read_text(encoding="utf-8")
# find_block must appear in the clarification section (before the workflow)
clarify_section = content.split("Clarifying Before Building")[1].split(
"### Workflow"
)[0]
assert "find_block" in clarify_section

View File

@@ -3,6 +3,21 @@
You can create, edit, and customize agents directly. You ARE the brain —
generate the agent JSON yourself using block schemas, then validate and save.
### Clarifying Before Building
Before starting the workflow below, check whether the user's goal is
**ambiguous** — missing the output format, delivery channel, data source,
or trigger. If so:
1. Call `find_block` with a query targeting the ambiguous dimension to
discover what the platform actually supports.
2. Ask the user **one concrete question** grounded in the discovered
options (e.g. "The platform supports Gmail, Slack, and Google Docs —
which should the agent use for delivery?").
3. **Wait for the user's answer** before proceeding.
**Skip this** when the goal already specifies all dimensions (e.g.
"scrape prices from Amazon and email me daily").
### Workflow for Creating/Editing Agents
1. **Discover blocks**: Call `find_block(query, include_schemas=true)` to

View File

@@ -25,7 +25,7 @@ from backend.copilot.sdk.compaction import (
def _make_session() -> ChatSession:
return ChatSession.new(user_id="test-user")
return ChatSession.new(user_id="test-user", dry_run=False)
# ---------------------------------------------------------------------------

View File

@@ -275,7 +275,7 @@ class TestCompactionE2E:
# --- Step 7: CompactionTracker receives PreCompact hook ---
tracker = CompactionTracker()
session = ChatSession.new(user_id="test-user")
session = ChatSession.new(user_id="test-user", dry_run=False)
tracker.on_compact(str(session_file))
# --- Step 8: Next SDK message arrives → emit_start ---
@@ -376,7 +376,7 @@ class TestCompactionE2E:
monkeypatch.setenv("CLAUDE_CONFIG_DIR", str(config_dir))
tracker = CompactionTracker()
session = ChatSession.new(user_id="test")
session = ChatSession.new(user_id="test", dry_run=False)
builder = TranscriptBuilder()
# --- First query with compaction ---

View File

@@ -38,7 +38,7 @@ class TestFlattenAssistantContent:
def test_tool_use_blocks(self):
blocks = [{"type": "tool_use", "name": "read_file", "input": {}}]
assert _flatten_assistant_content(blocks) == "[tool_use: read_file]"
assert _flatten_assistant_content(blocks) == ""
def test_mixed_blocks(self):
blocks = [
@@ -47,19 +47,22 @@ class TestFlattenAssistantContent:
]
result = _flatten_assistant_content(blocks)
assert "Let me read that." in result
assert "[tool_use: Read]" in result
# tool_use blocks are dropped entirely to prevent model mimicry
assert "Read" not in result
def test_raw_strings(self):
assert _flatten_assistant_content(["hello", "world"]) == "hello\nworld"
def test_unknown_block_type_preserved_as_placeholder(self):
def test_unknown_block_type_dropped(self):
blocks = [
{"type": "text", "text": "See this image:"},
{"type": "image", "source": {"type": "base64", "data": "..."}},
]
result = _flatten_assistant_content(blocks)
assert "See this image:" in result
assert "[__image__]" in result
# Unknown block types are dropped to prevent model mimicry
assert "[__image__]" not in result
assert "base64" not in result
def test_empty(self):
assert _flatten_assistant_content([]) == ""
@@ -279,7 +282,8 @@ class TestTranscriptToMessages:
messages = _transcript_to_messages(content)
assert len(messages) == 2
assert "Let me check." in messages[0]["content"]
assert "[tool_use: read_file]" in messages[0]["content"]
# tool_use blocks are dropped entirely to prevent model mimicry
assert "read_file" not in messages[0]["content"]
assert messages[1]["content"] == "file contents"

View File

@@ -49,22 +49,22 @@ def test_format_assistant_tool_calls():
)
]
result = _format_conversation_context(msgs)
assert result is not None
assert 'You called tool: search({"q": "test"})' in result
# Assistant with no content and tool_calls omitted produces no lines
assert result is None
def test_format_tool_result():
msgs = [ChatMessage(role="tool", content='{"result": "ok"}')]
result = _format_conversation_context(msgs)
assert result is not None
assert 'Tool result: {"result": "ok"}' in result
assert 'Tool output: {"result": "ok"}' in result
def test_format_tool_result_none_content():
msgs = [ChatMessage(role="tool", content=None)]
result = _format_conversation_context(msgs)
assert result is not None
assert "Tool result: " in result
assert "Tool output: " in result
def test_format_full_conversation():
@@ -84,8 +84,8 @@ def test_format_full_conversation():
assert result is not None
assert "User: find agents" in result
assert "You responded: I'll search for agents." in result
assert "You called tool: find_agents" in result
assert "Tool result:" in result
# tool_calls are omitted to prevent model mimicry
assert "Tool output:" in result
assert "You responded: Found Agent1." in result

View File

@@ -27,6 +27,7 @@ from backend.copilot.response_model import (
StreamError,
StreamFinish,
StreamFinishStep,
StreamHeartbeat,
StreamStart,
StreamStartStep,
StreamTextDelta,
@@ -76,6 +77,12 @@ class SDKResponseAdapter:
# Open the first step (matches non-SDK: StreamStart then StreamStartStep)
responses.append(StreamStartStep())
self.step_open = True
elif sdk_message.subtype == "task_progress":
# Emit a heartbeat so publish_chunk is called during long
# sub-agent runs. Without this, the Redis stream and meta
# key TTLs expire during gaps where no real chunks are
# produced (task_progress events were previously silent).
responses.append(StreamHeartbeat())
elif isinstance(sdk_message, AssistantMessage):
# Flush any SDK built-in tool calls that didn't get a UserMessage

View File

@@ -18,6 +18,7 @@ from backend.copilot.response_model import (
StreamError,
StreamFinish,
StreamFinishStep,
StreamHeartbeat,
StreamStart,
StreamStartStep,
StreamTextDelta,
@@ -59,6 +60,14 @@ def test_system_non_init_emits_nothing():
assert results == []
def test_task_progress_emits_heartbeat():
"""task_progress events emit a StreamHeartbeat to keep Redis TTL alive."""
adapter = _adapter()
results = adapter.convert_message(SystemMessage(subtype="task_progress", data={}))
assert len(results) == 1
assert isinstance(results[0], StreamHeartbeat)
# -- AssistantMessage with TextBlock -----------------------------------------

View File

@@ -904,14 +904,14 @@ class TestTranscriptEdgeCases:
assert restored[1]["content"] == "Second"
def test_flatten_assistant_with_only_tool_use(self):
"""Assistant message with only tool_use blocks (no text)."""
"""Assistant message with only tool_use blocks (no text) flattens to empty."""
blocks = [
{"type": "tool_use", "name": "bash", "input": {"cmd": "ls"}},
{"type": "tool_use", "name": "read", "input": {"path": "/f"}},
]
result = _flatten_assistant_content(blocks)
assert "[tool_use: bash]" in result
assert "[tool_use: read]" in result
# tool_use blocks are dropped entirely to prevent model mimicry
assert result == ""
def test_flatten_tool_result_nested_image(self):
"""Tool result containing image blocks uses placeholder."""
@@ -1414,3 +1414,76 @@ class TestStreamChatCompletionRetryIntegration:
# Verify user-friendly message (not raw SDK text)
assert "Authentication" in errors[0].errorText
assert any(isinstance(e, StreamStart) for e in events)
@pytest.mark.asyncio
async def test_result_message_prompt_too_long_triggers_compaction(self):
"""CLI returns ResultMessage(subtype="error") with "Prompt is too long".
When the Claude CLI rejects the prompt pre-API (model=<synthetic>,
duration_api_ms=0), it sends a ResultMessage with is_error=True
instead of raising a Python exception. The retry loop must still
detect this as a context-length error and trigger compaction.
"""
import contextlib
from claude_agent_sdk import ResultMessage
from backend.copilot.response_model import StreamError, StreamStart
from backend.copilot.sdk.service import stream_chat_completion_sdk
session = self._make_session()
success_result = self._make_result_message()
attempt_count = [0]
error_result = ResultMessage(
subtype="error",
result="Prompt is too long",
duration_ms=100,
duration_api_ms=0,
is_error=True,
num_turns=0,
session_id="test-session-id",
)
def _client_factory(*args, **kwargs):
attempt_count[0] += 1
if attempt_count[0] == 1:
# First attempt: CLI returns error ResultMessage
return self._make_client_mock(result_message=error_result)
# Second attempt (after compaction): succeeds
return self._make_client_mock(result_message=success_result)
original_transcript = _build_transcript(
[("user", "prior question"), ("assistant", "prior answer")]
)
compacted_transcript = _build_transcript(
[("user", "[summary]"), ("assistant", "summary reply")]
)
patches = _make_sdk_patches(
session,
original_transcript=original_transcript,
compacted_transcript=compacted_transcript,
client_side_effect=_client_factory,
)
events = []
with contextlib.ExitStack() as stack:
for target, kwargs in patches:
stack.enter_context(patch(target, **kwargs))
async for event in stream_chat_completion_sdk(
session_id="test-session-id",
message="hello",
is_user_message=True,
user_id="test-user",
session=session,
):
events.append(event)
assert attempt_count[0] == 2, (
f"Expected 2 SDK attempts (CLI error ResultMessage "
f"should trigger compaction retry), got {attempt_count[0]}"
)
errors = [e for e in events if isinstance(e, StreamError)]
assert not errors, f"Unexpected StreamError: {errors}"
assert any(isinstance(e, StreamStart) for e in events)

View File

@@ -313,8 +313,7 @@ def create_security_hooks(
.replace("\r", "")
)
logger.info(
"[SDK] Context compaction triggered: %s, user=%s, "
"transcript_path=%s",
"[SDK] Context compaction triggered: %s, user=%s, transcript_path=%s",
trigger,
user_id,
transcript_path,

View File

@@ -11,7 +11,11 @@ import pytest
from backend.copilot.context import _current_project_dir
from .security_hooks import _validate_tool_access, _validate_user_isolation
from .security_hooks import (
_validate_tool_access,
_validate_user_isolation,
create_security_hooks,
)
SDK_CWD = "/tmp/copilot-abc123"
@@ -220,8 +224,6 @@ def test_bash_builtin_blocked_message_clarity():
@pytest.fixture()
def _hooks():
"""Create security hooks and return (pre, post, post_failure) handlers."""
from .security_hooks import create_security_hooks
hooks = create_security_hooks(user_id="u1", sdk_cwd=SDK_CWD, max_subtasks=2)
pre = hooks["PreToolUse"][0].hooks[0]
post = hooks["PostToolUse"][0].hooks[0]

View File

@@ -59,11 +59,14 @@ from ..response_model import (
StreamBaseResponse,
StreamError,
StreamFinish,
StreamFinishStep,
StreamHeartbeat,
StreamStart,
StreamStartStep,
StreamStatus,
StreamTextDelta,
StreamToolInputAvailable,
StreamToolInputStart,
StreamToolOutputAvailable,
StreamUsage,
)
@@ -81,11 +84,9 @@ from .env import build_sdk_env # noqa: F401 — re-export for backward compat
from .response_adapter import SDKResponseAdapter
from .security_hooks import create_security_hooks
from .tool_adapter import (
cancel_pending_tool_tasks,
create_copilot_mcp_server,
get_copilot_tool_names,
get_sdk_disallowed_tools,
pre_launch_tool_call,
reset_stash_event,
reset_tool_failure_counters,
set_execution_context,
@@ -115,9 +116,10 @@ _MAX_STREAM_ATTEMPTS = 3
# Hard circuit breaker: abort the stream if the model sends this many
# consecutive tool calls with empty parameters (a sign of context
# saturation or serialization failure). Empty input ({}) is never
# legitimate — even one is suspicious, three is conclusive.
_EMPTY_TOOL_CALL_LIMIT = 3
# saturation or serialization failure). The MCP wrapper now returns
# guidance on the first empty call, giving the model a chance to
# self-correct. The limit is generous to allow recovery attempts.
_EMPTY_TOOL_CALL_LIMIT = 5
# User-facing error shown when the empty-tool-call circuit breaker trips.
_CIRCUIT_BREAKER_ERROR_MSG = (
@@ -746,15 +748,11 @@ def _format_conversation_context(messages: list[ChatMessage]) -> str | None:
elif msg.role == "assistant":
if msg.content:
lines.append(f"You responded: {msg.content}")
if msg.tool_calls:
for tc in msg.tool_calls:
func = tc.get("function", {})
tool_name = func.get("name", "unknown")
tool_args = func.get("arguments", "")
lines.append(f"You called tool: {tool_name}({tool_args})")
# Omit tool_calls — any text representation gets mimicked
# by the model. Tool results below provide the context.
elif msg.role == "tool":
content = msg.content or ""
lines.append(f"Tool result: {content}")
lines.append(f"Tool output: {content[:500]}")
if not lines:
return None
@@ -1214,6 +1212,14 @@ async def _run_stream_attempt(
consecutive_empty_tool_calls = 0
# --- Intermediate persistence tracking ---
# Flush session messages to DB periodically so page reloads show progress
# during long-running turns (see incident d2f7cba3: 82-min turn lost on refresh).
_last_flush_time = time.monotonic()
_msgs_since_flush = 0
_FLUSH_INTERVAL_SECONDS = 30.0
_FLUSH_MESSAGE_THRESHOLD = 10
# Use manual __aenter__/__aexit__ instead of ``async with`` so we can
# suppress SDK cleanup errors that occur when the SSE client disconnects
# mid-stream. GeneratorExit causes the SDK's ``__aexit__`` to run in a
@@ -1300,6 +1306,21 @@ async def _run_stream_attempt(
error_preview,
)
# Intercept prompt-too-long errors surfaced as
# AssistantMessage.error (not as a Python exception).
# Re-raise so the outer retry loop can compact the
# transcript and retry with reduced context.
# Only check error_text (the error field), not the
# content preview — content may contain arbitrary text
# that false-positives the pattern match.
if _is_prompt_too_long(Exception(error_text)):
logger.warning(
"%s Prompt-too-long detected via AssistantMessage "
"error — raising for retry",
ctx.log_prefix,
)
raise RuntimeError("Prompt is too long")
# Intercept transient API errors (socket closed,
# ECONNRESET) — replace the raw message with a
# user-friendly error text and use the retryable
@@ -1327,28 +1348,17 @@ async def _run_stream_attempt(
ended_with_stream_error = True
break
# Parallel tool execution: pre-launch every ToolUseBlock as an
# asyncio.Task the moment its AssistantMessage arrives. The SDK
# sends one AssistantMessage per tool call when issuing parallel
# calls, so each message is pre-launched independently. The MCP
# handlers will await the already-running task instead of executing
# fresh, making all concurrent tool calls run in parallel.
#
# Also determine if the message is a tool-only batch (all content
# Determine if the message is a tool-only batch (all content
# items are ToolUseBlocks) — such messages have no text output yet,
# so we skip the wait_for_stash flush below.
#
# Note: parallel execution of tools is handled natively by the
# SDK CLI via readOnlyHint annotations on tool definitions.
is_tool_only = False
if isinstance(sdk_msg, AssistantMessage) and sdk_msg.content:
is_tool_only = True
# NOTE: Pre-launches are sequential (each await completes
# file-ref expansion before the next starts). This is fine
# since expansion is typically sub-ms; a future optimisation
# could gather all pre-launches concurrently.
for tool_use in sdk_msg.content:
if isinstance(tool_use, ToolUseBlock):
await pre_launch_tool_call(tool_use.name, tool_use.input)
else:
is_tool_only = False
is_tool_only = all(
isinstance(item, ToolUseBlock) for item in sdk_msg.content
)
# Race-condition fix: SDK hooks (PostToolUse) are
# executed asynchronously via start_soon() — the next
@@ -1404,6 +1414,13 @@ async def _run_stream_attempt(
ctx.log_prefix,
sdk_msg.result or "(no error message provided)",
)
# If the CLI itself rejected the prompt as too long
# (pre-API check, duration_api_ms=0), re-raise as an
# exception so the retry loop can trigger compaction.
# Without this, the ResultMessage is silently consumed
# and the retry/compaction mechanism is never invoked.
if _is_prompt_too_long(RuntimeError(sdk_msg.result or "")):
raise RuntimeError("Prompt is too long")
# Capture token usage from ResultMessage.
# Anthropic reports cached tokens separately:
@@ -1482,6 +1499,34 @@ async def _run_stream_attempt(
model=sdk_msg.model,
)
# --- Intermediate persistence ---
# Flush session messages to DB periodically so page reloads
# show progress during long-running turns.
_msgs_since_flush += 1
now = time.monotonic()
if (
_msgs_since_flush >= _FLUSH_MESSAGE_THRESHOLD
or (now - _last_flush_time) >= _FLUSH_INTERVAL_SECONDS
):
try:
await asyncio.shield(upsert_chat_session(ctx.session))
logger.debug(
"%s Intermediate flush: %d messages "
"(msgs_since=%d, elapsed=%.1fs)",
ctx.log_prefix,
len(ctx.session.messages),
_msgs_since_flush,
now - _last_flush_time,
)
except Exception as flush_err:
logger.warning(
"%s Intermediate flush failed: %s",
ctx.log_prefix,
flush_err,
)
_last_flush_time = now
_msgs_since_flush = 0
if acc.stream_completed:
break
finally:
@@ -2008,13 +2053,22 @@ async def stream_chat_completion_sdk(
try:
async for event in _run_stream_attempt(stream_ctx, state):
if not isinstance(event, StreamHeartbeat):
if not isinstance(
event,
(
StreamHeartbeat,
# Compaction UI events are cosmetic and must not
# block retry — they're emitted before the SDK
# query on compacted attempts.
StreamStartStep,
StreamFinishStep,
StreamToolInputStart,
StreamToolInputAvailable,
StreamToolOutputAvailable,
),
):
events_yielded += 1
yield event
# Cancel any pre-launched tasks that were never dispatched
# by the SDK (e.g. edge-case SDK behaviour changes). Symmetric
# with the three error-path await cancel_pending_tool_tasks() calls.
await cancel_pending_tool_tasks()
break # Stream completed — exit retry loop
except asyncio.CancelledError:
logger.warning(
@@ -2023,9 +2077,6 @@ async def stream_chat_completion_sdk(
attempt + 1,
_MAX_STREAM_ATTEMPTS,
)
# Cancel any pre-launched tasks so they don't continue executing
# against a rolled-back or abandoned session.
await cancel_pending_tool_tasks()
raise
except _HandledStreamError as exc:
# _run_stream_attempt already yielded a StreamError and
@@ -2057,8 +2108,6 @@ async def stream_chat_completion_sdk(
retryable=True,
)
ended_with_stream_error = True
# Cancel any pre-launched tasks from the failed attempt.
await cancel_pending_tool_tasks()
break
except Exception as e:
stream_err = e
@@ -2075,9 +2124,6 @@ async def stream_chat_completion_sdk(
exc_info=True,
)
session.messages = session.messages[:pre_attempt_msg_count]
# Cancel any pre-launched tasks from the failed attempt so they
# don't continue executing against the rolled-back session.
await cancel_pending_tool_tasks()
if events_yielded > 0:
# Events were already sent to the frontend and cannot be
# unsent. Retrying would produce duplicate/inconsistent

View File

@@ -392,7 +392,7 @@ class TestFlattenThinkingBlocks:
assert result == ""
def test_mixed_thinking_text_tool(self):
"""Mixed blocks: only text and tool_use survive flattening."""
"""Mixed blocks: only text survives flattening; thinking and tool_use dropped."""
blocks = [
{"type": "thinking", "thinking": "hmm", "signature": "sig"},
{"type": "redacted_thinking", "data": "xyz"},
@@ -403,7 +403,8 @@ class TestFlattenThinkingBlocks:
assert "hmm" not in result
assert "xyz" not in result
assert "I'll read the file." in result
assert "[tool_use: Read]" in result
# tool_use blocks are dropped entirely to prevent model mimicry
assert "Read" not in result
# ---------------------------------------------------------------------------

View File

@@ -14,6 +14,7 @@ from contextvars import ContextVar
from typing import TYPE_CHECKING, Any
from claude_agent_sdk import create_sdk_mcp_server, tool
from mcp.types import ToolAnnotations
from backend.copilot.context import (
_current_permissions,
@@ -53,14 +54,6 @@ _MCP_MAX_CHARS = 500_000
MCP_SERVER_NAME = "copilot"
MCP_TOOL_PREFIX = f"mcp__{MCP_SERVER_NAME}__"
# Map from tool_name -> Queue of pre-launched (task, args) pairs.
# Initialised per-session in set_execution_context() so concurrent sessions
# never share the same dict.
_TaskQueueItem = tuple[asyncio.Task[dict[str, Any]], dict[str, Any]]
_tool_task_queues: ContextVar[dict[str, asyncio.Queue[_TaskQueueItem]] | None] = (
ContextVar("_tool_task_queues", default=None)
)
# Stash for MCP tool outputs before the SDK potentially truncates them.
# Keyed by tool_name → full output string. Consumed (popped) by the
# response adapter when it builds StreamToolOutputAvailable.
@@ -115,7 +108,6 @@ def set_execution_context(
_current_permissions.set(permissions)
_pending_tool_outputs.set({})
_stash_event.set(asyncio.Event())
_tool_task_queues.set({})
_consecutive_tool_failures.set({})
@@ -132,48 +124,6 @@ def reset_stash_event() -> None:
event.clear()
async def cancel_pending_tool_tasks() -> None:
"""Cancel all queued pre-launched tasks for the current execution context.
Call this when a stream attempt aborts (error, cancellation) to prevent
pre-launched tasks from continuing to execute against a rolled-back session.
Tasks that are already done are skipped; in-flight tasks are cancelled and
awaited so that any cleanup (``finally`` blocks, DB rollbacks) completes
before the next retry starts.
"""
queues = _tool_task_queues.get()
if not queues:
return
cancelled_tasks: list[asyncio.Task] = []
for tool_name, queue in list(queues.items()):
cancelled = 0
while not queue.empty():
task, _args = queue.get_nowait()
if not task.done():
task.cancel()
cancelled_tasks.append(task)
cancelled += 1
if cancelled:
logger.debug(
"Cancelled %d pre-launched task(s) for tool '%s'", cancelled, tool_name
)
queues.clear()
# Await all cancelled tasks so their cleanup (finally blocks, DB rollbacks)
# completes before the next retry attempt starts new pre-launches.
# Use a timeout to prevent hanging indefinitely if a task's cleanup is stuck.
if cancelled_tasks:
try:
await asyncio.wait_for(
asyncio.gather(*cancelled_tasks, return_exceptions=True),
timeout=5.0,
)
except TimeoutError:
logger.warning(
"Timed out waiting for %d cancelled task(s) to clean up",
len(cancelled_tasks),
)
def reset_tool_failure_counters() -> None:
"""Reset all tool-level circuit breaker counters.
@@ -249,10 +199,6 @@ async def wait_for_stash(timeout: float = 2.0) -> bool:
Uses ``asyncio.Event.wait()`` so it returns the instant the hook signals —
the timeout is purely a safety net for the case where the hook never fires.
Returns ``True`` if the stash signal was received, ``False`` on timeout.
The 2.0 s default was chosen to accommodate slower tool startup in cloud
sandboxes while still failing fast when the hook genuinely will not fire.
With the parallel pre-launch path, hooks typically fire well under 1 ms.
"""
event = _stash_event.get(None)
if event is None:
@@ -271,95 +217,13 @@ async def wait_for_stash(timeout: float = 2.0) -> bool:
return False
async def pre_launch_tool_call(tool_name: str, args: dict[str, Any]) -> None:
"""Pre-launch a tool as a background task so parallel calls run concurrently.
Called when an AssistantMessage with ToolUseBlocks is received, before the
SDK dispatches the MCP tool/call requests. The tool_handler will await the
pre-launched task instead of executing fresh.
The tool_name may include an MCP prefix (e.g. ``mcp__copilot__run_block``);
the prefix is stripped automatically before looking up the tool.
Ordering guarantee: the Claude Agent SDK dispatches MCP ``tools/call`` requests
in the same order as the ToolUseBlocks appear in the AssistantMessage.
Pre-launched tasks are queued FIFO per tool name, so the N-th handler for a
given tool name dequeues the N-th pre-launched task — result and args always
correspond when the SDK preserves order (which it does in the current SDK).
"""
queues = _tool_task_queues.get()
if queues is None:
return
# Strip the MCP server prefix (e.g. "mcp__copilot__") to get the bare tool name.
# Use removeprefix so tool names that themselves contain "__" are handled correctly.
bare_name = tool_name.removeprefix(MCP_TOOL_PREFIX)
base_tool = TOOL_REGISTRY.get(bare_name)
if base_tool is None:
return
user_id, session = get_execution_context()
if session is None:
return
# Expand @@agptfile: references before launching the task.
# The _truncating wrapper (which normally handles expansion) runs AFTER
# pre_launch_tool_call — the pre-launched task would otherwise receive raw
# @@agptfile: tokens and fail to resolve them inside _execute_tool_sync.
# Use _build_input_schema (same path as _truncating) for schema-aware expansion.
input_schema: dict[str, Any] | None
try:
input_schema = _build_input_schema(base_tool)
except Exception:
input_schema = None # schema unavailable — skip schema-aware expansion
try:
args = await expand_file_refs_in_args(
args, user_id, session, input_schema=input_schema
)
except FileRefExpansionError as exc:
logger.warning(
"pre_launch_tool_call: @@agptfile expansion failed for %s: %s — skipping pre-launch",
bare_name,
exc,
)
return
task = asyncio.create_task(_execute_tool_sync(base_tool, user_id, session, args))
# Log unhandled exceptions so "Task exception was never retrieved" warnings
# do not pollute stderr when a task is pre-launched but never dequeued.
task.add_done_callback(
lambda t, name=bare_name: (
logger.warning(
"Pre-launched task for %s raised unhandled: %s",
name,
t.exception(),
)
if not t.cancelled() and t.exception()
else None
)
)
if bare_name not in queues:
queues[bare_name] = asyncio.Queue[_TaskQueueItem]()
# Store (task, args) so the handler can log a warning if the SDK dispatches
# calls in a different order than the ToolUseBlocks appeared in the message.
queues[bare_name].put_nowait((task, args))
async def _execute_tool_sync(
base_tool: BaseTool,
user_id: str | None,
session: ChatSession,
args: dict[str, Any],
) -> dict[str, Any]:
"""Execute a tool synchronously and return MCP-formatted response.
Note: ``@@agptfile:`` expansion should be performed by the caller before
invoking this function. For the normal (non-parallel) path it is handled
by the ``_truncating`` wrapper; for the pre-launched parallel path it is
handled in :func:`pre_launch_tool_call` before the task is created.
"""
"""Execute a tool synchronously and return MCP-formatted response."""
effective_id = f"sdk-{uuid.uuid4().hex[:12]}"
result = await base_tool.execute(
user_id=user_id,
@@ -455,83 +319,7 @@ def create_tool_handler(base_tool: BaseTool):
"""
async def tool_handler(args: dict[str, Any]) -> dict[str, Any]:
"""Execute the wrapped tool and return MCP-formatted response.
If a pre-launched task exists (from parallel tool pre-launch in the
message loop), await it instead of executing fresh.
"""
queues = _tool_task_queues.get()
if queues and base_tool.name in queues:
queue = queues[base_tool.name]
if not queue.empty():
task, launch_args = queue.get_nowait()
# Sanity-check: warn if the args don't match — this can happen
# if the SDK dispatches tool calls in a different order than the
# ToolUseBlocks appeared in the AssistantMessage (unlikely but
# could occur in future SDK versions or with SDK bugs).
# We compare full values (not just keys) so that two run_block
# calls with different block_id values are caught even though
# both have the same key set.
if launch_args != args:
logger.warning(
"Pre-launched task for %s: arg mismatch "
"(launch_keys=%s, call_keys=%s) — cancelling "
"pre-launched task and falling back to direct execution",
base_tool.name,
(
sorted(launch_args.keys())
if isinstance(launch_args, dict)
else type(launch_args).__name__
),
(
sorted(args.keys())
if isinstance(args, dict)
else type(args).__name__
),
)
if not task.done():
task.cancel()
# Await cancellation to prevent duplicate concurrent
# execution for blocks with side effects.
try:
await task
except (asyncio.CancelledError, Exception):
pass
# Fall through to the direct-execution path below.
else:
# Args match — await the pre-launched task.
try:
result = await task
except asyncio.CancelledError:
# Re-raise: CancelledError may be propagating from the
# outer streaming loop being cancelled — swallowing it
# would mask the cancellation and prevent proper cleanup.
logger.warning(
"Pre-launched tool %s was cancelled — re-raising",
base_tool.name,
)
raise
except Exception as e:
logger.error(
"Pre-launched tool %s failed: %s",
base_tool.name,
e,
exc_info=True,
)
return _mcp_error(
f"Failed to execute {base_tool.name}. "
"Check server logs for details."
)
# Pre-truncate the result so the _truncating wrapper (which
# wraps this handler) receives an already-within-budget
# value. _truncating handles stashing — we must NOT stash
# here or the output will be appended twice to the FIFO
# queue and pop_pending_tool_output would return a duplicate
# entry on the second call for the same tool.
return truncate(result, _MCP_MAX_CHARS)
# No pre-launched task — execute directly (fallback for non-parallel calls).
"""Execute the wrapped tool and return MCP-formatted response."""
user_id, session = get_execution_context()
if session is None:
@@ -648,9 +436,19 @@ def _text_from_mcp_result(result: dict[str, Any]) -> str:
)
_PARALLEL_ANNOTATION = ToolAnnotations(readOnlyHint=True)
def create_copilot_mcp_server(*, use_e2b: bool = False):
"""Create an in-process MCP server configuration for CoPilot tools.
All tools are annotated with ``readOnlyHint=True`` so the SDK CLI
dispatches concurrent tool calls in parallel rather than sequentially.
This is a deliberate override: even side-effect tools use the hint
because the MCP tools are already individually sandboxed and the
pre-launch duplicate-execution bug (SECRT-2204) is worse than
sequential dispatch.
When *use_e2b* is True, five additional MCP file tools are registered
that route directly to the E2B sandbox filesystem, and the caller should
disable the corresponding SDK built-in tools via
@@ -668,6 +466,28 @@ def create_copilot_mcp_server(*, use_e2b: bool = False):
Applied once to every registered tool."""
async def wrapper(args: dict[str, Any]) -> dict[str, Any]:
# Empty tool args = model's output was truncated by the API's
# max_tokens limit. Instead of letting the tool fail with a
# confusing error (and eventually tripping the circuit breaker),
# return clear guidance so the model can self-correct.
if not args and input_schema and input_schema.get("required"):
logger.warning(
"[MCP] %s called with empty args (likely output "
"token truncation) — returning guidance",
tool_name,
)
return _mcp_error(
f"Your call to {tool_name} had empty arguments — "
f"this means your previous response was too long and "
f"the tool call input was truncated by the API. "
f"To fix this: break your work into smaller steps. "
f"For large content, first write it to a file using "
f"bash_exec with cat >> (append section by section), "
f"then pass it via @@agptfile:filename reference. "
f"Do NOT retry with the same approach — it will "
f"be truncated again."
)
# Circuit breaker: stop infinite retry loops with identical args.
# Use the original (pre-expansion) args for fingerprinting so
# check and record always use the same key — @@agptfile:
@@ -718,24 +538,35 @@ def create_copilot_mcp_server(*, use_e2b: bool = False):
for tool_name, base_tool in TOOL_REGISTRY.items():
handler = create_tool_handler(base_tool)
schema = _build_input_schema(base_tool)
# All tools annotated readOnlyHint=True to enable parallel dispatch.
# The SDK CLI uses this hint to dispatch concurrent tool calls in
# parallel rather than sequentially. Side-effect safety is ensured
# by the tool implementations themselves (idempotency, credentials).
decorated = tool(
tool_name,
base_tool.description,
schema,
annotations=_PARALLEL_ANNOTATION,
)(_truncating(handler, tool_name, input_schema=schema))
sdk_tools.append(decorated)
# E2B file tools replace SDK built-in Read/Write/Edit/Glob/Grep.
if use_e2b:
for name, desc, schema, handler in E2B_FILE_TOOLS:
decorated = tool(name, desc, schema)(_truncating(handler, name))
decorated = tool(
name,
desc,
schema,
annotations=_PARALLEL_ANNOTATION,
)(_truncating(handler, name))
sdk_tools.append(decorated)
# Read tool for SDK-truncated tool results (always needed).
# Read tool for SDK-truncated tool results (always needed, read-only).
read_tool = tool(
_READ_TOOL_NAME,
_READ_TOOL_DESCRIPTION,
_READ_TOOL_SCHEMA,
annotations=_PARALLEL_ANNOTATION,
)(_truncating(_read_file_handler, _READ_TOOL_NAME))
sdk_tools.append(read_tool)

View File

@@ -1,22 +1,21 @@
"""Tests for tool_adapter helpers: truncation, stash, context vars, parallel pre-launch."""
"""Tests for tool_adapter: truncation, stash, context vars, readOnlyHint annotations."""
import asyncio
from unittest.mock import AsyncMock, MagicMock, patch
from unittest.mock import AsyncMock, MagicMock
import pytest
from mcp.types import ToolAnnotations
from backend.copilot.context import get_sdk_cwd
from backend.copilot.response_model import StreamToolOutputAvailable
from backend.copilot.sdk.file_ref import FileRefExpansionError
from backend.util.truncate import truncate
from .tool_adapter import (
_MCP_MAX_CHARS,
SDK_DISALLOWED_TOOLS,
_text_from_mcp_result,
cancel_pending_tool_tasks,
create_tool_handler,
pop_pending_tool_output,
pre_launch_tool_call,
reset_stash_event,
set_execution_context,
stash_pending_tool_output,
@@ -244,7 +243,7 @@ class TestTruncationAndStashIntegration:
# ---------------------------------------------------------------------------
# Parallel pre-launch infrastructure
# create_tool_handler (direct execution, no pre-launch)
# ---------------------------------------------------------------------------
@@ -277,169 +276,18 @@ def _init_ctx(session=None):
)
class TestPreLaunchToolCall:
"""Tests for pre_launch_tool_call and the queue-based parallel dispatch."""
class TestCreateToolHandler:
"""Tests for create_tool_handler — direct tool execution."""
@pytest.fixture(autouse=True)
def _init(self):
_init_ctx(session=_make_mock_session())
@pytest.mark.asyncio
async def test_unknown_tool_is_silently_ignored(self):
"""pre_launch_tool_call does nothing for tools not in TOOL_REGISTRY."""
# Should not raise even if the tool name is completely unknown
await pre_launch_tool_call("nonexistent_tool", {})
@pytest.mark.asyncio
async def test_mcp_prefix_stripped_before_registry_lookup(self):
"""mcp__copilot__run_block is looked up as 'run_block'."""
mock_tool = _make_mock_tool("run_block")
with patch(
"backend.copilot.sdk.tool_adapter.TOOL_REGISTRY",
{"run_block": mock_tool},
):
await pre_launch_tool_call("mcp__copilot__run_block", {"block_id": "b1"})
# The task was enqueued — mock_tool.execute should be called once
# (may not complete immediately but should start)
await asyncio.sleep(0) # yield to event loop
mock_tool.execute.assert_awaited_once()
@pytest.mark.asyncio
async def test_bare_tool_name_without_prefix(self):
"""Tool names without __ separator are looked up as-is."""
mock_tool = _make_mock_tool("run_block")
with patch(
"backend.copilot.sdk.tool_adapter.TOOL_REGISTRY",
{"run_block": mock_tool},
):
await pre_launch_tool_call("run_block", {"block_id": "b1"})
await asyncio.sleep(0)
mock_tool.execute.assert_awaited_once()
@pytest.mark.asyncio
async def test_task_enqueued_fifo_for_same_tool(self):
"""Two pre-launched calls for the same tool name are enqueued FIFO."""
results = []
async def slow_execute(*args, **kwargs):
results.append(len(results))
return StreamToolOutputAvailable(
toolCallId="id",
output=str(len(results) - 1),
toolName="t",
success=True,
)
mock_tool = _make_mock_tool("t")
mock_tool.execute = AsyncMock(side_effect=slow_execute)
with patch(
"backend.copilot.sdk.tool_adapter.TOOL_REGISTRY",
{"t": mock_tool},
):
await pre_launch_tool_call("t", {"n": 1})
await pre_launch_tool_call("t", {"n": 2})
await asyncio.sleep(0)
assert mock_tool.execute.await_count == 2
@pytest.mark.asyncio
async def test_file_ref_expansion_failure_skips_pre_launch(self):
"""When @@agptfile: expansion fails, pre_launch_tool_call skips the task.
The handler should then fall back to direct execution (which will also
fail with a proper MCP error via _truncating's own expansion).
"""
mock_tool = _make_mock_tool("run_block", output="should-not-execute")
with (
patch(
"backend.copilot.sdk.tool_adapter.TOOL_REGISTRY",
{"run_block": mock_tool},
),
patch(
"backend.copilot.sdk.tool_adapter.expand_file_refs_in_args",
AsyncMock(side_effect=FileRefExpansionError("@@agptfile:missing.txt")),
),
):
# Should not raise — expansion failure is handled gracefully
await pre_launch_tool_call("run_block", {"text": "@@agptfile:missing.txt"})
await asyncio.sleep(0)
# No task was pre-launched — execute was not called
mock_tool.execute.assert_not_awaited()
class TestCreateToolHandlerParallel:
"""Tests for create_tool_handler using pre-launched tasks."""
@pytest.fixture(autouse=True)
def _init(self):
_init_ctx(session=_make_mock_session())
@pytest.mark.asyncio
async def test_handler_uses_prelaunched_task(self):
"""Handler pops and awaits the pre-launched task rather than re-executing."""
mock_tool = _make_mock_tool("run_block", output="pre-launched result")
with patch(
"backend.copilot.sdk.tool_adapter.TOOL_REGISTRY",
{"run_block": mock_tool},
):
await pre_launch_tool_call("run_block", {"block_id": "b1"})
await asyncio.sleep(0) # let task start
handler = create_tool_handler(mock_tool)
result = await handler({"block_id": "b1"})
assert result["isError"] is False
text = result["content"][0]["text"]
assert "pre-launched result" in text
# Should only have been called once (the pre-launched task), not twice
mock_tool.execute.assert_awaited_once()
@pytest.mark.asyncio
async def test_handler_does_not_double_stash_for_prelaunched_task(self):
"""Pre-launched task result must NOT be stashed by tool_handler directly.
The _truncating wrapper wraps tool_handler and handles stashing after
tool_handler returns. If tool_handler also stashed, the output would be
appended twice to the FIFO queue and pop_pending_tool_output would return
a duplicate on the second call.
This test calls tool_handler directly (without _truncating) and asserts
that nothing was stashed — confirming stashing is deferred to _truncating.
"""
mock_tool = _make_mock_tool("run_block", output="stash-me")
with patch(
"backend.copilot.sdk.tool_adapter.TOOL_REGISTRY",
{"run_block": mock_tool},
):
await pre_launch_tool_call("run_block", {"block_id": "b1"})
await asyncio.sleep(0)
handler = create_tool_handler(mock_tool)
result = await handler({"block_id": "b1"})
assert result["isError"] is False
assert "stash-me" in result["content"][0]["text"]
# tool_handler must NOT stash — _truncating (which wraps handler) does it.
# Calling pop here (without going through _truncating) should return None.
not_stashed = pop_pending_tool_output("run_block")
assert not_stashed is None, (
"tool_handler must not stash directly — _truncating handles stashing "
"to prevent double-stash in the FIFO queue"
)
@pytest.mark.asyncio
async def test_handler_falls_back_when_queue_empty(self):
"""When no pre-launched task exists, handler executes directly."""
async def test_handler_executes_tool_directly(self):
"""Handler executes the tool and returns MCP-formatted result."""
mock_tool = _make_mock_tool("run_block", output="direct result")
# Don't call pre_launch_tool_call — queue is empty
handler = create_tool_handler(mock_tool)
result = await handler({"block_id": "b1"})
@@ -449,104 +297,9 @@ class TestCreateToolHandlerParallel:
mock_tool.execute.assert_awaited_once()
@pytest.mark.asyncio
async def test_handler_cancelled_error_propagates(self):
"""CancelledError from a pre-launched task is re-raised to preserve cancellation semantics."""
async def test_handler_returns_error_on_no_session(self):
"""When session is None, handler returns MCP error."""
mock_tool = _make_mock_tool("run_block")
mock_tool.execute = AsyncMock(side_effect=asyncio.CancelledError())
with patch(
"backend.copilot.sdk.tool_adapter.TOOL_REGISTRY",
{"run_block": mock_tool},
):
await pre_launch_tool_call("run_block", {"block_id": "b1"})
await asyncio.sleep(0)
handler = create_tool_handler(mock_tool)
with pytest.raises(asyncio.CancelledError):
await handler({"block_id": "b1"})
@pytest.mark.asyncio
async def test_handler_exception_returns_mcp_error(self):
"""Exception from a pre-launched task is caught and returned as MCP error."""
mock_tool = _make_mock_tool("run_block")
mock_tool.execute = AsyncMock(side_effect=RuntimeError("block exploded"))
with patch(
"backend.copilot.sdk.tool_adapter.TOOL_REGISTRY",
{"run_block": mock_tool},
):
await pre_launch_tool_call("run_block", {"block_id": "b1"})
await asyncio.sleep(0)
handler = create_tool_handler(mock_tool)
result = await handler({"block_id": "b1"})
assert result["isError"] is True
assert "Failed to execute run_block" in result["content"][0]["text"]
@pytest.mark.asyncio
async def test_two_same_tool_calls_dispatched_in_order(self):
"""Two pre-launched tasks for the same tool are consumed in FIFO order."""
call_order = []
async def execute_with_tag(*args, **kwargs):
tag = kwargs.get("block_id", "?")
call_order.append(tag)
return StreamToolOutputAvailable(
toolCallId="id", output=f"out-{tag}", toolName="run_block", success=True
)
mock_tool = _make_mock_tool("run_block")
mock_tool.execute = AsyncMock(side_effect=execute_with_tag)
with patch(
"backend.copilot.sdk.tool_adapter.TOOL_REGISTRY",
{"run_block": mock_tool},
):
await pre_launch_tool_call("run_block", {"block_id": "first"})
await pre_launch_tool_call("run_block", {"block_id": "second"})
await asyncio.sleep(0)
handler = create_tool_handler(mock_tool)
r1 = await handler({"block_id": "first"})
r2 = await handler({"block_id": "second"})
assert "out-first" in r1["content"][0]["text"]
assert "out-second" in r2["content"][0]["text"]
assert call_order == [
"first",
"second",
], f"Expected FIFO dispatch order but got {call_order}"
@pytest.mark.asyncio
async def test_arg_mismatch_falls_back_to_direct_execution(self):
"""When pre-launched args differ from SDK args, handler cancels pre-launched
task and falls back to direct execution with the correct args."""
mock_tool = _make_mock_tool("run_block", output="direct-result")
with patch(
"backend.copilot.sdk.tool_adapter.TOOL_REGISTRY",
{"run_block": mock_tool},
):
# Pre-launch with args {"block_id": "wrong"}
await pre_launch_tool_call("run_block", {"block_id": "wrong"})
await asyncio.sleep(0)
# SDK dispatches with different args
handler = create_tool_handler(mock_tool)
result = await handler({"block_id": "correct"})
assert result["isError"] is False
# The tool was called twice: once by pre-launch (wrong args), once by
# direct fallback (correct args). The result should come from the
# direct execution path.
assert mock_tool.execute.await_count == 2
@pytest.mark.asyncio
async def test_no_session_falls_back_gracefully(self):
"""When session is None and no pre-launched task, handler returns MCP error."""
mock_tool = _make_mock_tool("run_block")
# session=None means get_execution_context returns (user_id, None)
set_execution_context(user_id="u", session=None, sandbox=None) # type: ignore[arg-type]
handler = create_tool_handler(mock_tool)
@@ -555,220 +308,314 @@ class TestCreateToolHandlerParallel:
assert result["isError"] is True
assert "session" in result["content"][0]["text"].lower()
# ---------------------------------------------------------------------------
# cancel_pending_tool_tasks
# ---------------------------------------------------------------------------
class TestCancelPendingToolTasks:
"""Tests for cancel_pending_tool_tasks — the stream-abort cleanup helper."""
@pytest.fixture(autouse=True)
def _init(self):
_init_ctx(session=_make_mock_session())
@pytest.mark.asyncio
async def test_cancels_queued_tasks(self):
"""Queued tasks are cancelled and the queue is cleared."""
ran = False
async def never_run(*_args, **_kwargs):
nonlocal ran
await asyncio.sleep(10) # long enough to still be pending
ran = True
async def test_handler_returns_error_on_exception(self):
"""Exception from tool execution is caught and returned as MCP error."""
mock_tool = _make_mock_tool("run_block")
mock_tool.execute = AsyncMock(side_effect=never_run)
mock_tool.execute = AsyncMock(side_effect=RuntimeError("block exploded"))
with patch(
"backend.copilot.sdk.tool_adapter.TOOL_REGISTRY",
{"run_block": mock_tool},
):
await pre_launch_tool_call("run_block", {"block_id": "b1"})
await asyncio.sleep(0) # let task start
await cancel_pending_tool_tasks()
await asyncio.sleep(0) # let cancellation propagate
handler = create_tool_handler(mock_tool)
result = await handler({"block_id": "b1"})
assert not ran, "Task should have been cancelled before completing"
assert result["isError"] is True
assert "Failed to execute run_block" in result["content"][0]["text"]
@pytest.mark.asyncio
async def test_noop_when_no_tasks_queued(self):
"""cancel_pending_tool_tasks does not raise when queues are empty."""
await cancel_pending_tool_tasks() # should not raise
async def test_handler_executes_once_per_call(self):
"""Each handler call executes the tool exactly once — no duplicate execution."""
mock_tool = _make_mock_tool("run_block", output="single-execution")
@pytest.mark.asyncio
async def test_handler_does_not_find_cancelled_task(self):
"""After cancel, tool_handler falls back to direct execution."""
mock_tool = _make_mock_tool("run_block", output="direct-fallback")
handler = create_tool_handler(mock_tool)
await handler({"block_id": "b1"})
await handler({"block_id": "b2"})
with patch(
"backend.copilot.sdk.tool_adapter.TOOL_REGISTRY",
{"run_block": mock_tool},
):
await pre_launch_tool_call("run_block", {"block_id": "b1"})
await asyncio.sleep(0)
await cancel_pending_tool_tasks()
# Queue is now empty — handler should execute directly
handler = create_tool_handler(mock_tool)
result = await handler({"block_id": "b1"})
assert result["isError"] is False
assert "direct-fallback" in result["content"][0]["text"]
# ---------------------------------------------------------------------------
# Concurrent / parallel pre-launch scenarios
# ---------------------------------------------------------------------------
class TestAllParallelToolsPrelaunchedIndependently:
"""Simulate SDK sending N separate AssistantMessages for the same tool concurrently."""
@pytest.fixture(autouse=True)
def _init(self):
_init_ctx(session=_make_mock_session())
@pytest.mark.asyncio
async def test_all_parallel_tools_prelaunched_independently(self):
"""5 pre-launches for the same tool all enqueue independently and run concurrently.
Each task sleeps for PER_TASK_S seconds. If they ran sequentially the total
wall time would be ~5*PER_TASK_S. Running concurrently it should finish in
roughly PER_TASK_S (plus scheduling overhead).
"""
PER_TASK_S = 0.05
N = 5
started: list[int] = []
finished: list[int] = []
async def slow_execute(*args, **kwargs):
idx = len(started)
started.append(idx)
await asyncio.sleep(PER_TASK_S)
finished.append(idx)
return StreamToolOutputAvailable(
toolCallId=f"id-{idx}",
output=f"result-{idx}",
toolName="bash_exec",
success=True,
)
mock_tool = _make_mock_tool("bash_exec")
mock_tool.execute = AsyncMock(side_effect=slow_execute)
with patch(
"backend.copilot.sdk.tool_adapter.TOOL_REGISTRY",
{"bash_exec": mock_tool},
):
for i in range(N):
await pre_launch_tool_call("bash_exec", {"cmd": f"echo {i}"})
# Measure only the concurrent execution window, not pre-launch overhead.
# Starting the timer here avoids false failures on slow CI runners where
# the pre_launch_tool_call setup takes longer than the concurrent sleep.
t0 = asyncio.get_running_loop().time()
await asyncio.sleep(PER_TASK_S * 2)
elapsed = asyncio.get_running_loop().time() - t0
assert mock_tool.execute.await_count == N
assert len(finished) == N
# Wall time of the sleep window should be well under N * PER_TASK_S
# (sequential would be ~0.25s; concurrent finishes in ~PER_TASK_S = 0.05s)
assert elapsed < N * PER_TASK_S, (
f"Expected concurrent execution (<{N * PER_TASK_S:.2f}s) "
f"but sleep window took {elapsed:.2f}s"
)
class TestHandlerReturnsResultFromCorrectPrelaunchedTask:
"""Pop pre-launched tasks in order and verify each returns its own result."""
@pytest.fixture(autouse=True)
def _init(self):
_init_ctx(session=_make_mock_session())
@pytest.mark.asyncio
async def test_handler_returns_result_from_correct_prelaunched_task(self):
"""Two pre-launches for the same tool: first handler gets first result, second gets second."""
async def execute_with_cmd(*args, **kwargs):
cmd = kwargs.get("cmd", "?")
return StreamToolOutputAvailable(
toolCallId="id",
output=f"output-for-{cmd}",
toolName="bash_exec",
success=True,
)
mock_tool = _make_mock_tool("bash_exec")
mock_tool.execute = AsyncMock(side_effect=execute_with_cmd)
with patch(
"backend.copilot.sdk.tool_adapter.TOOL_REGISTRY",
{"bash_exec": mock_tool},
):
await pre_launch_tool_call("bash_exec", {"cmd": "alpha"})
await pre_launch_tool_call("bash_exec", {"cmd": "beta"})
await asyncio.sleep(0) # let both tasks start
handler = create_tool_handler(mock_tool)
r1 = await handler({"cmd": "alpha"})
r2 = await handler({"cmd": "beta"})
text1 = r1["content"][0]["text"]
text2 = r2["content"][0]["text"]
assert "output-for-alpha" in text1, f"Expected alpha result, got: {text1}"
assert "output-for-beta" in text2, f"Expected beta result, got: {text2}"
assert mock_tool.execute.await_count == 2
class TestFiveConcurrentPrelaunchAllComplete:
"""Pre-launch 5 tasks; consume all 5 via handlers; assert all succeed."""
# ---------------------------------------------------------------------------
# Regression tests: bugs fixed by removing pre-launch mechanism
#
# Each test class includes a _buggy_handler fixture that reproduces the old
# pre-launch implementation inline. Tests run against BOTH the buggy handler
# (xfail — proves the bug exists) and the current clean handler (must pass).
# ---------------------------------------------------------------------------
def _make_execute_fn(tool_name: str = "run_block"):
"""Return (execute_fn, call_log) — execute_fn records every call."""
call_log: list[dict] = []
async def execute_fn(*args, **kwargs):
call_log.append(kwargs)
return StreamToolOutputAvailable(
toolCallId=f"id-{len(call_log)}",
output=f"result-{len(call_log)}",
toolName=tool_name,
success=True,
)
return execute_fn, call_log
async def _buggy_prelaunch_handler(mock_tool, pre_launch_args, dispatch_args):
"""Simulate the OLD buggy pre-launch flow.
1. pre_launch_tool_call fires _execute_tool_sync with pre_launch_args
2. SDK dispatches handler with dispatch_args
3. Handler compares args — on mismatch, cancels + re-executes (BUG)
Returns the handler result.
"""
from backend.copilot.sdk.tool_adapter import _execute_tool_sync
user_id, session = "user-1", _make_mock_session()
# Step 1: pre-launch fires immediately (speculative)
task = asyncio.create_task(
_execute_tool_sync(mock_tool, user_id, session, pre_launch_args)
)
await asyncio.sleep(0) # let task start
# Step 2: SDK dispatches with (potentially different) args
if pre_launch_args != dispatch_args:
# Arg mismatch path: cancel pre-launched task + re-execute
if not task.done():
task.cancel()
try:
await task
except (asyncio.CancelledError, Exception):
pass
# Fall through to direct execution (duplicate!)
return await _execute_tool_sync(mock_tool, user_id, session, dispatch_args)
else:
return await task
class TestBug1DuplicateExecution:
"""Bug 1 (SECRT-2204): arg mismatch causes duplicate execution.
Pre-launch fires with raw args, SDK dispatches with normalised args.
Mismatch → cancel (too late) + re-execute → 2 API calls.
"""
@pytest.fixture(autouse=True)
def _init(self):
_init_ctx(session=_make_mock_session())
@pytest.mark.xfail(reason="Old pre-launch code causes duplicate execution")
@pytest.mark.asyncio
async def test_five_concurrent_prelaunch_all_complete(self):
"""All 5 pre-launched tasks complete and return successful results."""
N = 5
call_count = 0
async def test_old_code_duplicates_on_arg_mismatch(self):
"""OLD CODE: pre-launch with args A, dispatch with args B → 2 calls."""
execute_fn, call_log = _make_execute_fn()
mock_tool = _make_mock_tool("run_block")
mock_tool.execute = AsyncMock(side_effect=execute_fn)
async def counting_execute(*args, **kwargs):
nonlocal call_count
call_count += 1
n = call_count
pre_launch_args = {"block_id": "b1", "input_data": {"title": "Test"}}
dispatch_args = {
"block_id": "b1",
"input_data": {"title": "Test", "priority": None},
}
await _buggy_prelaunch_handler(mock_tool, pre_launch_args, dispatch_args)
# BUG: pre-launch executed once + fallback executed again = 2
assert len(call_log) == 1, (
f"Expected 1 execution but got {len(call_log)}"
f"duplicate execution bug!"
)
@pytest.mark.asyncio
async def test_current_code_no_duplicate(self):
"""FIXED: handler executes exactly once regardless of arg shape."""
execute_fn, call_log = _make_execute_fn()
mock_tool = _make_mock_tool("run_block")
mock_tool.execute = AsyncMock(side_effect=execute_fn)
handler = create_tool_handler(mock_tool)
await handler({"block_id": "b1", "input_data": {"title": "Test"}})
assert len(call_log) == 1, f"Expected 1 execution but got {len(call_log)}"
class TestBug2FIFODesync:
"""Bug 2: FIFO desync when security hook denies a tool.
Pre-launch queues [task_A, task_B]. Tool A denied (no MCP dispatch).
Tool B's handler dequeues task_A → returns wrong result.
"""
@pytest.fixture(autouse=True)
def _init(self):
_init_ctx(session=_make_mock_session())
@pytest.mark.xfail(reason="Old FIFO queue returns wrong result on denial")
@pytest.mark.asyncio
async def test_old_code_fifo_desync_on_denial(self):
"""OLD CODE: denied tool's task stays in queue, next tool gets wrong result."""
from backend.copilot.sdk.tool_adapter import _execute_tool_sync
call_log: list[str] = []
async def tagged_execute(*args, **kwargs):
tag = kwargs.get("block_id", "?")
call_log.append(tag)
return StreamToolOutputAvailable(
toolCallId=f"id-{n}",
output=f"done-{n}",
toolName="bash_exec",
toolCallId="id",
output=f"result-for-{tag}",
toolName="run_block",
success=True,
)
mock_tool = _make_mock_tool("bash_exec")
mock_tool.execute = AsyncMock(side_effect=counting_execute)
mock_tool = _make_mock_tool("run_block")
mock_tool.execute = AsyncMock(side_effect=tagged_execute)
user_id, session = "user-1", _make_mock_session()
with patch(
"backend.copilot.sdk.tool_adapter.TOOL_REGISTRY",
{"bash_exec": mock_tool},
):
for i in range(N):
await pre_launch_tool_call("bash_exec", {"cmd": f"task-{i}"})
# Simulate old FIFO queue
queue: asyncio.Queue = asyncio.Queue()
await asyncio.sleep(0) # let all tasks start
# Pre-launch for tool A and tool B
task_a = asyncio.create_task(
_execute_tool_sync(mock_tool, user_id, session, {"block_id": "A"})
)
task_b = asyncio.create_task(
_execute_tool_sync(mock_tool, user_id, session, {"block_id": "B"})
)
queue.put_nowait(task_a)
queue.put_nowait(task_b)
await asyncio.sleep(0) # let both tasks run
handler = create_tool_handler(mock_tool)
results = []
for i in range(N):
results.append(await handler({"cmd": f"task-{i}"}))
# Tool A is DENIED by security hook — no MCP dispatch, no dequeue
# Tool B's handler dequeues from FIFO → gets task_A!
dequeued_task = queue.get_nowait()
result = await dequeued_task
result_text = result["content"][0]["text"]
assert (
mock_tool.execute.await_count == N
), f"Expected {N} execute calls, got {mock_tool.execute.await_count}"
for i, result in enumerate(results):
assert result["isError"] is False, f"Result {i} should not be an error"
text = result["content"][0]["text"]
assert "done-" in text, f"Result {i} missing expected output: {text}"
# BUG: handler for B got task_A's result
assert "result-for-B" in result_text, (
f"Expected result for B but got: {result_text}"
f"FIFO desync: B got A's result!"
)
@pytest.mark.asyncio
async def test_current_code_no_fifo_desync(self):
"""FIXED: each handler call executes independently, no shared queue."""
call_log: list[str] = []
async def tagged_execute(*args, **kwargs):
tag = kwargs.get("block_id", "?")
call_log.append(tag)
return StreamToolOutputAvailable(
toolCallId="id",
output=f"result-for-{tag}",
toolName="run_block",
success=True,
)
mock_tool = _make_mock_tool("run_block")
mock_tool.execute = AsyncMock(side_effect=tagged_execute)
handler = create_tool_handler(mock_tool)
# Tool A denied (never called). Tool B dispatched normally.
result_b = await handler({"block_id": "B"})
assert "result-for-B" in result_b["content"][0]["text"]
assert call_log == ["B"]
class TestBug3CancelRace:
"""Bug 3: cancel race — task completes before cancel arrives.
Pre-launch fires fast HTTP call (< 1s). By the time handler detects
mismatch and calls task.cancel(), the API call already completed.
Side effect (Linear issue created) is irreversible.
"""
@pytest.fixture(autouse=True)
def _init(self):
_init_ctx(session=_make_mock_session())
@pytest.mark.xfail(reason="Old code: cancel arrives after task completes")
@pytest.mark.asyncio
async def test_old_code_cancel_arrives_too_late(self):
"""OLD CODE: fast task completes before cancel, side effect persists."""
side_effects: list[str] = []
async def fast_execute_with_side_effect(*args, **kwargs):
# Side effect happens immediately (like an HTTP POST to Linear)
side_effects.append("created-issue")
return StreamToolOutputAvailable(
toolCallId="id",
output="issue-created",
toolName="run_block",
success=True,
)
mock_tool = _make_mock_tool("run_block")
mock_tool.execute = AsyncMock(side_effect=fast_execute_with_side_effect)
# Pre-launch fires immediately
pre_launch_args = {"block_id": "b1"}
dispatch_args = {"block_id": "b1", "extra": "normalised"}
await _buggy_prelaunch_handler(mock_tool, pre_launch_args, dispatch_args)
# BUG: side effect happened TWICE (pre-launch + fallback)
assert len(side_effects) == 1, (
f"Expected 1 side effect but got {len(side_effects)}"
f"cancel race: pre-launch completed before cancel!"
)
@pytest.mark.asyncio
async def test_current_code_single_side_effect(self):
"""FIXED: no speculative execution, exactly 1 side effect per call."""
side_effects: list[str] = []
async def execute_with_side_effect(*args, **kwargs):
side_effects.append("created-issue")
return StreamToolOutputAvailable(
toolCallId="id",
output="issue-created",
toolName="run_block",
success=True,
)
mock_tool = _make_mock_tool("run_block")
mock_tool.execute = AsyncMock(side_effect=execute_with_side_effect)
handler = create_tool_handler(mock_tool)
await handler({"block_id": "b1"})
assert len(side_effects) == 1
# ---------------------------------------------------------------------------
# readOnlyHint annotations
# ---------------------------------------------------------------------------
class TestReadOnlyAnnotations:
"""Tests that all tools get readOnlyHint=True for parallel dispatch."""
def test_parallel_annotation_constant(self):
"""_PARALLEL_ANNOTATION is a ToolAnnotations with readOnlyHint=True."""
from .tool_adapter import _PARALLEL_ANNOTATION
assert isinstance(_PARALLEL_ANNOTATION, ToolAnnotations)
assert _PARALLEL_ANNOTATION.readOnlyHint is True
# ---------------------------------------------------------------------------
# SDK_DISALLOWED_TOOLS
# ---------------------------------------------------------------------------
class TestSDKDisallowedTools:
"""Verify that dangerous SDK built-in tools are in the disallowed list."""
def test_bash_tool_is_disallowed(self):
assert "Bash" in SDK_DISALLOWED_TOOLS
def test_webfetch_tool_is_disallowed(self):
"""WebFetch is disallowed due to SSRF risk."""
assert "WebFetch" in SDK_DISALLOWED_TOOLS

View File

@@ -43,6 +43,10 @@ STRIPPABLE_TYPES = frozenset(
{"progress", "file-history-snapshot", "queue-operation", "summary", "pr-link"}
)
# Thinking block types that can be stripped from non-last assistant entries.
# The Anthropic API only requires these in the *last* assistant message.
_THINKING_BLOCK_TYPES = frozenset({"thinking", "redacted_thinking"})
@dataclass
class TranscriptDownload:
@@ -450,6 +454,83 @@ def _build_meta_storage_path(user_id: str, session_id: str, backend: object) ->
)
def strip_stale_thinking_blocks(content: str) -> str:
"""Remove thinking/redacted_thinking blocks from non-last assistant entries.
The Anthropic API only requires thinking blocks in the **last** assistant
message to be value-identical to the original response. Older assistant
entries carry stale thinking blocks that consume significant tokens
(often 10-50K each) without providing useful context for ``--resume``.
Stripping them before upload prevents the CLI from triggering compaction
every turn just to compress away the stale thinking bloat.
"""
lines = content.strip().split("\n")
if not lines:
return content
parsed: list[tuple[str, dict | None]] = []
for line in lines:
parsed.append((line, json.loads(line, fallback=None)))
# Reverse scan to find the last assistant message ID and index.
last_asst_msg_id: str | None = None
last_asst_idx: int | None = None
for i in range(len(parsed) - 1, -1, -1):
_line, entry = parsed[i]
if not isinstance(entry, dict):
continue
msg = entry.get("message", {})
if msg.get("role") == "assistant":
last_asst_msg_id = msg.get("id")
last_asst_idx = i
break
if last_asst_idx is None:
return content
result_lines: list[str] = []
stripped_count = 0
for i, (line, entry) in enumerate(parsed):
if not isinstance(entry, dict):
result_lines.append(line)
continue
msg = entry.get("message", {})
# Only strip from assistant entries that are NOT the last turn.
# Use msg_id matching when available; fall back to index for entries
# without an id field.
is_last_turn = (
last_asst_msg_id is not None and msg.get("id") == last_asst_msg_id
) or (last_asst_msg_id is None and i == last_asst_idx)
if (
msg.get("role") == "assistant"
and not is_last_turn
and isinstance(msg.get("content"), list)
):
content_blocks = msg["content"]
filtered = [
b
for b in content_blocks
if not (isinstance(b, dict) and b.get("type") in _THINKING_BLOCK_TYPES)
]
if len(filtered) < len(content_blocks):
stripped_count += len(content_blocks) - len(filtered)
entry = {**entry, "message": {**msg, "content": filtered}}
result_lines.append(json.dumps(entry, separators=(",", ":")))
continue
result_lines.append(line)
if stripped_count:
logger.info(
"[Transcript] Stripped %d stale thinking block(s) from non-last entries",
stripped_count,
)
return "\n".join(result_lines) + "\n"
async def upload_transcript(
user_id: str,
session_id: str,
@@ -472,6 +553,9 @@ async def upload_transcript(
# Strip metadata entries (progress, file-history-snapshot, etc.)
# Note: SDK-built transcripts shouldn't have these, but strip for safety
stripped = strip_progress_entries(content)
# Strip stale thinking blocks from older assistant entries — these consume
# significant tokens and trigger unnecessary CLI compaction every turn.
stripped = strip_stale_thinking_blocks(stripped)
if not validate_transcript(stripped):
# Log entry types for debugging — helps identify why validation failed
entry_types = [
@@ -605,9 +689,6 @@ COMPACT_MSG_ID_PREFIX = "msg_compact_"
ENTRY_TYPE_MESSAGE = "message"
_THINKING_BLOCK_TYPES = frozenset({"thinking", "redacted_thinking"})
def _flatten_assistant_content(blocks: list) -> str:
"""Flatten assistant content blocks into a single plain-text string.
@@ -633,11 +714,14 @@ def _flatten_assistant_content(blocks: list) -> str:
if btype == "text":
parts.append(block.get("text", ""))
elif btype == "tool_use":
parts.append(f"[tool_use: {block.get('name', '?')}]")
# Drop tool_use entirely — any text representation gets
# mimicked by the model as plain text instead of actual
# structured tool calls. The tool results (in the
# following user/tool_result entry) provide sufficient
# context about what happened.
continue
else:
# Preserve non-text blocks (e.g. image) as placeholders.
# Use __prefix__ to distinguish from literal user text.
parts.append(f"[__{btype}__]")
continue
elif isinstance(block, str):
parts.append(block)
return "\n".join(parts) if parts else ""

View File

@@ -13,6 +13,7 @@ from .transcript import (
delete_transcript,
read_compacted_entries,
strip_progress_entries,
strip_stale_thinking_blocks,
validate_transcript,
write_transcript_to_tempfile,
)
@@ -1200,3 +1201,170 @@ class TestCleanupStaleProjectDirs:
removed = cleanup_stale_project_dirs(encoded_cwd="some-other-project")
assert removed == 0
assert non_copilot.exists()
# ---------------------------------------------------------------------------
# strip_stale_thinking_blocks
# ---------------------------------------------------------------------------
class TestStripStaleThinkingBlocks:
"""Tests for strip_stale_thinking_blocks — removes thinking/redacted_thinking
blocks from non-last assistant entries to reduce transcript bloat."""
def _asst_entry(
self, msg_id: str, content: list, uuid: str = "u1", parent: str = ""
) -> dict:
return {
"type": "assistant",
"uuid": uuid,
"parentUuid": parent,
"message": {
"role": "assistant",
"id": msg_id,
"type": "message",
"content": content,
},
}
def _user_entry(self, text: str, uuid: str = "u0", parent: str = "") -> dict:
return {
"type": "user",
"uuid": uuid,
"parentUuid": parent,
"message": {"role": "user", "content": text},
}
def test_strips_thinking_from_older_assistant(self) -> None:
"""Thinking blocks in non-last assistant entries should be removed."""
old_asst = self._asst_entry(
"msg_old",
[
{"type": "thinking", "thinking": "deep thoughts..."},
{"type": "text", "text": "hello"},
{"type": "redacted_thinking", "data": "secret"},
],
uuid="a1",
)
new_asst = self._asst_entry(
"msg_new",
[
{"type": "thinking", "thinking": "latest thoughts"},
{"type": "text", "text": "world"},
],
uuid="a2",
parent="a1",
)
content = _make_jsonl(old_asst, new_asst)
result = strip_stale_thinking_blocks(content)
lines = [json.loads(ln) for ln in result.strip().split("\n")]
# Old assistant should have thinking blocks stripped
old_content = lines[0]["message"]["content"]
assert len(old_content) == 1
assert old_content[0]["type"] == "text"
# New (last) assistant should be untouched
new_content = lines[1]["message"]["content"]
assert len(new_content) == 2
assert new_content[0]["type"] == "thinking"
assert new_content[1]["type"] == "text"
def test_preserves_last_assistant_thinking(self) -> None:
"""The last assistant entry's thinking blocks must be preserved."""
entry = self._asst_entry(
"msg_only",
[
{"type": "thinking", "thinking": "must keep"},
{"type": "text", "text": "response"},
],
)
content = _make_jsonl(entry)
result = strip_stale_thinking_blocks(content)
lines = [json.loads(ln) for ln in result.strip().split("\n")]
assert len(lines[0]["message"]["content"]) == 2
def test_no_assistant_entries_returns_unchanged(self) -> None:
"""Transcripts with only user entries should pass through unchanged."""
user = self._user_entry("hello")
content = _make_jsonl(user)
assert strip_stale_thinking_blocks(content) == content
def test_empty_content_returns_unchanged(self) -> None:
assert strip_stale_thinking_blocks("") == ""
def test_multiple_turns_strips_all_but_last(self) -> None:
"""With 3 assistant turns, only the last keeps thinking blocks."""
entries = [
self._asst_entry(
"msg_1",
[
{"type": "thinking", "thinking": "t1"},
{"type": "text", "text": "a1"},
],
uuid="a1",
),
self._user_entry("q2", uuid="u2", parent="a1"),
self._asst_entry(
"msg_2",
[
{"type": "thinking", "thinking": "t2"},
{"type": "text", "text": "a2"},
],
uuid="a2",
parent="u2",
),
self._user_entry("q3", uuid="u3", parent="a2"),
self._asst_entry(
"msg_3",
[
{"type": "thinking", "thinking": "t3"},
{"type": "text", "text": "a3"},
],
uuid="a3",
parent="u3",
),
]
content = _make_jsonl(*entries)
result = strip_stale_thinking_blocks(content)
lines = [json.loads(ln) for ln in result.strip().split("\n")]
# msg_1: thinking stripped
assert len(lines[0]["message"]["content"]) == 1
assert lines[0]["message"]["content"][0]["type"] == "text"
# msg_2: thinking stripped
assert len(lines[2]["message"]["content"]) == 1
# msg_3 (last): thinking preserved
assert len(lines[4]["message"]["content"]) == 2
assert lines[4]["message"]["content"][0]["type"] == "thinking"
def test_same_msg_id_multi_entry_turn(self) -> None:
"""Multiple entries sharing the same message.id (same turn) are preserved."""
entries = [
self._asst_entry(
"msg_old",
[{"type": "thinking", "thinking": "old"}],
uuid="a1",
),
self._asst_entry(
"msg_last",
[{"type": "thinking", "thinking": "t_part1"}],
uuid="a2",
parent="a1",
),
self._asst_entry(
"msg_last",
[{"type": "text", "text": "response"}],
uuid="a3",
parent="a2",
),
]
content = _make_jsonl(*entries)
result = strip_stale_thinking_blocks(content)
lines = [json.loads(ln) for ln in result.strip().split("\n")]
# Old entry stripped
assert lines[0]["message"]["content"] == []
# Both entries of last turn (msg_last) preserved
assert lines[1]["message"]["content"][0]["type"] == "thinking"
assert lines[2]["message"]["content"][0]["type"] == "text"

View File

@@ -30,7 +30,7 @@ async def test_sdk_resume_multi_turn(setup_test_user, test_user_id):
if not cfg.claude_agent_use_resume:
return pytest.skip("CLAUDE_AGENT_USE_RESUME is not enabled, skipping test")
session = await create_chat_session(test_user_id)
session = await create_chat_session(test_user_id, dry_run=False)
session = await upsert_chat_session(session)
# --- Turn 1: send a message with a unique keyword ---

View File

@@ -221,9 +221,21 @@ async def create_session(
return session
_meta_ttl_refresh_at: dict[str, float] = {}
"""Tracks the last time the session meta key TTL was refreshed.
Used by `publish_chunk` to avoid refreshing on every single chunk
(expensive). Refreshes at most once every 60 seconds per session.
"""
_META_TTL_REFRESH_INTERVAL = 60 # seconds
async def publish_chunk(
turn_id: str,
chunk: StreamBaseResponse,
*,
session_id: str | None = None,
) -> str:
"""Publish a chunk to Redis Stream.
@@ -232,6 +244,9 @@ async def publish_chunk(
Args:
turn_id: Turn ID (per-turn UUID) identifying the stream
chunk: The stream response chunk to publish
session_id: Chat session ID — when provided, the session meta key
TTL is refreshed periodically to prevent expiration during
long-running turns (see SECRT-2178).
Returns:
The Redis Stream message ID
@@ -265,6 +280,23 @@ async def publish_chunk(
# Set TTL on stream to match session metadata TTL
await redis.expire(stream_key, config.stream_ttl)
# Periodically refresh session-related TTLs so they don't expire
# during long-running turns. Without this, turns exceeding stream_ttl
# (default 1h) lose their "running" status and stream data, making
# the session invisible to the resume endpoint (empty on page reload).
# Both meta key AND stream key are refreshed: the stream key's expire
# above only fires when publish_chunk is called, but during long
# sub-agent gaps (task_progress events don't produce chunks), neither
# key gets refreshed.
if session_id:
now = time.perf_counter()
last_refresh = _meta_ttl_refresh_at.get(session_id, 0)
if now - last_refresh >= _META_TTL_REFRESH_INTERVAL:
meta_key = _get_session_meta_key(session_id)
await redis.expire(meta_key, config.stream_ttl)
await redis.expire(stream_key, config.stream_ttl)
_meta_ttl_refresh_at[session_id] = now
total_time = (time.perf_counter() - start_time) * 1000
# Only log timing for significant chunks or slow operations
if (
@@ -331,7 +363,7 @@ async def stream_and_publish(
async for event in stream:
if turn_id and not isinstance(event, (StreamFinish, StreamError)):
try:
await publish_chunk(turn_id, event)
await publish_chunk(turn_id, event, session_id=session_id)
except (RedisError, ConnectionError, OSError):
if not publish_failed_once:
publish_failed_once = True
@@ -800,6 +832,9 @@ async def mark_session_completed(
# Atomic compare-and-swap: only update if status is "running"
result = await redis.eval(COMPLETE_SESSION_SCRIPT, 1, meta_key, status) # type: ignore[misc]
# Clean up the in-memory TTL refresh tracker to prevent unbounded growth.
_meta_ttl_refresh_at.pop(session_id, None)
if result == 0:
logger.debug(f"Session {session_id} already completed/failed, skipping")
return False

View File

@@ -68,6 +68,9 @@ class AddUnderstandingTool(BaseTool):
Each call merges new data with existing understanding:
- String fields are overwritten if provided
- List fields are appended (with deduplication)
Note: This tool accepts **kwargs because its parameters are derived
dynamically from the BusinessUnderstandingInput model schema.
"""
session_id = session.session_id
@@ -77,23 +80,21 @@ class AddUnderstandingTool(BaseTool):
session_id=session_id,
)
# Build input model from kwargs (only include fields defined in the model)
valid_fields = set(BusinessUnderstandingInput.model_fields.keys())
filtered = {k: v for k, v in kwargs.items() if k in valid_fields}
# Check if any data was provided
if not any(v is not None for v in kwargs.values()):
if not any(v is not None for v in filtered.values()):
return ErrorResponse(
message="Please provide at least one field to update.",
session_id=session_id,
)
# Build input model from kwargs (only include fields defined in the model)
valid_fields = set(BusinessUnderstandingInput.model_fields.keys())
input_data = BusinessUnderstandingInput(
**{k: v for k, v in kwargs.items() if k in valid_fields}
)
input_data = BusinessUnderstandingInput(**filtered)
# Track which fields were updated
updated_fields = [
k for k, v in kwargs.items() if k in valid_fields and v is not None
]
updated_fields = [k for k, v in filtered.items() if v is not None]
# Upsert with merge
understanding = await understanding_db().upsert_business_understanding(

View File

@@ -180,12 +180,14 @@ async def _save_browser_state(
"""
try:
# Gather state in parallel
(rc_url, url_out, _), (rc_ck, ck_out, _), (rc_ls, ls_out, _) = (
await asyncio.gather(
_run(session_name, "get", "url", timeout=10),
_run(session_name, "cookies", "get", "--json", timeout=10),
_run(session_name, "storage", "local", "--json", timeout=10),
)
(
(rc_url, url_out, _),
(rc_ck, ck_out, _),
(rc_ls, ls_out, _),
) = await asyncio.gather(
_run(session_name, "get", "url", timeout=10),
_run(session_name, "cookies", "get", "--json", timeout=10),
_run(session_name, "storage", "local", "--json", timeout=10),
)
state = {
@@ -448,6 +450,8 @@ class BrowserNavigateTool(BaseTool):
self,
user_id: str | None,
session: ChatSession,
url: str = "",
wait_for: str = "networkidle",
**kwargs: Any,
) -> ToolResponseBase:
"""Navigate to *url*, wait for the page to settle, and return a snapshot.
@@ -456,8 +460,8 @@ class BrowserNavigateTool(BaseTool):
Note: for slow SPAs that never fully idle, the snapshot may reflect a
partially-loaded state (the wait is best-effort).
"""
url: str = (kwargs.get("url") or "").strip()
wait_for: str = kwargs.get("wait_for") or "networkidle"
url = url.strip()
wait_for = wait_for or "networkidle"
session_name = session.session_id
if not url:
@@ -612,6 +616,10 @@ class BrowserActTool(BaseTool):
self,
user_id: str | None,
session: ChatSession,
action: str = "",
target: str = "",
value: str = "",
direction: str = "down",
**kwargs: Any,
) -> ToolResponseBase:
"""Perform a browser action and return an updated page snapshot.
@@ -620,10 +628,10 @@ class BrowserActTool(BaseTool):
``agent-browser``, waits for the page to settle, and returns the
accessibility-tree snapshot so the LLM can plan the next step.
"""
action: str = (kwargs.get("action") or "").strip()
target: str = (kwargs.get("target") or "").strip()
value: str = (kwargs.get("value") or "").strip()
direction: str = (kwargs.get("direction") or "down").strip()
action = action.strip()
target = target.strip()
value = value.strip()
direction = direction.strip()
session_name = session.session_id
if not action:
@@ -777,6 +785,8 @@ class BrowserScreenshotTool(BaseTool):
self,
user_id: str | None,
session: ChatSession,
annotate: bool | str = True,
filename: str = "screenshot.png",
**kwargs: Any,
) -> ToolResponseBase:
"""Capture a PNG screenshot and upload it to the workspace.
@@ -786,12 +796,12 @@ class BrowserScreenshotTool(BaseTool):
Returns a :class:`BrowserScreenshotResponse` with the workspace
``file_id`` the LLM should pass to ``read_workspace_file``.
"""
raw_annotate = kwargs.get("annotate", True)
raw_annotate = annotate
if isinstance(raw_annotate, str):
annotate = raw_annotate.strip().lower() in {"1", "true", "yes", "on"}
else:
annotate = bool(raw_annotate)
filename: str = (kwargs.get("filename") or "screenshot.png").strip()
filename = filename.strip()
session_name = session.session_id
# Restore browser state from cloud if this is a different pod

View File

@@ -411,7 +411,12 @@ class AgentOutputTool(BaseTool):
session: ChatSession,
**kwargs,
) -> ToolResponseBase:
"""Execute the agent_output tool."""
"""Execute the agent_output tool.
Note: This tool accepts **kwargs and delegates to AgentOutputInput
for validation because the parameter set has cross-field validators
defined in the Pydantic model.
"""
session_id = session.session_id
# Parse and validate input

View File

@@ -76,6 +76,8 @@ class BashExecTool(BaseTool):
self,
user_id: str | None,
session: ChatSession,
command: str = "",
timeout: int = 30,
**kwargs: Any,
) -> ToolResponseBase:
"""Run a bash command on E2B (if available) or in a bubblewrap sandbox.
@@ -88,8 +90,8 @@ class BashExecTool(BaseTool):
"""
session_id = session.session_id if session else None
command: str = (kwargs.get("command") or "").strip()
timeout: int = int(kwargs.get("timeout", 30))
command = command.strip()
timeout = int(timeout)
if not command:
return ErrorResponse(

View File

@@ -115,6 +115,9 @@ class ConnectIntegrationTool(BaseTool):
self,
user_id: str | None,
session: ChatSession,
provider: str = "",
reason: str = "",
scopes: list[str] | None = None,
**kwargs: Any,
) -> ToolResponseBase:
"""Build and return a :class:`SetupRequirementsResponse` for the requested provider.
@@ -128,12 +131,10 @@ class ConnectIntegrationTool(BaseTool):
"""
_ = user_id # setup card is user-agnostic; auth is enforced via requires_auth
session_id = session.session_id if session else None
provider: str = (kwargs.get("provider") or "").strip().lower()
reason: str = (kwargs.get("reason") or "").strip()[
:500
] # cap LLM-controlled text
provider = (provider or "").strip().lower()
reason = (reason or "").strip()[:500] # cap LLM-controlled text
extra_scopes: list[str] = [
str(s).strip() for s in (kwargs.get("scopes") or []) if str(s).strip()
str(s).strip() for s in (scopes or []) if str(s).strip()
]
entry = SUPPORTED_PROVIDERS.get(provider)
@@ -141,8 +142,7 @@ class ConnectIntegrationTool(BaseTool):
supported = ", ".join(f"'{p}'" for p in SUPPORTED_PROVIDERS)
return ErrorResponse(
message=(
f"Unknown provider '{provider}'. "
f"Supported providers: {supported}."
f"Unknown provider '{provider}'. Supported providers: {supported}."
),
error="unknown_provider",
session_id=session_id,
@@ -153,11 +153,11 @@ class ConnectIntegrationTool(BaseTool):
# Merge agent-requested scopes with provider defaults (deduplicated, order preserved).
default_scopes: list[str] = entry["default_scopes"]
seen: set[str] = set()
scopes: list[str] = []
merged_scopes: list[str] = []
for s in default_scopes + extra_scopes:
if s not in seen:
seen.add(s)
scopes.append(s)
merged_scopes.append(s)
field_key = f"{provider}_credentials"
message_parts = [
@@ -171,7 +171,7 @@ class ConnectIntegrationTool(BaseTool):
"title": f"{display_name} Credentials",
"provider": provider,
"types": supported_types,
"scopes": scopes,
"scopes": merged_scopes,
}
missing_credentials: dict[str, _CredentialEntry] = {field_key: credential_entry}

View File

@@ -53,11 +53,10 @@ class ContinueRunBlockTool(BaseTool):
self,
user_id: str | None,
session: ChatSession,
review_id: str = "",
**kwargs,
) -> ToolResponseBase:
review_id = (
kwargs.get("review_id", "").strip() if kwargs.get("review_id") else ""
)
review_id = review_id.strip() if review_id else ""
session_id = session.session_id
if not review_id:

View File

@@ -62,9 +62,12 @@ class CreateAgentTool(BaseTool):
self,
user_id: str | None,
session: ChatSession,
agent_json: dict[str, Any] | None = None,
save: bool = True,
library_agent_ids: list[str] | None = None,
folder_id: str | None = None,
**kwargs,
) -> ToolResponseBase:
agent_json: dict[str, Any] | None = kwargs.get("agent_json")
session_id = session.session_id if session else None
if not agent_json:
@@ -77,9 +80,8 @@ class CreateAgentTool(BaseTool):
session_id=session_id,
)
save = kwargs.get("save", True)
library_agent_ids = kwargs.get("library_agent_ids", [])
folder_id: str | None = kwargs.get("folder_id")
if library_agent_ids is None:
library_agent_ids = []
nodes = agent_json.get("nodes", [])
if not nodes:

View File

@@ -61,9 +61,12 @@ class CustomizeAgentTool(BaseTool):
self,
user_id: str | None,
session: ChatSession,
agent_json: dict[str, Any] | None = None,
save: bool = True,
library_agent_ids: list[str] | None = None,
folder_id: str | None = None,
**kwargs,
) -> ToolResponseBase:
agent_json: dict[str, Any] | None = kwargs.get("agent_json")
session_id = session.session_id if session else None
if not agent_json:
@@ -75,9 +78,8 @@ class CustomizeAgentTool(BaseTool):
session_id=session_id,
)
save = kwargs.get("save", True)
library_agent_ids = kwargs.get("library_agent_ids", [])
folder_id: str | None = kwargs.get("folder_id")
if library_agent_ids is None:
library_agent_ids = []
nodes = agent_json.get("nodes", [])
if not nodes:

View File

@@ -62,10 +62,15 @@ class EditAgentTool(BaseTool):
self,
user_id: str | None,
session: ChatSession,
agent_id: str = "",
agent_json: dict[str, Any] | None = None,
save: bool = True,
library_agent_ids: list[str] | None = None,
**kwargs,
) -> ToolResponseBase:
agent_id = kwargs.get("agent_id", "").strip()
agent_json: dict[str, Any] | None = kwargs.get("agent_json")
agent_id = agent_id.strip()
if library_agent_ids is None:
library_agent_ids = []
session_id = session.session_id if session else None
if not agent_id:
@@ -84,9 +89,6 @@ class EditAgentTool(BaseTool):
session_id=session_id,
)
save = kwargs.get("save", True)
library_agent_ids = kwargs.get("library_agent_ids", [])
nodes = agent_json.get("nodes", [])
if not nodes:
return ErrorResponse(

View File

@@ -157,9 +157,10 @@ class SearchFeatureRequestsTool(BaseTool):
self,
user_id: str | None,
session: ChatSession,
query: str = "",
**kwargs,
) -> ToolResponseBase:
query = kwargs.get("query", "").strip()
query = (query or "").strip()
session_id = session.session_id if session else None
if not query:
@@ -288,11 +289,13 @@ class CreateFeatureRequestTool(BaseTool):
self,
user_id: str | None,
session: ChatSession,
title: str = "",
description: str = "",
existing_issue_id: str | None = None,
**kwargs,
) -> ToolResponseBase:
title = kwargs.get("title", "").strip()
description = kwargs.get("description", "").strip()
existing_issue_id = kwargs.get("existing_issue_id")
title = (title or "").strip()
description = (description or "").strip()
session_id = session.session_id if session else None
if not title or not description:

View File

@@ -34,11 +34,15 @@ class FindAgentTool(BaseTool):
}
async def _execute(
self, user_id: str | None, session: ChatSession, **kwargs
self,
user_id: str | None,
session: ChatSession,
query: str = "",
**kwargs,
) -> ToolResponseBase:
"""Search marketplace for agents matching the query."""
return await search_agents(
query=kwargs.get("query", "").strip(),
query=query.strip(),
source="marketplace",
session_id=session.session_id,
user_id=user_id,

View File

@@ -86,6 +86,8 @@ class FindBlockTool(BaseTool):
self,
user_id: str | None,
session: ChatSession,
query: str = "",
include_schemas: bool = False,
**kwargs,
) -> ToolResponseBase:
"""Search for blocks matching the query.
@@ -94,14 +96,14 @@ class FindBlockTool(BaseTool):
user_id: User ID (required)
session: Chat session
query: Search query
include_schemas: Whether to include block schemas in results
Returns:
BlockListResponse: List of matching blocks
NoResultsResponse: No blocks found
ErrorResponse: Error message
"""
query = kwargs.get("query", "").strip()
include_schemas = kwargs.get("include_schemas", False)
query = (query or "").strip()
session_id = session.session_id
if not query:

View File

@@ -41,10 +41,14 @@ class FindLibraryAgentTool(BaseTool):
return True
async def _execute(
self, user_id: str | None, session: ChatSession, **kwargs
self,
user_id: str | None,
session: ChatSession,
query: str = "",
**kwargs,
) -> ToolResponseBase:
return await search_agents(
query=(kwargs.get("query") or "").strip(),
query=query.strip(),
source="library",
session_id=session.session_id,
user_id=user_id,

View File

@@ -51,9 +51,9 @@ class FixAgentGraphTool(BaseTool):
self,
user_id: str | None,
session: ChatSession,
agent_json: dict | None = None,
**kwargs,
) -> ToolResponseBase:
agent_json = kwargs.get("agent_json")
session_id = session.session_id if session else None
if not agent_json or not isinstance(agent_json, dict):
@@ -98,8 +98,7 @@ class FixAgentGraphTool(BaseTool):
if is_valid:
return FixResultResponse(
message=(
f"Applied {len(fixes_applied)} fix(es). "
"Agent graph is now valid!"
f"Applied {len(fixes_applied)} fix(es). Agent graph is now valid!"
),
fixed_agent_json=fixed_agent,
fixes_applied=fixes_applied,

View File

@@ -60,7 +60,7 @@ class GetAgentBuildingGuideTool(BaseTool):
self,
user_id: str | None,
session: ChatSession,
**kwargs,
**kwargs, # no tool-specific params; accepts kwargs for forward-compat
) -> ToolResponseBase:
session_id = session.session_id if session else None
try:

View File

@@ -68,6 +68,7 @@ class GetDocPageTool(BaseTool):
self,
user_id: str | None,
session: ChatSession,
path: str = "",
**kwargs,
) -> ToolResponseBase:
"""Fetch full content of a documentation page.
@@ -81,7 +82,7 @@ class GetDocPageTool(BaseTool):
DocPageResponse: Full document content
ErrorResponse: Error message
"""
path = kwargs.get("path", "").strip()
path = path.strip()
session_id = session.session_id if session else None
if not path:

View File

@@ -56,7 +56,7 @@ class GetMCPGuideTool(BaseTool):
self,
user_id: str | None,
session: ChatSession,
**kwargs,
**kwargs, # no tool-specific params; accepts kwargs for forward-compat
) -> ToolResponseBase:
session_id = session.session_id if session else None
try:

View File

@@ -81,7 +81,7 @@ async def execute_block(
node_exec_id: str,
matched_credentials: dict[str, CredentialsMetaInput],
sensitive_action_safe_mode: bool = False,
dry_run: bool = False,
dry_run: bool,
) -> ToolResponseBase:
"""Execute a block with full context setup, credential injection, and error handling.
@@ -114,11 +114,9 @@ async def execute_block(
error=sim_error[0],
session_id=session_id,
)
return BlockOutputResponse(
message=(
f"[DRY RUN] Block '{block.name}' simulated successfully "
"— no real execution occurred."
),
message=f"Block '{block.name}' executed successfully",
block_id=block_id,
block_name=block.name,
outputs=dict(outputs),
@@ -337,7 +335,7 @@ async def prepare_block_for_execution(
user_id: str,
session: ChatSession,
session_id: str,
dry_run: bool = False,
dry_run: bool,
) -> "BlockPreparation | ToolResponseBase":
"""Validate and prepare a block for execution.

View File

@@ -102,6 +102,7 @@ class TestExecuteBlockCreditCharging:
session_id=_SESSION,
node_exec_id="exec-1",
matched_credentials={},
dry_run=False,
)
assert isinstance(result, BlockOutputResponse)
@@ -132,6 +133,7 @@ class TestExecuteBlockCreditCharging:
session_id=_SESSION,
node_exec_id="exec-1",
matched_credentials={},
dry_run=False,
)
assert isinstance(result, ErrorResponse)
@@ -158,6 +160,7 @@ class TestExecuteBlockCreditCharging:
session_id=_SESSION,
node_exec_id="exec-1",
matched_credentials={},
dry_run=False,
)
assert isinstance(result, BlockOutputResponse)
@@ -194,6 +197,7 @@ class TestExecuteBlockCreditCharging:
session_id=_SESSION,
node_exec_id="exec-1",
matched_credentials={},
dry_run=False,
)
# Block already executed (with side effects), so output is returned
@@ -277,6 +281,7 @@ async def test_coerce_json_string_to_nested_list():
session_id=_TEST_SESSION_ID,
node_exec_id="exec-1",
matched_credentials={},
dry_run=False,
)
assert isinstance(response, BlockOutputResponse)
@@ -317,6 +322,7 @@ async def test_coerce_json_string_to_list():
session_id=_TEST_SESSION_ID,
node_exec_id="exec-2",
matched_credentials={},
dry_run=False,
)
assert isinstance(response, BlockOutputResponse)
@@ -349,6 +355,7 @@ async def test_coerce_json_string_to_dict():
session_id=_TEST_SESSION_ID,
node_exec_id="exec-3",
matched_credentials={},
dry_run=False,
)
assert isinstance(response, BlockOutputResponse)
@@ -382,6 +389,7 @@ async def test_no_coercion_when_type_matches():
session_id=_TEST_SESSION_ID,
node_exec_id="exec-4",
matched_credentials={},
dry_run=False,
)
assert isinstance(response, BlockOutputResponse)
@@ -415,6 +423,7 @@ async def test_coerce_string_to_int():
session_id=_TEST_SESSION_ID,
node_exec_id="exec-5",
matched_credentials={},
dry_run=False,
)
assert isinstance(response, BlockOutputResponse)
@@ -448,6 +457,7 @@ async def test_coerce_skips_none_values():
session_id=_TEST_SESSION_ID,
node_exec_id="exec-6",
matched_credentials={},
dry_run=False,
)
assert isinstance(response, BlockOutputResponse)
@@ -481,6 +491,7 @@ async def test_coerce_union_type_preserves_valid_member():
session_id=_TEST_SESSION_ID,
node_exec_id="exec-7",
matched_credentials={},
dry_run=False,
)
assert isinstance(response, BlockOutputResponse)
@@ -516,6 +527,7 @@ async def test_coerce_inner_elements_of_generic():
session_id=_TEST_SESSION_ID,
node_exec_id="exec-8",
matched_credentials={},
dry_run=False,
)
assert isinstance(response, BlockOutputResponse)
@@ -592,6 +604,7 @@ async def test_prepare_block_not_found() -> None:
user_id=_PREP_USER,
session=_make_prep_session(),
session_id=_PREP_SESSION,
dry_run=False,
)
assert isinstance(result, ErrorResponse)
assert "not found" in result.message
@@ -612,6 +625,7 @@ async def test_prepare_block_disabled() -> None:
user_id=_PREP_USER,
session=_make_prep_session(),
session_id=_PREP_SESSION,
dry_run=False,
)
assert isinstance(result, ErrorResponse)
assert "disabled" in result.message
@@ -640,6 +654,7 @@ async def test_prepare_block_unrecognized_fields() -> None:
user_id=_PREP_USER,
session=_make_prep_session(),
session_id=_PREP_SESSION,
dry_run=False,
)
assert isinstance(result, InputValidationErrorResponse)
assert "unknown_field" in result.unrecognized_fields
@@ -669,6 +684,7 @@ async def test_prepare_block_missing_credentials() -> None:
user_id=_PREP_USER,
session=_make_prep_session(),
session_id=_PREP_SESSION,
dry_run=False,
)
assert isinstance(result, SetupRequirementsResponse)
@@ -698,6 +714,7 @@ async def test_prepare_block_success_returns_preparation() -> None:
user_id=_PREP_USER,
session=_make_prep_session(),
session_id=_PREP_SESSION,
dry_run=False,
)
assert isinstance(result, BlockPreparation)
assert result.required_non_credential_keys == {"text"}
@@ -802,6 +819,7 @@ async def test_prepare_block_excluded_by_type() -> None:
user_id=_PREP_USER,
session=_make_prep_session(),
session_id=_PREP_SESSION,
dry_run=False,
)
assert isinstance(result, ErrorResponse)
assert "cannot be run directly" in result.message
@@ -824,6 +842,7 @@ async def test_prepare_block_excluded_by_id() -> None:
user_id=_PREP_USER,
session=_make_prep_session(),
session_id=_PREP_SESSION,
dry_run=False,
)
assert isinstance(result, ErrorResponse)
assert "cannot be run directly" in result.message
@@ -857,6 +876,7 @@ async def test_prepare_block_file_ref_expansion_error() -> None:
user_id=_PREP_USER,
session=_make_prep_session(),
session_id=_PREP_SESSION,
dry_run=False,
)
assert isinstance(result, ErrorResponse)
assert "file reference" in result.message.lower()

View File

@@ -866,6 +866,7 @@ class TestRunBlockToolAuthenticatedHttp:
session=session,
block_id=block.id,
input_data={"url": "https://api.example.com/data", "method": "GET"},
dry_run=False,
)
assert isinstance(response, SetupRequirementsResponse)
@@ -907,6 +908,7 @@ class TestRunBlockToolAuthenticatedHttp:
session=session,
block_id=block.id,
input_data={},
dry_run=False,
)
assert isinstance(response, BlockDetailsResponse)

View File

@@ -120,14 +120,18 @@ class CreateFolderTool(BaseTool):
}
async def _execute(
self, user_id: str | None, session: ChatSession, **kwargs
self,
user_id: str | None,
session: ChatSession,
name: str = "",
parent_id: str | None = None,
icon: str | None = None,
color: str | None = None,
**kwargs,
) -> ToolResponseBase:
"""Create a folder with the given name and optional parent/icon/color."""
assert user_id is not None # guaranteed by requires_auth
name = (kwargs.get("name") or "").strip()
parent_id = kwargs.get("parent_id")
icon = kwargs.get("icon")
color = kwargs.get("color")
name = (name or "").strip()
session_id = session.session_id if session else None
if not name:
@@ -196,12 +200,15 @@ class ListFoldersTool(BaseTool):
}
async def _execute(
self, user_id: str | None, session: ChatSession, **kwargs
self,
user_id: str | None,
session: ChatSession,
parent_id: str | None = None,
include_agents: bool = False,
**kwargs,
) -> ToolResponseBase:
"""List folders as a flat list (by parent) or full tree."""
assert user_id is not None # guaranteed by requires_auth
parent_id = kwargs.get("parent_id")
include_agents = kwargs.get("include_agents", False)
session_id = session.session_id if session else None
try:
@@ -293,14 +300,18 @@ class UpdateFolderTool(BaseTool):
}
async def _execute(
self, user_id: str | None, session: ChatSession, **kwargs
self,
user_id: str | None,
session: ChatSession,
folder_id: str = "",
name: str | None = None,
icon: str | None = None,
color: str | None = None,
**kwargs,
) -> ToolResponseBase:
"""Update a folder's name, icon, or color."""
assert user_id is not None # guaranteed by requires_auth
folder_id = (kwargs.get("folder_id") or "").strip()
name = kwargs.get("name")
icon = kwargs.get("icon")
color = kwargs.get("color")
folder_id = (folder_id or "").strip()
session_id = session.session_id if session else None
if not folder_id:
@@ -365,12 +376,16 @@ class MoveFolderTool(BaseTool):
}
async def _execute(
self, user_id: str | None, session: ChatSession, **kwargs
self,
user_id: str | None,
session: ChatSession,
folder_id: str = "",
target_parent_id: str | None = None,
**kwargs,
) -> ToolResponseBase:
"""Move a folder to a new parent or to root level."""
assert user_id is not None # guaranteed by requires_auth
folder_id = (kwargs.get("folder_id") or "").strip()
target_parent_id = kwargs.get("target_parent_id")
folder_id = (folder_id or "").strip()
session_id = session.session_id if session else None
if not folder_id:
@@ -431,11 +446,15 @@ class DeleteFolderTool(BaseTool):
}
async def _execute(
self, user_id: str | None, session: ChatSession, **kwargs
self,
user_id: str | None,
session: ChatSession,
folder_id: str = "",
**kwargs,
) -> ToolResponseBase:
"""Soft-delete a folder; agents inside are moved to root level."""
assert user_id is not None # guaranteed by requires_auth
folder_id = (kwargs.get("folder_id") or "").strip()
folder_id = (folder_id or "").strip()
session_id = session.session_id if session else None
if not folder_id:
@@ -499,12 +518,17 @@ class MoveAgentsToFolderTool(BaseTool):
}
async def _execute(
self, user_id: str | None, session: ChatSession, **kwargs
self,
user_id: str | None,
session: ChatSession,
agent_ids: list[str] | None = None,
folder_id: str | None = None,
**kwargs,
) -> ToolResponseBase:
"""Move one or more agents to a folder or to root level."""
assert user_id is not None # guaranteed by requires_auth
agent_ids = kwargs.get("agent_ids", [])
folder_id = kwargs.get("folder_id")
if agent_ids is None:
agent_ids = []
session_id = session.session_id if session else None
if not agent_ids:

View File

@@ -71,7 +71,7 @@ class RunAgentInput(BaseModel):
cron: str = ""
timezone: str = "UTC"
wait_for_result: int = Field(default=0, ge=0, le=300)
dry_run: bool = False
dry_run: bool
@field_validator(
"username_agent_slug",
@@ -153,14 +153,10 @@ class RunAgentTool(BaseTool):
},
"dry_run": {
"type": "boolean",
"description": (
"When true, simulates the entire agent execution using an LLM "
"for each block — no real API calls, no credentials needed, "
"no credits charged. Useful for testing agent wiring end-to-end."
),
"description": "Execute in preview mode.",
},
},
"required": [],
"required": ["dry_run"],
}
@property
@@ -174,8 +170,16 @@ class RunAgentTool(BaseTool):
session: ChatSession,
**kwargs,
) -> ToolResponseBase:
"""Execute the tool with automatic state detection."""
"""Execute the tool with automatic state detection.
Note: This tool accepts **kwargs and delegates to RunAgentInput for
validation because the parameter set is complex with cross-field
validators defined in the Pydantic model.
"""
params = RunAgentInput(**kwargs)
# Session-level dry_run forces all tool calls to use dry-run mode.
if session.dry_run:
params.dry_run = True
session_id = session.session_id
# Validate at least one identifier is provided
@@ -201,6 +205,18 @@ class RunAgentTool(BaseTool):
# Determine if this is a schedule request
is_schedule = bool(params.schedule_name or params.cron)
# Session-level dry-run blocks scheduling — schedules create real
# side effects that cannot be simulated.
if params.dry_run and is_schedule:
return ErrorResponse(
message=(
"Scheduling is disabled in dry-run mode because it creates "
"real side effects. Remove cron/schedule_name to simulate "
"a run, or disable dry-run to create a real schedule."
),
session_id=session_id,
)
try:
# Step 1: Fetch agent details
graph: GraphModel | None = None
@@ -458,8 +474,8 @@ class RunAgentTool(BaseTool):
graph: GraphModel,
graph_credentials: dict[str, CredentialsMetaInput],
inputs: dict[str, Any],
dry_run: bool,
wait_for_result: int = 0,
dry_run: bool = False,
) -> ToolResponseBase:
"""Execute an agent immediately, optionally waiting for completion."""
session_id = session.session_id

View File

@@ -53,6 +53,7 @@ async def test_run_agent(setup_test_data):
tool_call_id=str(uuid.uuid4()),
username_agent_slug=agent_marketplace_id,
inputs={"test_input": "Hello World"},
dry_run=False,
session=session,
)
@@ -93,6 +94,7 @@ async def test_run_agent_missing_inputs(setup_test_data):
tool_call_id=str(uuid.uuid4()),
username_agent_slug=agent_marketplace_id,
inputs={}, # Missing required input
dry_run=False,
session=session,
)
@@ -125,6 +127,7 @@ async def test_run_agent_invalid_agent_id(setup_test_data):
tool_call_id=str(uuid.uuid4()),
username_agent_slug="invalid/agent-id",
inputs={"test_input": "Hello World"},
dry_run=False,
session=session,
)
@@ -165,6 +168,7 @@ async def test_run_agent_with_llm_credentials(setup_llm_test_data):
tool_call_id=str(uuid.uuid4()),
username_agent_slug=agent_marketplace_id,
inputs={"user_prompt": "What is 2+2?"},
dry_run=False,
session=session,
)
@@ -203,6 +207,7 @@ async def test_run_agent_shows_available_inputs_when_none_provided(setup_test_da
username_agent_slug=agent_marketplace_id,
inputs={},
use_defaults=False,
dry_run=False,
session=session,
)
@@ -238,6 +243,7 @@ async def test_run_agent_with_use_defaults(setup_test_data):
username_agent_slug=agent_marketplace_id,
inputs={},
use_defaults=True,
dry_run=False,
session=session,
)
@@ -268,6 +274,7 @@ async def test_run_agent_missing_credentials(setup_firecrawl_test_data):
tool_call_id=str(uuid.uuid4()),
username_agent_slug=agent_marketplace_id,
inputs={"url": "https://example.com"},
dry_run=False,
session=session,
)
@@ -300,6 +307,7 @@ async def test_run_agent_invalid_slug_format(setup_test_data):
tool_call_id=str(uuid.uuid4()),
username_agent_slug="no-slash-here",
inputs={},
dry_run=False,
session=session,
)
@@ -327,6 +335,7 @@ async def test_run_agent_unauthenticated():
tool_call_id=str(uuid.uuid4()),
username_agent_slug="test/test-agent",
inputs={},
dry_run=False,
session=session,
)
@@ -359,6 +368,7 @@ async def test_run_agent_schedule_without_cron(setup_test_data):
inputs={"test_input": "test"},
schedule_name="My Schedule",
cron="", # Empty cron
dry_run=False,
session=session,
)
@@ -391,6 +401,7 @@ async def test_run_agent_schedule_without_name(setup_test_data):
inputs={"test_input": "test"},
schedule_name="", # Empty name
cron="0 9 * * *",
dry_run=False,
session=session,
)
@@ -424,6 +435,7 @@ async def test_run_agent_rejects_unknown_input_fields(setup_test_data):
"unknown_field": "some value",
"another_unknown": "another value",
},
dry_run=False,
session=session,
)

View File

@@ -51,14 +51,10 @@ class RunBlockTool(BaseTool):
},
"dry_run": {
"type": "boolean",
"description": (
"When true, simulates block execution using an LLM without making any "
"real API calls or producing side effects. Useful for testing agent "
"wiring and previewing outputs. Default: false."
),
"description": "Execute in preview mode.",
},
},
"required": ["block_id", "input_data"],
"required": ["block_id", "input_data", "dry_run"],
}
@property
@@ -69,6 +65,10 @@ class RunBlockTool(BaseTool):
self,
user_id: str | None,
session: ChatSession,
*,
block_id: str = "",
input_data: dict | None = None,
dry_run: bool,
**kwargs,
) -> ToolResponseBase:
"""Execute a block with the given input data.
@@ -78,15 +78,19 @@ class RunBlockTool(BaseTool):
session: Chat session
block_id: Block UUID to execute
input_data: Input values for the block
dry_run: If True, simulate execution without side effects
Returns:
BlockOutputResponse: Block execution outputs
SetupRequirementsResponse: Missing credentials
ErrorResponse: Error message
"""
block_id = kwargs.get("block_id", "").strip()
input_data = kwargs.get("input_data", {})
dry_run = bool(kwargs.get("dry_run", False))
block_id = block_id.strip()
if input_data is None:
input_data = {}
# Session-level dry_run forces all tool calls to use dry-run mode.
if session.dry_run:
dry_run = True
session_id = session.session_id
if not block_id:

View File

@@ -103,6 +103,7 @@ class TestRunBlockFiltering:
session=session,
block_id="input-block-id",
input_data={},
dry_run=False,
)
assert isinstance(response, ErrorResponse)
@@ -129,6 +130,7 @@ class TestRunBlockFiltering:
session=session,
block_id=orchestrator_id,
input_data={},
dry_run=False,
)
assert isinstance(response, ErrorResponse)
@@ -154,6 +156,7 @@ class TestRunBlockFiltering:
session=session,
block_id=block_id,
input_data={},
dry_run=False,
)
finally:
_current_permissions.reset(token)
@@ -187,6 +190,7 @@ class TestRunBlockFiltering:
session=session,
block_id=block_id,
input_data={},
dry_run=False,
)
finally:
_current_permissions.reset(token)
@@ -222,6 +226,7 @@ class TestRunBlockFiltering:
session=session,
block_id="standard-id",
input_data={},
dry_run=False,
)
# Should NOT be an ErrorResponse about CoPilot exclusion
@@ -282,6 +287,7 @@ class TestRunBlockInputValidation:
"prompt": "Write a haiku about coding",
"LLM_Model": "claude-opus-4-6",
},
dry_run=False,
)
assert isinstance(response, InputValidationErrorResponse)
@@ -327,6 +333,7 @@ class TestRunBlockInputValidation:
"system_prompt": "Be helpful",
"retries": 5,
},
dry_run=False,
)
assert isinstance(response, InputValidationErrorResponse)
@@ -370,6 +377,7 @@ class TestRunBlockInputValidation:
input_data={
"LLM_Model": "claude-opus-4-6",
},
dry_run=False,
)
assert isinstance(response, InputValidationErrorResponse)
@@ -424,6 +432,7 @@ class TestRunBlockInputValidation:
"prompt": "Write a haiku",
"model": "gpt-4o-mini",
},
dry_run=False,
)
assert isinstance(response, BlockOutputResponse)
@@ -463,6 +472,7 @@ class TestRunBlockInputValidation:
input_data={
"model": "gpt-4o-mini",
},
dry_run=False,
)
assert isinstance(response, BlockDetailsResponse)
@@ -514,6 +524,7 @@ class TestRunBlockSensitiveAction:
session=session,
block_id="delete-branch-id",
input_data=input_data,
dry_run=False,
)
assert isinstance(response, ReviewRequiredResponse)
@@ -574,6 +585,7 @@ class TestRunBlockSensitiveAction:
session=session,
block_id="delete-branch-id",
input_data=input_data,
dry_run=False,
)
assert isinstance(response, BlockOutputResponse)
@@ -628,6 +640,7 @@ class TestRunBlockSensitiveAction:
session=session,
block_id="http-request-id",
input_data=input_data,
dry_run=False,
)
assert isinstance(response, BlockOutputResponse)

View File

@@ -91,21 +91,40 @@ class RunMCPToolTool(BaseTool):
self,
user_id: str | None,
session: ChatSession,
server_url: str = "",
tool_name: str = "",
tool_arguments: dict[str, Any] | None = None,
**kwargs,
) -> ToolResponseBase:
server_url: str = (kwargs.get("server_url") or "").strip()
tool_name: str = (kwargs.get("tool_name") or "").strip()
raw_tool_arguments = kwargs.get("tool_arguments")
tool_arguments: dict[str, Any] = (
raw_tool_arguments if isinstance(raw_tool_arguments, dict) else {}
)
server_url = server_url.strip()
tool_name = tool_name.strip()
session_id = session.session_id
if raw_tool_arguments is not None and not isinstance(raw_tool_arguments, dict):
# Session-level dry_run prevents real MCP tool execution.
# Discovery (no tool_name) is still allowed so the agent can inspect
# available tools, but actual execution is blocked.
if session.dry_run and tool_name:
return MCPToolOutputResponse(
message=(
f"[dry-run] MCP tool '{tool_name}' on "
f"{server_host(server_url)} was not executed "
"because the session is in dry-run mode."
),
server_url=server_url,
tool_name=tool_name,
result=None,
success=True,
session_id=session_id,
)
if tool_arguments is not None and not isinstance(tool_arguments, dict):
return ErrorResponse(
message="tool_arguments must be a JSON object.",
session_id=session_id,
)
resolved_tool_arguments: dict[str, Any] = (
tool_arguments if isinstance(tool_arguments, dict) else {}
)
if not server_url:
return ErrorResponse(
@@ -167,7 +186,7 @@ class RunMCPToolTool(BaseTool):
else:
# Stage 2: Execute the selected tool
return await self._execute_tool(
client, server_url, tool_name, tool_arguments, session_id
client, server_url, tool_name, resolved_tool_arguments, session_id
)
except HTTPClientError as e:

View File

@@ -85,6 +85,7 @@ class SearchDocsTool(BaseTool):
self,
user_id: str | None,
session: ChatSession,
query: str = "",
**kwargs,
) -> ToolResponseBase:
"""Search documentation and return relevant sections.
@@ -99,7 +100,7 @@ class SearchDocsTool(BaseTool):
NoResultsResponse: No results found
ErrorResponse: Error message
"""
query = kwargs.get("query", "").strip()
query = query.strip()
session_id = session.session_id if session else None
if not query:

View File

@@ -73,7 +73,10 @@ def make_openai_response(
@pytest.mark.asyncio
async def test_simulate_block_basic():
"""simulate_block returns correct (output_name, output_data) tuples."""
"""simulate_block returns correct (output_name, output_data) tuples.
Empty "error" pins are dropped at source — only non-empty errors are yielded.
"""
mock_block = make_mock_block()
mock_client = AsyncMock()
mock_client.chat.completions.create = AsyncMock(
@@ -88,7 +91,8 @@ async def test_simulate_block_basic():
outputs.append((name, data))
assert ("result", "simulated output") in outputs
assert ("error", "") in outputs
# Empty error pin is dropped at the simulator level
assert ("error", "") not in outputs
@pytest.mark.asyncio
@@ -113,6 +117,8 @@ async def test_simulate_block_json_retry():
assert mock_client.chat.completions.create.call_count == 3
assert ("result", "ok") in outputs
# Empty error pin is dropped
assert ("error", "") not in outputs
@pytest.mark.asyncio
@@ -141,7 +147,7 @@ async def test_simulate_block_all_retries_exhausted():
@pytest.mark.asyncio
async def test_simulate_block_missing_output_pins():
"""LLM response missing some output pins; verify they're filled with None."""
"""LLM response missing some output pins; verify non-error pins filled with None."""
mock_block = make_mock_block(
output_props={
"result": {"type": "string"},
@@ -164,7 +170,29 @@ async def test_simulate_block_missing_output_pins():
assert outputs["result"] == "hello"
assert outputs["count"] is None # missing pin filled with None
assert outputs["error"] == "" # "error" pin filled with ""
assert "error" not in outputs # missing error pin is omitted entirely
@pytest.mark.asyncio
async def test_simulate_block_keeps_nonempty_error():
"""simulate_block keeps non-empty error pins (simulated logical errors)."""
mock_block = make_mock_block()
mock_client = AsyncMock()
mock_client.chat.completions.create = AsyncMock(
return_value=make_openai_response(
'{"result": "", "error": "API rate limit exceeded"}'
)
)
with patch(
"backend.executor.simulator.get_openai_client", return_value=mock_client
):
outputs = []
async for name, data in simulate_block(mock_block, {"query": "test"}):
outputs.append((name, data))
assert ("result", "") in outputs
assert ("error", "API rate limit exceeded") in outputs
@pytest.mark.asyncio
@@ -200,6 +228,19 @@ async def test_simulate_block_truncates_long_inputs():
assert len(parsed["text"]) < 25000
def test_build_simulation_prompt_excludes_error_from_must_include():
"""The 'MUST include' prompt line should NOT list 'error' — the prompt
already instructs the LLM to OMIT error unless simulating a logical error.
Including it in 'MUST include' would be contradictory."""
block = make_mock_block() # default output_props has "result" and "error"
system_prompt, _ = build_simulation_prompt(block, {"query": "test"})
must_include_line = [
line for line in system_prompt.splitlines() if "MUST include" in line
][0]
assert '"result"' in must_include_line
assert '"error"' not in must_include_line
# ---------------------------------------------------------------------------
# execute_block dry-run tests
# ---------------------------------------------------------------------------
@@ -238,7 +279,7 @@ async def test_execute_block_dry_run_skips_real_execution():
@pytest.mark.asyncio
async def test_execute_block_dry_run_response_format():
"""Dry-run response should contain [DRY RUN] in message and success=True."""
"""Dry-run response should match real execution message format and have success=True."""
mock_block = make_mock_block()
async def fake_simulate(block, input_data):
@@ -259,7 +300,8 @@ async def test_execute_block_dry_run_response_format():
)
assert isinstance(response, BlockOutputResponse)
assert "[DRY RUN]" in response.message
assert "executed successfully" in response.message
assert "[DRY RUN]" not in response.message # must not leak to LLM context
assert response.success is True
assert response.outputs == {"result": ["simulated"]}
@@ -307,23 +349,24 @@ async def test_execute_block_real_execution_unchanged():
def test_run_block_tool_dry_run_param():
"""RunBlockTool parameters should include 'dry_run'."""
"""RunBlockTool parameters should include 'dry_run' as a required field."""
tool = RunBlockTool()
params = tool.parameters
assert "dry_run" in params["properties"]
assert params["properties"]["dry_run"]["type"] == "boolean"
assert "dry_run" in params["required"]
def test_run_block_tool_dry_run_calls_execute():
"""RunBlockTool._execute extracts dry_run from kwargs correctly.
"""RunBlockTool._execute accepts dry_run as a typed parameter.
We verify the extraction logic directly by inspecting the source, then confirm
the kwarg is forwarded in the execute_block call site.
We verify the parameter exists in the signature and is forwarded to
execute_block.
"""
source = inspect.getsource(run_block_module.RunBlockTool._execute)
# Verify dry_run is extracted from kwargs
# Verify dry_run is a typed parameter (not extracted from kwargs)
assert "dry_run" in source
assert 'kwargs.get("dry_run"' in source
assert "dry_run: bool" in source
# Scope to _execute method source only — module-wide search is brittle
# and can match unrelated text/comments.
@@ -332,13 +375,107 @@ def test_run_block_tool_dry_run_calls_execute():
assert "dry_run=dry_run" in source_execute
@pytest.mark.asyncio
async def test_execute_block_dry_run_no_empty_error_from_simulator():
"""The simulator no longer yields empty error pins, so execute_block
simply passes through whatever the simulator produces.
Since the fix is at the simulator level, even if a simulator somehow
yields only non-error outputs, they pass through unchanged.
"""
mock_block = make_mock_block()
async def fake_simulate(block, input_data):
# Simulator now omits empty error pins at source
yield "result", "simulated output"
with patch(
"backend.copilot.tools.helpers.simulate_block", side_effect=fake_simulate
):
response = await execute_block(
block=mock_block,
block_id="test-block-id",
input_data={"query": "hello"},
user_id="user-1",
session_id="session-1",
node_exec_id="node-exec-1",
matched_credentials={},
dry_run=True,
)
assert isinstance(response, BlockOutputResponse)
assert response.success is True
assert response.is_dry_run is True
assert "error" not in response.outputs
assert response.outputs == {"result": ["simulated output"]}
@pytest.mark.asyncio
async def test_execute_block_dry_run_keeps_nonempty_error_pin():
"""Dry-run should keep the 'error' pin when it contains a real error message."""
mock_block = make_mock_block()
async def fake_simulate(block, input_data):
yield "result", ""
yield "error", "API rate limit exceeded"
with patch(
"backend.copilot.tools.helpers.simulate_block", side_effect=fake_simulate
):
response = await execute_block(
block=mock_block,
block_id="test-block-id",
input_data={"query": "hello"},
user_id="user-1",
session_id="session-1",
node_exec_id="node-exec-1",
matched_credentials={},
dry_run=True,
)
assert isinstance(response, BlockOutputResponse)
assert response.success is True
# Non-empty error should be preserved
assert "error" in response.outputs
assert response.outputs["error"] == ["API rate limit exceeded"]
@pytest.mark.asyncio
async def test_execute_block_dry_run_message_includes_completed_status():
"""Dry-run message should clearly indicate COMPLETED status."""
mock_block = make_mock_block()
async def fake_simulate(block, input_data):
yield "result", "simulated"
with patch(
"backend.copilot.tools.helpers.simulate_block", side_effect=fake_simulate
):
response = await execute_block(
block=mock_block,
block_id="test-block-id",
input_data={"query": "hello"},
user_id="user-1",
session_id="session-1",
node_exec_id="node-exec-1",
matched_credentials={},
dry_run=True,
)
assert isinstance(response, BlockOutputResponse)
assert "executed successfully" in response.message
@pytest.mark.asyncio
async def test_execute_block_dry_run_simulator_error_returns_error_response():
"""When simulate_block yields a SIMULATOR ERROR tuple, execute_block returns ErrorResponse."""
mock_block = make_mock_block()
async def fake_simulate_error(block, input_data):
yield "error", "[SIMULATOR ERROR — NOT A BLOCK FAILURE] No LLM client available (missing OpenAI/OpenRouter API key)."
yield (
"error",
"[SIMULATOR ERROR — NOT A BLOCK FAILURE] No LLM client available (missing OpenAI/OpenRouter API key).",
)
with patch(
"backend.copilot.tools.helpers.simulate_block", side_effect=fake_simulate_error

View File

@@ -76,6 +76,7 @@ async def test_run_block_returns_details_when_no_input_provided():
session=session,
block_id="http-block-id",
input_data={}, # Empty input data
dry_run=False,
)
# Should return BlockDetailsResponse showing the schema
@@ -143,6 +144,7 @@ async def test_run_block_returns_details_when_only_credentials_provided():
session=session,
block_id="api-block-id",
input_data={"credentials": {"some": "cred"}}, # Only credential
dry_run=False,
)
# Should return details because no non-credential inputs provided

View File

@@ -151,7 +151,7 @@ async def test_non_dict_tool_arguments_returns_error():
session=session,
server_url=_SERVER_URL,
tool_name="fetch",
tool_arguments=["this", "is", "a", "list"], # wrong type
tool_arguments=["this", "is", "a", "list"], # type: ignore[arg-type] # intentionally wrong type to test validation
)
assert isinstance(response, ErrorResponse)

View File

@@ -0,0 +1,499 @@
"""Tests for session-level dry_run flag propagation.
Verifies that when a session has dry_run=True, run_block, run_agent, and
run_mcp_tool calls are forced to use dry-run mode, regardless of what the
individual tool call specifies. The single source of truth is
``session.dry_run``.
"""
from __future__ import annotations
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from backend.copilot.model import ChatSession
from backend.copilot.tools.models import ErrorResponse, MCPToolOutputResponse
from backend.copilot.tools.run_agent import RunAgentInput, RunAgentTool
from backend.copilot.tools.run_block import RunBlockTool
from backend.copilot.tools.run_mcp_tool import RunMCPToolTool
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _make_session(dry_run: bool = False) -> ChatSession:
"""Create a minimal ChatSession for testing."""
session = ChatSession.new("test-user", dry_run=dry_run)
return session
def _make_mock_block(name: str = "TestBlock"):
"""Create a minimal mock block with jsonschema() methods."""
block = MagicMock()
block.name = name
block.description = "A test block"
block.disabled = False
block.block_type = "STANDARD"
block.id = "test-block-id"
block.input_schema = MagicMock()
block.input_schema.jsonschema.return_value = {
"type": "object",
"properties": {"query": {"type": "string"}},
"required": ["query"],
}
block.input_schema.get_credentials_fields.return_value = {}
block.input_schema.get_credentials_fields_info.return_value = {}
block.output_schema = MagicMock()
block.output_schema.jsonschema.return_value = {
"type": "object",
"properties": {"result": {"type": "string"}},
"required": ["result"],
}
return block
# ---------------------------------------------------------------------------
# RunBlockTool tests
# ---------------------------------------------------------------------------
class TestRunBlockToolSessionDryRun:
"""Test that RunBlockTool respects session-level dry_run."""
@pytest.mark.asyncio
async def test_session_dry_run_forces_block_dry_run(self):
"""When session dry_run is True, run_block should force dry_run=True."""
tool = RunBlockTool()
session = _make_session(dry_run=True)
mock_block = _make_mock_block()
with (
patch(
"backend.copilot.tools.run_block.prepare_block_for_execution"
) as mock_prep,
patch("backend.copilot.tools.run_block.execute_block") as mock_exec,
patch(
"backend.copilot.tools.run_block.get_current_permissions",
return_value=None,
),
):
# Set up prepare_block_for_execution to return a mock prep
mock_prep_result = MagicMock()
mock_prep_result.block = mock_block
mock_prep_result.input_data = {"query": "test"}
mock_prep_result.matched_credentials = {}
mock_prep_result.synthetic_node_id = "node-1"
mock_prep.return_value = mock_prep_result
# Set up execute_block to return a success
mock_exec.return_value = MagicMock(
message="Block 'TestBlock' executed successfully",
success=True,
)
await tool._execute(
user_id="test-user",
session=session,
block_id="test-block-id",
input_data={"query": "test"},
dry_run=False, # User passed False, but session overrides
)
# Verify execute_block was called with dry_run=True
mock_exec.assert_called_once()
call_kwargs = mock_exec.call_args
assert call_kwargs.kwargs.get("dry_run") is True
@pytest.mark.asyncio
async def test_no_session_dry_run_respects_tool_param(self):
"""When session dry_run is False, tool-level dry_run should be respected."""
tool = RunBlockTool()
session = _make_session(dry_run=False)
mock_block = _make_mock_block()
with (
patch(
"backend.copilot.tools.run_block.prepare_block_for_execution"
) as mock_prep,
patch("backend.copilot.tools.run_block.execute_block") as mock_exec,
patch(
"backend.copilot.tools.run_block.get_current_permissions",
return_value=None,
),
patch("backend.copilot.tools.run_block.check_hitl_review") as mock_hitl,
):
mock_prep_result = MagicMock()
mock_prep_result.block = mock_block
mock_prep_result.input_data = {"query": "test"}
mock_prep_result.matched_credentials = {}
mock_prep_result.synthetic_node_id = "node-1"
mock_prep_result.required_non_credential_keys = {"query"}
mock_prep_result.provided_input_keys = {"query"}
mock_prep.return_value = mock_prep_result
mock_hitl.return_value = ("node-exec-1", {"query": "test"})
mock_exec.return_value = MagicMock(
message="Block executed",
success=True,
)
await tool._execute(
user_id="test-user",
session=session,
block_id="test-block-id",
input_data={"query": "test"},
dry_run=False,
)
# Verify execute_block was called with dry_run=False
mock_exec.assert_called_once()
call_kwargs = mock_exec.call_args
assert call_kwargs.kwargs.get("dry_run") is False
# ---------------------------------------------------------------------------
# RunAgentTool tests
# ---------------------------------------------------------------------------
class TestRunAgentToolSessionDryRun:
"""Test that RunAgentTool respects session-level dry_run."""
@pytest.mark.asyncio
async def test_session_dry_run_forces_agent_dry_run(self):
"""When session dry_run is True, run_agent params.dry_run should be forced True."""
tool = RunAgentTool()
session = _make_session(dry_run=True)
# Mock the graph and dependencies
mock_graph = MagicMock()
mock_graph.id = "graph-1"
mock_graph.name = "Test Agent"
mock_graph.description = "A test agent"
mock_graph.input_schema = {"properties": {}, "required": []}
mock_graph.trigger_setup_info = None
mock_library_agent = MagicMock()
mock_library_agent.id = "lib-1"
mock_library_agent.graph_id = "graph-1"
mock_library_agent.graph_version = 1
mock_library_agent.name = "Test Agent"
mock_execution = MagicMock()
mock_execution.id = "exec-1"
with (
patch("backend.copilot.tools.run_agent.graph_db"),
patch("backend.copilot.tools.run_agent.library_db"),
patch(
"backend.copilot.tools.run_agent.fetch_graph_from_store_slug",
return_value=(mock_graph, None),
),
patch(
"backend.copilot.tools.run_agent.match_user_credentials_to_graph",
return_value=({}, []),
),
patch(
"backend.copilot.tools.run_agent.get_or_create_library_agent",
return_value=mock_library_agent,
),
patch("backend.copilot.tools.run_agent.execution_utils") as mock_exec_utils,
patch("backend.copilot.tools.run_agent.track_agent_run_success"),
):
mock_exec_utils.add_graph_execution = AsyncMock(return_value=mock_execution)
await tool._execute(
user_id="test-user",
session=session,
username_agent_slug="user/test-agent",
dry_run=False, # User passed False, but session overrides
use_defaults=True,
)
# Verify add_graph_execution was called with dry_run=True
mock_exec_utils.add_graph_execution.assert_called_once()
call_kwargs = mock_exec_utils.add_graph_execution.call_args
assert call_kwargs.kwargs.get("dry_run") is True
@pytest.mark.asyncio
async def test_session_dry_run_blocks_scheduling(self):
"""When session dry_run is True, scheduling requests should be rejected."""
tool = RunAgentTool()
session = _make_session(dry_run=True)
result = await tool._execute(
user_id="test-user",
session=session,
username_agent_slug="user/test-agent",
schedule_name="daily-run",
cron="0 9 * * *",
dry_run=False, # Session overrides to True
)
assert isinstance(result, ErrorResponse)
assert "dry-run" in result.message.lower()
assert (
"scheduling" in result.message.lower()
or "schedule" in result.message.lower()
)
# ---------------------------------------------------------------------------
# ChatSession model tests
# ---------------------------------------------------------------------------
class TestChatSessionDryRun:
"""Test the dry_run field on ChatSession model."""
def test_new_session_default_dry_run_false(self):
session = ChatSession.new("test-user", dry_run=False)
assert session.dry_run is False
def test_new_session_dry_run_true(self):
session = ChatSession.new("test-user", dry_run=True)
assert session.dry_run is True
def test_new_session_dry_run_false_explicit(self):
session = ChatSession.new("test-user", dry_run=False)
assert session.dry_run is False
# ---------------------------------------------------------------------------
# RunAgentInput tests
# ---------------------------------------------------------------------------
class TestRunAgentInputDryRunOverride:
"""Test that RunAgentInput.dry_run can be mutated by session-level override."""
def test_explicit_dry_run_false(self):
params = RunAgentInput(username_agent_slug="user/agent", dry_run=False)
assert params.dry_run is False
def test_session_override(self):
params = RunAgentInput(username_agent_slug="user/agent", dry_run=False)
# Simulate session-level override
params.dry_run = True
assert params.dry_run is True
# ---------------------------------------------------------------------------
# RunMCPToolTool tests
# ---------------------------------------------------------------------------
class TestRunMCPToolToolSessionDryRun:
"""Test that RunMCPToolTool respects session-level dry_run."""
@pytest.mark.asyncio
async def test_session_dry_run_blocks_mcp_execution(self):
"""When session dry_run is True, MCP tool execution should be skipped."""
tool = RunMCPToolTool()
session = _make_session(dry_run=True)
result = await tool._execute(
user_id="test-user",
session=session,
server_url="https://mcp.example.com/sse",
tool_name="some_tool",
tool_arguments={"key": "value"},
)
assert isinstance(result, MCPToolOutputResponse)
assert result.success is True
assert "dry-run" in result.message
assert result.tool_name == "some_tool"
assert result.result is None
@pytest.mark.asyncio
async def test_session_dry_run_allows_discovery(self):
"""When session dry_run is True, tool discovery (no tool_name) should still work."""
tool = RunMCPToolTool()
session = _make_session(dry_run=True)
# Discovery requires a network call, so we mock the client
with (
patch(
"backend.copilot.tools.run_mcp_tool.auto_lookup_mcp_credential",
return_value=None,
),
patch(
"backend.copilot.tools.run_mcp_tool.validate_url_host",
return_value=None,
),
patch("backend.copilot.tools.run_mcp_tool.MCPClient") as mock_client_cls,
):
mock_client = AsyncMock()
mock_client_cls.return_value = mock_client
mock_tool = MagicMock()
mock_tool.name = "test_tool"
mock_tool.description = "A test tool"
mock_tool.input_schema = {"type": "object", "properties": {}}
mock_client.list_tools.return_value = [mock_tool]
result = await tool._execute(
user_id="test-user",
session=session,
server_url="https://mcp.example.com/sse",
tool_name="", # Discovery mode
)
# Discovery should proceed normally
mock_client.initialize.assert_called_once()
mock_client.list_tools.assert_called_once()
assert "Discovered" in result.message
@pytest.mark.asyncio
async def test_no_session_dry_run_allows_execution(self):
"""When session dry_run is False, MCP tool execution should proceed."""
tool = RunMCPToolTool()
session = _make_session(dry_run=False)
with (
patch(
"backend.copilot.tools.run_mcp_tool.auto_lookup_mcp_credential",
return_value=None,
),
patch(
"backend.copilot.tools.run_mcp_tool.validate_url_host",
return_value=None,
),
patch("backend.copilot.tools.run_mcp_tool.MCPClient") as mock_client_cls,
):
mock_client = AsyncMock()
mock_client_cls.return_value = mock_client
mock_result = MagicMock()
mock_result.is_error = False
mock_result.content = [{"type": "text", "text": "hello"}]
mock_client.call_tool.return_value = mock_result
result = await tool._execute(
user_id="test-user",
session=session,
server_url="https://mcp.example.com/sse",
tool_name="some_tool",
tool_arguments={"key": "value"},
)
# Execution should proceed
mock_client.initialize.assert_called_once()
mock_client.call_tool.assert_called_once_with("some_tool", {"key": "value"})
assert isinstance(result, MCPToolOutputResponse)
assert result.success is True
# ---------------------------------------------------------------------------
# Backward-compatibility tests for ChatSessionMetadata deserialization
# ---------------------------------------------------------------------------
class TestChatSessionMetadataBackwardCompat:
"""Verify that sessions created before the dry_run field existed still load.
The ``metadata`` JSON column in the DB may contain ``{}``, ``null``, or a
dict without the ``dry_run`` key for sessions created before the flag was
introduced. These must deserialize without errors and default to
``dry_run=False``.
"""
def test_metadata_default_construction(self):
"""ChatSessionMetadata() with no args should default dry_run=False."""
from backend.copilot.model import ChatSessionMetadata
meta = ChatSessionMetadata()
assert meta.dry_run is False
def test_metadata_from_empty_dict(self):
"""Deserializing an empty dict (old-format metadata) should succeed."""
from backend.copilot.model import ChatSessionMetadata
meta = ChatSessionMetadata.model_validate({})
assert meta.dry_run is False
def test_metadata_from_dict_without_dry_run_key(self):
"""A metadata dict with other keys but no dry_run should still work."""
from backend.copilot.model import ChatSessionMetadata
meta = ChatSessionMetadata.model_validate({"some_future_field": 42})
# dry_run should fall back to default
assert meta.dry_run is False
def test_metadata_round_trip_with_dry_run_false(self):
"""Serialize then deserialize with dry_run=False."""
from backend.copilot.model import ChatSessionMetadata
original = ChatSessionMetadata(dry_run=False)
raw = original.model_dump()
restored = ChatSessionMetadata.model_validate(raw)
assert restored.dry_run is False
def test_metadata_round_trip_with_dry_run_true(self):
"""Serialize then deserialize with dry_run=True."""
from backend.copilot.model import ChatSessionMetadata
original = ChatSessionMetadata(dry_run=True)
raw = original.model_dump()
restored = ChatSessionMetadata.model_validate(raw)
assert restored.dry_run is True
def test_metadata_json_round_trip(self):
"""Serialize to JSON string and back, simulating Redis cache flow."""
from backend.copilot.model import ChatSessionMetadata
original = ChatSessionMetadata(dry_run=True)
json_str = original.model_dump_json()
restored = ChatSessionMetadata.model_validate_json(json_str)
assert restored.dry_run is True
def test_session_dry_run_property_with_default_metadata(self):
"""ChatSession.dry_run returns False when metadata has no dry_run."""
from backend.copilot.model import ChatSessionMetadata
# Simulate building a session with metadata deserialized from an old row
meta = ChatSessionMetadata.model_validate({})
session = _make_session(dry_run=False)
session.metadata = meta
assert session.dry_run is False
def test_session_info_dry_run_property_with_default_metadata(self):
"""ChatSessionInfo.dry_run returns False when metadata is default."""
from datetime import UTC, datetime
from backend.copilot.model import ChatSessionInfo, ChatSessionMetadata
info = ChatSessionInfo(
session_id="old-session-id",
user_id="test-user",
usage=[],
started_at=datetime.now(UTC),
updated_at=datetime.now(UTC),
metadata=ChatSessionMetadata.model_validate({}),
)
assert info.dry_run is False
def test_session_full_json_round_trip_without_dry_run(self):
"""A full ChatSession JSON round-trip preserves dry_run default."""
session = _make_session(dry_run=False)
json_bytes = session.model_dump_json()
restored = ChatSession.model_validate_json(json_bytes)
assert restored.dry_run is False
assert restored.metadata.dry_run is False
def test_session_full_json_round_trip_with_dry_run(self):
"""A full ChatSession JSON round-trip preserves dry_run=True."""
session = _make_session(dry_run=True)
json_bytes = session.model_dump_json()
restored = ChatSession.model_validate_json(json_bytes)
assert restored.dry_run is True
assert restored.metadata.dry_run is True

View File

@@ -48,9 +48,9 @@ class ValidateAgentGraphTool(BaseTool):
self,
user_id: str | None,
session: ChatSession,
agent_json: dict | None = None,
**kwargs,
) -> ToolResponseBase:
agent_json = kwargs.get("agent_json")
session_id = session.session_id if session else None
if not agent_json or not isinstance(agent_json, dict):

View File

@@ -87,10 +87,11 @@ class WebFetchTool(BaseTool):
self,
user_id: str | None,
session: ChatSession,
url: str = "",
extract_text: bool = True,
**kwargs: Any,
) -> ToolResponseBase:
url: str = (kwargs.get("url") or "").strip()
extract_text: bool = kwargs.get("extract_text", True)
url = url.strip()
session_id = session.session_id if session else None
if not url:

View File

@@ -450,6 +450,9 @@ class ListWorkspaceFilesTool(BaseTool):
self,
user_id: str | None,
session: ChatSession,
path_prefix: Optional[str] = None,
limit: int = 50,
include_all_sessions: bool = False,
**kwargs,
) -> ToolResponseBase:
session_id = session.session_id
@@ -458,9 +461,7 @@ class ListWorkspaceFilesTool(BaseTool):
message="Authentication required", session_id=session_id
)
path_prefix: Optional[str] = kwargs.get("path_prefix")
limit = min(kwargs.get("limit", 50), 100)
include_all_sessions: bool = kwargs.get("include_all_sessions", False)
limit = min(limit, 100)
try:
manager = await get_workspace_manager(user_id, session_id)
@@ -567,6 +568,12 @@ class ReadWorkspaceFileTool(BaseTool):
self,
user_id: str | None,
session: ChatSession,
file_id: Optional[str] = None,
path: Optional[str] = None,
save_to_path: Optional[str] = None,
force_download_url: bool = False,
offset: int = 0,
length: Optional[int] = None,
**kwargs,
) -> ToolResponseBase:
session_id = session.session_id
@@ -575,12 +582,8 @@ class ReadWorkspaceFileTool(BaseTool):
message="Authentication required", session_id=session_id
)
file_id: Optional[str] = kwargs.get("file_id")
path: Optional[str] = kwargs.get("path")
save_to_path: Optional[str] = kwargs.get("save_to_path")
force_download_url: bool = kwargs.get("force_download_url", False)
char_offset: int = max(0, kwargs.get("offset", 0))
char_length: Optional[int] = kwargs.get("length")
char_offset: int = max(0, offset)
char_length: Optional[int] = length
if not file_id and not path:
return ErrorResponse(
@@ -770,6 +773,13 @@ class WriteWorkspaceFileTool(BaseTool):
self,
user_id: str | None,
session: ChatSession,
filename: str = "",
source_path: str | None = None,
content: str | None = None,
content_base64: str | None = None,
path: str | None = None,
mime_type: str | None = None,
overwrite: bool = False,
**kwargs,
) -> ToolResponseBase:
session_id = session.session_id
@@ -778,15 +788,36 @@ class WriteWorkspaceFileTool(BaseTool):
message="Authentication required", session_id=session_id
)
filename: str = kwargs.get("filename", "")
if not filename:
# When ALL parameters are missing, the most likely cause is
# output token truncation: the LLM tried to inline a very large
# file as `content`, the SDK silently truncated the tool call
# arguments to `{}`, and we receive nothing. Return an
# actionable error instead of a generic "filename required".
has_any_content = any(
kwargs.get(k) for k in ("content", "content_base64", "source_path")
)
if not has_any_content:
return ErrorResponse(
message=(
"Tool call appears truncated (no arguments received). "
"This happens when the content is too large for a "
"single tool call. Instead of passing content inline, "
"first write the file to the working directory using "
"bash_exec (e.g. cat > /home/user/file.md << 'EOF'... "
"EOF), then use source_path to copy it to workspace: "
"write_workspace_file(filename='file.md', "
"source_path='/home/user/file.md')"
),
session_id=session_id,
)
return ErrorResponse(
message="Please provide a filename", session_id=session_id
)
source_path_arg: str | None = kwargs.get("source_path")
content_text: str | None = kwargs.get("content")
content_b64: str | None = kwargs.get("content_base64")
source_path_arg: str | None = source_path
content_text: str | None = content
content_b64: str | None = content_base64
resolved = await _resolve_write_content(
content_text,
@@ -796,24 +827,24 @@ class WriteWorkspaceFileTool(BaseTool):
)
if isinstance(resolved, ErrorResponse):
return resolved
content: bytes = resolved
content_bytes: bytes = resolved
max_size = _MAX_FILE_SIZE_MB * 1024 * 1024
if len(content) > max_size:
if len(content_bytes) > max_size:
return ErrorResponse(
message=f"File too large. Maximum size is {_MAX_FILE_SIZE_MB}MB",
session_id=session_id,
)
try:
await scan_content_safe(content, filename=filename)
await scan_content_safe(content_bytes, filename=filename)
manager = await get_workspace_manager(user_id, session_id)
rec = await manager.write_file(
content=content,
content=content_bytes,
filename=filename,
path=kwargs.get("path"),
mime_type=kwargs.get("mime_type"),
overwrite=kwargs.get("overwrite", False),
path=path,
mime_type=mime_type,
overwrite=overwrite,
)
# Build informative source label and message.
@@ -837,8 +868,8 @@ class WriteWorkspaceFileTool(BaseTool):
preview: str | None = None
if _is_text_mime(rec.mime_type):
try:
preview = content[:200].decode("utf-8", errors="replace")
if len(content) > 200:
preview = content_bytes[:200].decode("utf-8", errors="replace")
if len(content_bytes) > 200:
preview += "..."
except Exception:
pass
@@ -910,6 +941,8 @@ class DeleteWorkspaceFileTool(BaseTool):
self,
user_id: str | None,
session: ChatSession,
file_id: Optional[str] = None,
path: Optional[str] = None,
**kwargs,
) -> ToolResponseBase:
session_id = session.session_id
@@ -917,9 +950,6 @@ class DeleteWorkspaceFileTool(BaseTool):
return ErrorResponse(
message="Authentication required", session_id=session_id
)
file_id: Optional[str] = kwargs.get("file_id")
path: Optional[str] = kwargs.get("path")
if not file_id and not path:
return ErrorResponse(
message="Please provide either file_id or path", session_id=session_id

View File

@@ -13,7 +13,7 @@ Inspired by https://github.com/Significant-Gravitas/agent-simulator
import json
import logging
from collections.abc import AsyncIterator
from collections.abc import AsyncGenerator
from typing import Any
from backend.util.clients import get_openai_client
@@ -96,6 +96,10 @@ def build_simulation_prompt(block: Any, input_data: dict[str, Any]) -> tuple[str
input_pins = _describe_schema_pins(input_schema)
output_pins = _describe_schema_pins(output_schema)
output_properties = list(output_schema.get("properties", {}).keys())
# Build a separate list for the "MUST include" instruction that excludes
# "error" — the prompt already tells the LLM to OMIT the error pin unless
# simulating a logical error. Including it in "MUST include" is contradictory.
required_output_properties = [k for k in output_properties if k != "error"]
block_name = getattr(block, "name", type(block).__name__)
block_description = getattr(block, "description", "No description available.")
@@ -117,10 +121,10 @@ Rules:
- Respond with a single JSON object whose keys are EXACTLY the output pin names listed above.
- Assume all credentials and authentication are present and valid. Never simulate authentication failures.
- Make the simulated outputs realistic and consistent with the inputs.
- If there is an "error" pin, set it to "" (empty string) unless you are simulating a logical error.
- If there is an "error" pin, OMIT it entirely unless you are simulating a logical error. Only include the "error" pin when there is a genuine error message to report.
- Do not include any extra keys beyond the output pins.
Output pin names you MUST include: {json.dumps(output_properties)}
Output pin names you MUST include: {json.dumps(required_output_properties)}
"""
safe_inputs = _truncate_input_values(input_data)
@@ -132,7 +136,7 @@ Output pin names you MUST include: {json.dumps(output_properties)}
async def simulate_block(
block: Any,
input_data: dict[str, Any],
) -> AsyncIterator[tuple[str, Any]]:
) -> AsyncGenerator[tuple[str, Any], None]:
"""Simulate block execution using an LLM.
Yields (output_name, output_data) tuples matching the Block.execute() interface.
@@ -172,13 +176,26 @@ async def simulate_block(
if not isinstance(parsed, dict):
raise ValueError(f"LLM returned non-object JSON: {raw[:200]}")
# Fill missing output pins with defaults
# Fill missing output pins with defaults.
# Skip empty "error" pins — an empty string means "no error" and
# would only confuse downstream consumers (LLM, frontend).
result: dict[str, Any] = {}
for pin_name in output_properties:
if pin_name in parsed:
result[pin_name] = parsed[pin_name]
else:
result[pin_name] = "" if pin_name == "error" else None
value = parsed[pin_name]
# Drop empty/blank error pins: they carry no information.
# Uses strip() intentionally so whitespace-only strings
# (e.g. " ", "\n") are also treated as empty.
if (
pin_name == "error"
and isinstance(value, str)
and not value.strip()
):
continue
result[pin_name] = value
elif pin_name != "error":
# Only fill non-error missing pins with None
result[pin_name] = None
logger.debug(
"simulate_block: block=%s attempt=%d tokens=%s/%s",

View File

@@ -251,6 +251,50 @@ def estimate_token_count_str(
DEFAULT_TOKEN_THRESHOLD = 120_000
DEFAULT_KEEP_RECENT = 15
# Reserve tokens for system prompt, tool definitions, and per-turn overhead.
# The actual model context limit minus this reserve = compression target.
_CONTEXT_OVERHEAD_RESERVE = 60_000
def get_context_window(model: str) -> int | None:
"""Return the context window size for a model, or None if unknown.
Looks up the model in the :class:`LlmModel` enum (which already
carries ``context_window`` via ``MODEL_METADATA``). Handles
provider-prefixed names (``anthropic/claude-opus-4-6``) and
case-insensitive input automatically.
"""
from backend.blocks.llm import LlmModel # lazy to avoid circular import
try:
llm_model = LlmModel(model)
return llm_model.context_window
except (ValueError, KeyError):
pass
# Retry with lowercase for case-insensitive lookup
try:
llm_model = LlmModel(model.lower())
return llm_model.context_window
except (ValueError, KeyError):
return None
def get_compression_target(model: str) -> int:
"""Compute a model-aware compression target for conversation history.
Returns ``context_window - overhead_reserve``, floored at 10K.
Falls back to ``DEFAULT_TOKEN_THRESHOLD`` for unknown models or
models whose context window is too small for the overhead reserve.
"""
window = get_context_window(model)
if window is None:
return DEFAULT_TOKEN_THRESHOLD
target = window - _CONTEXT_OVERHEAD_RESERVE
if target < 10_000:
return DEFAULT_TOKEN_THRESHOLD
return target
@dataclass
class CompressResult:
@@ -660,7 +704,7 @@ async def _summarize_messages_llm(
async def compress_context(
messages: list[dict],
target_tokens: int = DEFAULT_TOKEN_THRESHOLD,
target_tokens: int | None = None,
*,
model: str = "gpt-4o",
client: AsyncOpenAI | None = None,
@@ -672,6 +716,11 @@ async def compress_context(
"""
Unified context compression that combines summarization and truncation strategies.
When ``target_tokens`` is None (the default), it is computed from the
model's context window via ``get_compression_target(model)``. This
ensures large-context models (e.g. Opus 200K) retain more history
while smaller models compress more aggressively.
Strategy (in order):
1. **LLM summarization** If client provided, summarize old messages into a
single context message while keeping recent messages intact. This is the
@@ -699,6 +748,10 @@ async def compress_context(
-------
CompressResult with compressed messages and metadata.
"""
# Resolve model-aware target when caller doesn't specify an explicit limit.
if target_tokens is None:
target_tokens = get_compression_target(model)
# Guard clause for empty messages
if not messages:
return CompressResult(

View File

@@ -7,6 +7,7 @@ from tiktoken import encoding_for_model
from backend.util import json
from backend.util.prompt import (
DEFAULT_TOKEN_THRESHOLD,
CompressResult,
_ensure_tool_pairs_intact,
_msg_tokens,
@@ -15,6 +16,8 @@ from backend.util.prompt import (
_truncate_tool_message_content,
compress_context,
estimate_token_count,
get_compression_target,
get_context_window,
)
@@ -974,3 +977,43 @@ class TestCompressResultDataclass:
assert result.original_token_count == 500
assert result.messages_summarized == 10
assert result.messages_dropped == 5
class TestGetContextWindow:
def test_claude_opus(self) -> None:
assert get_context_window("claude-opus-4-20250514") == 200_000
def test_claude_sonnet(self) -> None:
assert get_context_window("claude-sonnet-4-20250514") == 200_000
def test_openrouter_prefix(self) -> None:
assert get_context_window("anthropic/claude-opus-4-6") == 200_000
def test_version_suffix(self) -> None:
assert get_context_window("claude-opus-4-6") == 200_000
def test_gpt4o(self) -> None:
assert get_context_window("gpt-4o") == 128_000
def test_unknown_model(self) -> None:
assert get_context_window("some-unknown-model") is None
def test_case_insensitive(self) -> None:
assert get_context_window("GPT-4o") == 128_000
class TestGetCompressionTarget:
def test_claude_opus_200k(self) -> None:
target = get_compression_target("anthropic/claude-opus-4-6")
assert target == 140_000 # 200K - 60K overhead
def test_gpt4o_128k(self) -> None:
target = get_compression_target("gpt-4o")
assert target == 68_000 # 128K - 60K overhead
def test_unknown_model_returns_default(self) -> None:
assert get_compression_target("unknown-model") == DEFAULT_TOKEN_THRESHOLD
def test_small_model_returns_default(self) -> None:
# Unknown models fall back to DEFAULT_TOKEN_THRESHOLD
assert get_compression_target("some-tiny-model") == DEFAULT_TOKEN_THRESHOLD

View File

@@ -0,0 +1,4 @@
-- Add extensible metadata JSONB column to ChatSession.
-- New session-level flags (e.g. dry_run) live inside this JSON
-- so future additions need no extra migrations.
ALTER TABLE "ChatSession" ADD COLUMN "metadata" JSONB NOT NULL DEFAULT '{}';

View File

@@ -220,6 +220,10 @@ model ChatSession {
successfulAgentRuns Json @default("{}") // Map of graph_id -> count
successfulAgentSchedules Json @default("{}") // Map of graph_id -> count
// Extensible session metadata (typed via ChatSessionMetadata in Python).
// Avoids DB migrations for each new flag (e.g. dry_run, future fields).
metadata Json @default("{}")
// Usage tracking
totalPromptTokens Int @default(0)
totalCompletionTokens Int @default(0)

View File

@@ -0,0 +1,297 @@
#!/usr/bin/env python3
"""Download CoPilot transcripts from prod GCS and load into local dev environment.
Usage:
# Step 1: Download from prod GCS (needs MEDIA_GCS_BUCKET_NAME + gcloud auth)
MEDIA_GCS_BUCKET_NAME=<prod-bucket> USER_ID=<user-uuid> \
poetry run python scripts/download_transcripts.py download <session_id> ...
# Step 2: Load downloaded transcripts into local storage + DB
poetry run python scripts/download_transcripts.py load <session_id> ...
# Or do both in one step (if you have GCS access):
MEDIA_GCS_BUCKET_NAME=<prod-bucket> USER_ID=<user-uuid> \
poetry run python scripts/download_transcripts.py both <session_id> ...
The "download" step saves transcripts to transcripts/<session_id>.jsonl.
The "load" step reads those files and:
1. Creates a ChatSession in local DB (or reuses existing)
2. Populates messages from the transcript
3. Stores transcript in local workspace storage
4. Creates metadata so --resume works on the next turn
After "load", you can send a message to the session via the CoPilot UI
and it will use --resume with the loaded transcript.
"""
from __future__ import annotations
import asyncio
import json
import os
import re
import sys
import time
sys.path.insert(0, os.path.dirname(os.path.dirname(__file__)))
_SAFE_RE = re.compile(r"[^0-9a-fA-F-]")
TRANSCRIPTS_DIR = os.path.join(os.path.dirname(__file__), "..", "transcripts")
def _sanitize(raw: str) -> str:
cleaned = _SAFE_RE.sub("", raw or "")[:36]
if not cleaned:
raise ValueError(f"Invalid ID: {raw!r}")
return cleaned
def _transcript_path(session_id: str) -> str:
return os.path.join(TRANSCRIPTS_DIR, f"{_sanitize(session_id)}.jsonl")
def _meta_path(session_id: str) -> str:
return os.path.join(TRANSCRIPTS_DIR, f"{_sanitize(session_id)}.meta.json")
# ── Download from GCS ─────────────────────────────────────────────────────
async def cmd_download(session_ids: list[str]) -> None:
"""Download transcripts from prod GCS to transcripts/ directory."""
from backend.copilot.sdk.transcript import download_transcript
user_id = os.environ.get("USER_ID", "")
if not user_id:
print("ERROR: Set USER_ID env var to the session owner's user ID.")
print(" You can find it in Sentry breadcrumbs or the DB.")
sys.exit(1)
bucket = os.environ.get("MEDIA_GCS_BUCKET_NAME", "")
if not bucket:
print("ERROR: Set MEDIA_GCS_BUCKET_NAME to the prod GCS bucket.")
sys.exit(1)
os.makedirs(TRANSCRIPTS_DIR, exist_ok=True)
print(f"Downloading from GCS bucket: {bucket}")
print(f"User ID: {user_id}\n")
for sid in session_ids:
print(f"[{sid[:12]}] Downloading...")
try:
dl = await download_transcript(user_id, sid)
except Exception as e:
print(f"[{sid[:12]}] Failed: {e}")
continue
if not dl or not dl.content:
print(f"[{sid[:12]}] Not found in GCS")
continue
out = _transcript_path(sid)
with open(out, "w") as f:
f.write(dl.content)
lines = len(dl.content.strip().split("\n"))
meta = {
"session_id": sid,
"user_id": user_id,
"message_count": dl.message_count,
"uploaded_at": dl.uploaded_at,
"transcript_bytes": len(dl.content),
"transcript_lines": lines,
}
with open(_meta_path(sid), "w") as f:
json.dump(meta, f, indent=2)
print(
f"[{sid[:12]}] Saved: {lines} entries, "
f"{len(dl.content)} bytes, msg_count={dl.message_count}"
)
print("\nDone. Run 'load' command to import into local dev environment.")
# ── Load into local dev ───────────────────────────────────────────────────
def _parse_messages_from_transcript(content: str) -> list[dict]:
"""Extract user/assistant messages from JSONL transcript for DB seeding."""
messages: list[dict] = []
for line in content.strip().split("\n"):
if not line.strip():
continue
try:
entry = json.loads(line)
except json.JSONDecodeError:
continue
if not isinstance(entry, dict):
continue
msg = entry.get("message", {})
role = msg.get("role")
if role not in ("user", "assistant"):
continue
content_blocks = msg.get("content", "")
if isinstance(content_blocks, list):
# Flatten content blocks to text
text_parts = []
for block in content_blocks:
if isinstance(block, dict):
if block.get("type") == "text":
text_parts.append(block.get("text", ""))
elif isinstance(block, str):
text_parts.append(block)
text = "\n".join(text_parts)
elif isinstance(content_blocks, str):
text = content_blocks
else:
text = ""
if text:
messages.append({"role": role, "content": text})
return messages
async def cmd_load(session_ids: list[str]) -> None:
"""Load downloaded transcripts into local workspace storage + DB."""
from backend.copilot.sdk.transcript import upload_transcript
# Use the user_id from meta file or env var
default_user_id = os.environ.get("USER_ID", "")
for sid in session_ids:
transcript_file = _transcript_path(sid)
meta_file = _meta_path(sid)
if not os.path.exists(transcript_file):
print(f"[{sid[:12]}] No transcript file at {transcript_file}")
print(" Run 'download' first, or place the file manually.")
continue
with open(transcript_file) as f:
content = f.read()
# Load meta if available
user_id = default_user_id
msg_count = 0
if os.path.exists(meta_file):
with open(meta_file) as f:
meta = json.load(f)
user_id = meta.get("user_id", user_id)
msg_count = meta.get("message_count", 0)
if not user_id:
print(f"[{sid[:12]}] No user_id — set USER_ID env var or download first")
continue
lines = len(content.strip().split("\n"))
print(f"[{sid[:12]}] Loading transcript: {lines} entries, {len(content)} bytes")
# Parse messages from transcript for DB
messages = _parse_messages_from_transcript(content)
if not msg_count:
msg_count = len(messages)
print(f"[{sid[:12]}] Parsed {len(messages)} messages for DB")
# Create chat session in DB
try:
from backend.copilot.db import create_chat_session, get_chat_session
existing = await get_chat_session(sid)
if existing:
print(f"[{sid[:12]}] Session already exists in DB, skipping creation")
else:
await create_chat_session(sid, user_id)
print(f"[{sid[:12]}] Created ChatSession in DB")
except Exception as e:
print(f"[{sid[:12]}] DB session creation failed: {e}")
print(" You may need to create it manually or run with DB access.")
# Add messages to DB
if messages:
try:
from backend.copilot.db import add_chat_messages_batch
msg_dicts = [
{"role": m["role"], "content": m["content"]} for m in messages
]
await add_chat_messages_batch(sid, msg_dicts, start_sequence=0)
print(f"[{sid[:12]}] Added {len(messages)} messages to DB")
except Exception as e:
print(f"[{sid[:12]}] Message insertion failed: {e}")
print(" (Session may already have messages)")
# Store transcript in local workspace storage
try:
await upload_transcript(
user_id=user_id,
session_id=sid,
content=content,
message_count=msg_count,
)
print(f"[{sid[:12]}] Stored transcript in local workspace storage")
except Exception as e:
print(f"[{sid[:12]}] Transcript storage failed: {e}")
# Also store directly to filesystem as fallback
try:
from backend.util.settings import Settings
settings = Settings()
storage_dir = settings.config.workspace_storage_dir or os.path.join(
os.path.expanduser("~"), ".autogpt", "workspaces"
)
ts_dir = os.path.join(storage_dir, "chat-transcripts", _sanitize(user_id))
os.makedirs(ts_dir, exist_ok=True)
ts_path = os.path.join(ts_dir, f"{_sanitize(sid)}.jsonl")
with open(ts_path, "w") as f:
f.write(content)
meta_storage = {
"message_count": msg_count,
"uploaded_at": time.time(),
}
meta_storage_path = os.path.join(ts_dir, f"{_sanitize(sid)}.meta.json")
with open(meta_storage_path, "w") as f:
json.dump(meta_storage, f)
print(f"[{sid[:12]}] Also wrote to: {ts_path}")
except Exception as e:
print(f"[{sid[:12]}] Direct file write failed: {e}")
print(f"[{sid[:12]}] Ready — send a message to this session to test")
print()
print("Done. Start the backend and send a message to the session(s).")
print("The CoPilot will use --resume with the loaded transcript.")
# ── Main ──────────────────────────────────────────────────────────────────
async def main() -> None:
if len(sys.argv) < 3:
print(__doc__)
sys.exit(1)
command = sys.argv[1]
session_ids = sys.argv[2:]
if command == "download":
await cmd_download(session_ids)
elif command == "load":
await cmd_load(session_ids)
elif command == "both":
await cmd_download(session_ids)
print("\n" + "=" * 60 + "\n")
await cmd_load(session_ids)
else:
print(f"Unknown command: {command}")
print("Usage: download | load | both")
sys.exit(1)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -0,0 +1,93 @@
# Frontend
This file provides guidance to coding agents when working with the frontend.
## Essential Commands
```bash
# Install dependencies
pnpm i
# Generate API client from OpenAPI spec
pnpm generate:api
# Start development server
pnpm dev
# Run E2E tests
pnpm test
# Run Storybook for component development
pnpm storybook
# Build production
pnpm build
# Format and lint
pnpm format
# Type checking
pnpm types
```
### Pre-completion Checks (MANDATORY)
After making **any** code changes in the frontend, you MUST run the following commands **in order** before reporting work as done, creating commits, or opening PRs:
1. `pnpm format` — auto-fix formatting issues
2. `pnpm lint` — check for lint errors; fix any that appear
3. `pnpm types` — check for type errors; fix any that appear
Do NOT skip these steps. If any command reports errors, fix them and re-run until clean. Only then may you consider the task complete. If typing keeps failing, stop and ask the user.
### Code Style
- Fully capitalize acronyms in symbols, e.g. `graphID`, `useBackendAPI`
- Use function declarations (not arrow functions) for components/handlers
- No `dark:` Tailwind classes — the design system handles dark mode
- Use Next.js `<Link>` for internal navigation — never raw `<a>` tags
- No `any` types unless the value genuinely can be anything
- No linter suppressors (`// @ts-ignore`, `// eslint-disable`) — fix the actual issue
- **File length** — keep files under ~200 lines; extract sub-components or hooks into their own files when a file grows beyond this
- **Function/component length** — keep render functions and hooks under ~50 lines; extract named helpers or sub-components when they grow longer
## Architecture
- **Framework**: Next.js 15 App Router (client-first approach)
- **Data Fetching**: Type-safe generated API hooks via Orval + React Query
- **State Management**: React Query for server state, co-located UI state in components/hooks
- **Component Structure**: Separate render logic (`.tsx`) from business logic (`use*.ts` hooks)
- **Workflow Builder**: Visual graph editor using @xyflow/react
- **UI Components**: shadcn/ui (Radix UI primitives) with Tailwind CSS styling
- **Icons**: Phosphor Icons only
- **Feature Flags**: LaunchDarkly integration
- **Error Handling**: ErrorCard for render errors, toast for mutations, Sentry for exceptions
- **Testing**: Playwright for E2E, Storybook for component development
## Environment Configuration
`.env.default` (defaults) → `.env` (user overrides)
## Feature Development
See @CONTRIBUTING.md for complete patterns. Quick reference:
1. **Pages**: Create in `src/app/(platform)/feature-name/page.tsx`
- Extract component logic into custom hooks grouped by concern, not by component. Each hook should represent a cohesive domain of functionality (e.g., useSearch, useFilters, usePagination) rather than bundling all state into one useComponentState hook.
- Put each hook in its own `.ts` file
- Put sub-components in local `components/` folder
- Component props should be `type Props = { ... }` (not exported) unless it needs to be used outside the component
2. **Components**: Structure as `ComponentName/ComponentName.tsx` + `useComponentName.ts` + `helpers.ts`
- Use design system components from `src/components/` (atoms, molecules, organisms)
- Never use `src/components/__legacy__/*`
3. **Data fetching**: Use generated API hooks from `@/app/api/__generated__/endpoints/`
- Regenerate with `pnpm generate:api`
- Pattern: `use{Method}{Version}{OperationName}`
4. **Styling**: Tailwind CSS only, use design tokens, Phosphor Icons only
5. **Testing**: Add Storybook stories for new components, Playwright for E2E. When fixing a bug, write a failing Playwright test first (use `.fixme` annotation), implement the fix, then remove the annotation.
6. **Code conventions**:
- Use function declarations (not arrow functions) for components/handlers
- Do not use `useCallback` or `useMemo` unless asked to optimise a given function
- Do not type hook returns, let Typescript infer as much as possible
- Never type with `any` unless a variable/attribute can ACTUALLY be of any type
- avoid index and barrel files

View File

@@ -1,93 +1 @@
# CLAUDE.md - Frontend
This file provides guidance to Claude Code when working with the frontend.
## Essential Commands
```bash
# Install dependencies
pnpm i
# Generate API client from OpenAPI spec
pnpm generate:api
# Start development server
pnpm dev
# Run E2E tests
pnpm test
# Run Storybook for component development
pnpm storybook
# Build production
pnpm build
# Format and lint
pnpm format
# Type checking
pnpm types
```
### Pre-completion Checks (MANDATORY)
After making **any** code changes in the frontend, you MUST run the following commands **in order** before reporting work as done, creating commits, or opening PRs:
1. `pnpm format` — auto-fix formatting issues
2. `pnpm lint` — check for lint errors; fix any that appear
3. `pnpm types` — check for type errors; fix any that appear
Do NOT skip these steps. If any command reports errors, fix them and re-run until clean. Only then may you consider the task complete. If typing keeps failing, stop and ask the user.
### Code Style
- Fully capitalize acronyms in symbols, e.g. `graphID`, `useBackendAPI`
- Use function declarations (not arrow functions) for components/handlers
- No `dark:` Tailwind classes — the design system handles dark mode
- Use Next.js `<Link>` for internal navigation — never raw `<a>` tags
- No `any` types unless the value genuinely can be anything
- No linter suppressors (`// @ts-ignore`, `// eslint-disable`) — fix the actual issue
- **File length** — keep files under ~200 lines; extract sub-components or hooks into their own files when a file grows beyond this
- **Function/component length** — keep render functions and hooks under ~50 lines; extract named helpers or sub-components when they grow longer
## Architecture
- **Framework**: Next.js 15 App Router (client-first approach)
- **Data Fetching**: Type-safe generated API hooks via Orval + React Query
- **State Management**: React Query for server state, co-located UI state in components/hooks
- **Component Structure**: Separate render logic (`.tsx`) from business logic (`use*.ts` hooks)
- **Workflow Builder**: Visual graph editor using @xyflow/react
- **UI Components**: shadcn/ui (Radix UI primitives) with Tailwind CSS styling
- **Icons**: Phosphor Icons only
- **Feature Flags**: LaunchDarkly integration
- **Error Handling**: ErrorCard for render errors, toast for mutations, Sentry for exceptions
- **Testing**: Playwright for E2E, Storybook for component development
## Environment Configuration
`.env.default` (defaults) → `.env` (user overrides)
## Feature Development
See @CONTRIBUTING.md for complete patterns. Quick reference:
1. **Pages**: Create in `src/app/(platform)/feature-name/page.tsx`
- Extract component logic into custom hooks grouped by concern, not by component. Each hook should represent a cohesive domain of functionality (e.g., useSearch, useFilters, usePagination) rather than bundling all state into one useComponentState hook.
- Put each hook in its own `.ts` file
- Put sub-components in local `components/` folder
- Component props should be `type Props = { ... }` (not exported) unless it needs to be used outside the component
2. **Components**: Structure as `ComponentName/ComponentName.tsx` + `useComponentName.ts` + `helpers.ts`
- Use design system components from `src/components/` (atoms, molecules, organisms)
- Never use `src/components/__legacy__/*`
3. **Data fetching**: Use generated API hooks from `@/app/api/__generated__/endpoints/`
- Regenerate with `pnpm generate:api`
- Pattern: `use{Method}{Version}{OperationName}`
4. **Styling**: Tailwind CSS only, use design tokens, Phosphor Icons only
5. **Testing**: Add Storybook stories for new components, Playwright for E2E. When fixing a bug, write a failing Playwright test first (use `.fixme` annotation), implement the fix, then remove the annotation.
6. **Code conventions**:
- Use function declarations (not arrow functions) for components/handlers
- Do not use `useCallback` or `useMemo` unless asked to optimise a given function
- Do not type hook returns, let Typescript infer as much as possible
- Never type with `any` unless a variable/attribute can ACTUALLY be of any type
- avoid index and barrel files
@AGENTS.md

View File

@@ -5,8 +5,12 @@ import { Button } from "@/components/atoms/Button/Button";
import type { UserRateLimitResponse } from "@/app/api/__generated__/models/userRateLimitResponse";
import { UsageBar } from "../../components/UsageBar";
/** Extend generated type with optional fields returned by the backend
* but not yet present in the generated OpenAPI schema on this branch. */
type RateLimitData = UserRateLimitResponse & { user_email?: string | null };
interface Props {
data: UserRateLimitResponse;
data: RateLimitData;
onReset: (resetWeekly: boolean) => Promise<void>;
/** Override the outer container classes (default: bordered card). */
className?: string;

View File

@@ -49,17 +49,23 @@ export function useRateLimitManager() {
setRateLimitData(null);
try {
// The backend accepts either user_id or email, but the generated type
// only knows about user_id — cast to satisfy the compiler until the
// OpenAPI spec on this branch is updated.
const params = looksLikeEmail(trimmed)
? { email: trimmed }
? ({ email: trimmed } as unknown as { user_id: string })
: { user_id: trimmed };
const response = await getV2GetUserRateLimit(params);
if (response.status !== 200) {
throw new Error("Failed to fetch rate limit");
}
setRateLimitData(response.data);
const data = response.data as typeof response.data & {
user_email?: string | null;
};
setSelectedUser({
user_id: response.data.user_id,
user_email: response.data.user_email ?? response.data.user_id,
user_id: data.user_id,
user_email: data.user_email ?? data.user_id,
});
} catch (error) {
console.error("Error fetching rate limit:", error);

View File

@@ -39,39 +39,49 @@ export const AgentOutputs = ({ flowID }: { flowID: string | null }) => {
return outputNodes
.map((node) => {
const executionResults = node.data.nodeExecutionResults || [];
const latestResult =
executionResults.length > 0
? executionResults[executionResults.length - 1]
: undefined;
const outputData = latestResult?.output_data?.output;
const renderer = globalRegistry.getRenderer(outputData);
const items = executionResults
.filter((result) => result.output_data?.output !== undefined)
.map((result) => {
const outputData = result.output_data!.output;
const renderer = globalRegistry.getRenderer(outputData);
return {
nodeExecID: result.node_exec_id,
value: outputData,
renderer,
};
})
.filter(
(
item,
): item is typeof item & {
renderer: NonNullable<typeof item.renderer>;
} => item.renderer !== null,
);
if (items.length === 0) return null;
return {
nodeID: node.id,
metadata: {
name: node.data.hardcodedValues?.name || "Output",
description:
node.data.hardcodedValues?.description || "Output from the agent",
},
value: outputData ?? "No output yet",
renderer,
items,
};
})
.filter(
(
output,
): output is typeof output & {
renderer: NonNullable<typeof output.renderer>;
} => output.renderer !== null,
);
.filter((group): group is NonNullable<typeof group> => group !== null);
}, [nodes]);
const actionItems = useMemo(() => {
return outputs.map((output) => ({
value: output.value,
metadata: {},
renderer: output.renderer,
}));
return outputs.flatMap((group) =>
group.items.map((item) => ({
value: item.value,
metadata: group.metadata,
renderer: item.renderer,
})),
);
}, [outputs]);
return (
@@ -116,24 +126,27 @@ export const AgentOutputs = ({ flowID }: { flowID: string | null }) => {
<ScrollArea className="h-full overflow-auto pr-4">
<div className="space-y-6">
{outputs && outputs.length > 0 ? (
outputs.map((output, i) => (
<div key={i} className="space-y-2">
outputs.map((group) => (
<div key={group.nodeID} className="space-y-2">
<div>
<Label className="text-base font-semibold">
{output.metadata.name || "Unnamed Output"}
{group.metadata.name || "Unnamed Output"}
</Label>
{output.metadata.description && (
{group.metadata.description && (
<Label className="mt-1 block text-sm text-gray-600">
{output.metadata.description}
{group.metadata.description}
</Label>
)}
</div>
<OutputItem
value={output.value}
metadata={{}}
renderer={output.renderer}
/>
{group.items.map((item) => (
<OutputItem
key={item.nodeExecID}
value={item.value}
metadata={group.metadata}
renderer={item.renderer}
/>
))}
</div>
))
) : (

View File

@@ -33,6 +33,12 @@ export const useRunGraph = () => {
const clearAllNodeErrors = useNodeStore(
useShallow((state) => state.clearAllNodeErrors),
);
const cleanNodesStatuses = useNodeStore(
useShallow((state) => state.cleanNodesStatuses),
);
const clearAllNodeExecutionResults = useNodeStore(
useShallow((state) => state.clearAllNodeExecutionResults),
);
// Tutorial integration - force open dialog when tutorial requests it
const forceOpenRunInputDialog = useTutorialStore(
@@ -137,6 +143,9 @@ export const useRunGraph = () => {
if (!dryRun && (hasInputs() || hasCredentials())) {
setOpenRunInputDialog(true);
} else {
// Clear stale results so the UI shows fresh output from this execution
clearAllNodeExecutionResults();
cleanNodesStatuses();
// Optimistically set running state immediately for responsive UI
setIsGraphRunning(true);
await executeGraph({

View File

@@ -10,9 +10,12 @@ import { NodeExecutionResult } from "@/app/api/__generated__/models/nodeExecutio
import { AgentExecutionStatus } from "@/app/api/__generated__/models/agentExecutionStatus";
import { useGraphStore } from "../../../stores/graphStore";
import { useEdgeStore } from "../../../stores/edgeStore";
import { useQueryClient } from "@tanstack/react-query";
import { getGetV1GetExecutionDetailsQueryKey } from "@/app/api/__generated__/endpoints/graphs/graphs";
export const useFlowRealtime = () => {
const api = useBackendAPI();
const queryClient = useQueryClient();
const updateNodeExecutionResult = useNodeStore(
useShallow((state) => state.updateNodeExecutionResult),
);
@@ -71,6 +74,16 @@ export const useFlowRealtime = () => {
console.debug(
`Subscribed to updates for execution #${flowExecutionID}`,
);
// Refetch execution details to catch any events that were
// published before the WebSocket subscription was established.
// This closes the race-condition window for fast-completing
// executions like dry-runs / simulations.
void queryClient.invalidateQueries({
queryKey: getGetV1GetExecutionDetailsQueryKey(
flowID!,
flowExecutionID,
),
});
})
.catch((error) =>
console.error(
@@ -87,7 +100,7 @@ export const useFlowRealtime = () => {
deregisterGraphExecutionStatusEvent();
resetEdgeBeads();
};
}, [api, flowExecutionID, resetEdgeBeads]);
}, [api, flowExecutionID, resetEdgeBeads, queryClient, flowID]);
return {};
};

View File

@@ -99,7 +99,19 @@ export function isRunBlockReviewRequiredOutput(
export function isRunBlockErrorOutput(
output: RunBlockToolOutput,
): output is ErrorResponse {
return output.type === ResponseType.error || "error" in output;
// Only match actual error responses (type=error), not block outputs that
// happen to have an "error" key in their outputs dict. The old
// `"error" in output` check was too broad and caused BlockOutputResponse
// to be mis-identified as errors, showing dry-run results as failed.
if (output.type === ResponseType.error) return true;
// Fallback for untyped payloads: match only if "error" exists at the top
// level AND there is no "block_id" (which distinguishes BlockOutputResponse
// from ErrorResponse). Note: `type` is optional in both interfaces, so
// correctness here depends on `block_id` presence (always set on
// BlockOutputResponse), not on `type` presence.
if (!("type" in output) && "error" in output && !("block_id" in output))
return true;
return false;
}
function parseOutput(output: unknown): RunBlockToolOutput | null {
@@ -122,7 +134,9 @@ function parseOutput(output: unknown): RunBlockToolOutput | null {
if ("block_id" in output) return output as BlockOutputResponse;
if ("block" in output) return output as BlockDetailsResponse;
if ("setup_info" in output) return output as SetupRequirementsResponse;
if ("error" in output || "details" in output)
// Only match error responses that have an "error" key but NOT "block_id"
// (which would indicate a BlockOutputResponse, not an error).
if (("error" in output || "details" in output) && !("block_id" in output))
return output as ErrorResponse;
}
return null;

View File

@@ -95,7 +95,8 @@ export function useChatSession() {
async function createSession() {
if (sessionId) return sessionId;
try {
const response = await createSessionMutation();
// eslint-disable-next-line @typescript-eslint/no-explicit-any
const response = await (createSessionMutation as any)({ data: null });
if (response.status !== 200 || !response.data?.id) {
const error = new Error("Failed to create session");
Sentry.captureException(error, {

View File

@@ -19,6 +19,7 @@ import {
const RECONNECT_BASE_DELAY_MS = 1_000;
const RECONNECT_MAX_ATTEMPTS = 3;
const STREAM_TIMEOUT_MS = 60_000;
/** Minimum time the page must have been hidden to trigger a wake re-sync. */
const WAKE_RESYNC_THRESHOLD_MS = 30_000;
@@ -102,6 +103,11 @@ export function useCopilotStream({
// Set when the user explicitly clicks stop — prevents onError from
// triggering a reconnect cycle for the resulting AbortError.
const isUserStoppingRef = useRef(false);
// Timer that fires when no SSE events arrive for STREAM_TIMEOUT_MS during
// an active stream — auto-cancels the stream to avoid "Reasoning..." forever.
const streamTimeoutRef = useRef<ReturnType<typeof setTimeout>>();
// Ref to the latest stop() so the timeout callback never uses a stale closure.
const stopRef = useRef<() => void>(() => {});
// Set when all reconnect attempts are exhausted — prevents hasActiveStream
// from keeping the UI blocked forever when the backend is slow to clear it.
// Must be state (not ref) so that setting it triggers a re-render and
@@ -245,8 +251,12 @@ export function useCopilotStream({
// Wrap AI SDK's stop() to also cancel the backend executor task.
// sdkStop() aborts the SSE fetch instantly (UI feedback), then we fire
// the cancel API to actually stop the executor and wait for confirmation.
// Also kept in stopRef so the stream-timeout callback always calls the
// latest version without needing it in the effect dependency array.
async function stop() {
isUserStoppingRef.current = true;
clearTimeout(streamTimeoutRef.current);
streamTimeoutRef.current = undefined;
sdkStop();
// Resolve pending tool calls and inject a cancellation marker so the UI
// shows "You manually stopped this chat" immediately (the backend writes
@@ -295,6 +305,7 @@ export function useCopilotStream({
});
}
}
stopRef.current = stop;
// Keep a ref to sessionId so the async wake handler can detect staleness.
const sessionIdRef = useRef(sessionId);
@@ -375,6 +386,8 @@ export function useCopilotStream({
useEffect(() => {
clearTimeout(reconnectTimerRef.current);
reconnectTimerRef.current = undefined;
clearTimeout(streamTimeoutRef.current);
streamTimeoutRef.current = undefined;
reconnectAttemptsRef.current = 0;
isReconnectScheduledRef.current = false;
setIsReconnectScheduled(false);
@@ -387,6 +400,8 @@ export function useCopilotStream({
return () => {
clearTimeout(reconnectTimerRef.current);
reconnectTimerRef.current = undefined;
clearTimeout(streamTimeoutRef.current);
streamTimeoutRef.current = undefined;
};
}, [sessionId]);
@@ -468,6 +483,41 @@ export function useCopilotStream({
}
}, [hasActiveStream]);
// Stream timeout guard: if no SSE events arrive for STREAM_TIMEOUT_MS while
// the stream is active, auto-cancel to avoid the UI stuck in "Reasoning..."
// indefinitely (e.g. when the SSE connection dies silently without a
// disconnect event).
useEffect(() => {
// rawMessages is intentionally in the dependency array: each SSE event
// updates rawMessages, which re-runs this effect and resets the timer.
// Referencing its length here satisfies the exhaustive-deps rule.
void rawMessages.length;
const isActive = status === "streaming" || status === "submitted";
if (!isActive) {
clearTimeout(streamTimeoutRef.current);
streamTimeoutRef.current = undefined;
return;
}
clearTimeout(streamTimeoutRef.current);
streamTimeoutRef.current = setTimeout(() => {
streamTimeoutRef.current = undefined;
toast({
title: "Connection lost",
description:
"No response received — please try sending your message again.",
variant: "destructive",
});
stopRef.current();
}, STREAM_TIMEOUT_MS);
return () => {
clearTimeout(streamTimeoutRef.current);
streamTimeoutRef.current = undefined;
};
}, [status, rawMessages]);
// True while reconnecting or backend has active stream but we haven't connected yet.
// Suppressed when the user explicitly stopped or when all reconnect attempts
// are exhausted — the backend may be slow to clear active_stream but the UI

View File

@@ -1059,9 +1059,22 @@
"post": {
"tags": ["v2", "chat", "chat"],
"summary": "Create Session",
"description": "Create a new chat session.\n\nInitiates a new chat session for the authenticated user.\n\nArgs:\n user_id: The authenticated user ID parsed from the JWT (required).\n\nReturns:\n CreateSessionResponse: Details of the created session.",
"description": "Create a new chat session.\n\nInitiates a new chat session for the authenticated user.\n\nArgs:\n user_id: The authenticated user ID parsed from the JWT (required).\n request: Optional request body. When provided, ``dry_run=True``\n forces run_block and run_agent calls to use dry-run simulation.\n\nReturns:\n CreateSessionResponse: Details of the created session.",
"operationId": "postV2CreateSession",
"security": [{ "HTTPBearerJWT": [] }],
"requestBody": {
"content": {
"application/json": {
"schema": {
"anyOf": [
{ "$ref": "#/components/schemas/CreateSessionRequest" },
{ "type": "null" }
],
"title": "Request"
}
}
}
},
"responses": {
"200": {
"description": "Successful Response",
@@ -1075,6 +1088,14 @@
},
"401": {
"$ref": "#/components/responses/HTTP401NotAuthenticatedError"
},
"422": {
"description": "Validation Error",
"content": {
"application/json": {
"schema": { "$ref": "#/components/schemas/HTTPValidationError" }
}
}
}
}
}
@@ -8385,6 +8406,14 @@
"required": ["query", "conversation_history", "message_id"],
"title": "ChatRequest"
},
"ChatSessionMetadata": {
"properties": {
"dry_run": { "type": "boolean", "title": "Dry Run", "default": false }
},
"type": "object",
"title": "ChatSessionMetadata",
"description": "Typed metadata stored in the ``metadata`` JSON column of ChatSession.\n\nAdd new session-level flags here instead of adding DB columns —\nno migration required for new fields as long as a default is provided."
},
"ClarificationNeededResponse": {
"properties": {
"type": {
@@ -8514,6 +8543,15 @@
"required": ["graph"],
"title": "CreateGraph"
},
"CreateSessionRequest": {
"properties": {
"dry_run": { "type": "boolean", "title": "Dry Run", "default": false }
},
"additionalProperties": false,
"type": "object",
"title": "CreateSessionRequest",
"description": "Request model for creating a new chat session.\n\n``dry_run`` is a **top-level** field — do not nest it inside ``metadata``.\nExtra/unknown fields are rejected (422) to prevent silent mis-use."
},
"CreateSessionResponse": {
"properties": {
"id": { "type": "string", "title": "Id" },
@@ -8521,6 +8559,10 @@
"user_id": {
"anyOf": [{ "type": "string" }, { "type": "null" }],
"title": "User Id"
},
"metadata": {
"$ref": "#/components/schemas/ChatSessionMetadata",
"default": { "dry_run": false }
}
},
"type": "object",
@@ -12139,6 +12181,10 @@
"type": "integer",
"title": "Total Completion Tokens",
"default": 0
},
"metadata": {
"$ref": "#/components/schemas/ChatSessionMetadata",
"default": { "dry_run": false }
}
},
"type": "object",

View File

@@ -52,7 +52,6 @@ export function LaunchDarklyProvider({ children }: { children: ReactNode }) {
timeout={LAUNCHDARKLY_INIT_TIMEOUT_MS}
reactOptions={{ useCamelCaseFlagKeys: false }}
options={{
bootstrap: "localStorage",
inspectors: [Sentry.buildLaunchDarklyFlagUsedHandler()],
}}
>

View File

@@ -0,0 +1,220 @@
# Frontend Testing Rules 🧪
## Testing Types Overview
| Type | Tool | Speed | Purpose |
| --------------- | --------------------- | --------------- | -------------------------------- |
| **E2E** | Playwright | Slow (~5s/test) | Real browser, full user journeys |
| **Integration** | Vitest + RTL | Fast (~100ms) | Component + mocked API |
| **Unit** | Vitest + RTL | Fastest (~10ms) | Individual functions/components |
| **Visual** | Storybook + Chromatic | N/A | UI appearance, design system |
---
## When to Use Each
### ✅ E2E Tests (Playwright)
**Use for:** Critical user journeys that MUST work in a real browser.
- Authentication flows (login, signup, logout)
- Payment or sensitive transactions
- Flows requiring real browser APIs (clipboard, downloads)
- Cross-page navigation that must work end-to-end
**Location:** `src/tests/*.spec.ts` (centralized, as there will be fewer of them)
### ✅ Integration Tests (Vitest + RTL)
**Use for:** Testing components with their dependencies (API calls, state).
- Page-level behavior with mocked API responses
- Components that fetch data
- User interactions that trigger API calls
- Feature flows within a single page
**Location:** Place tests in a `__tests__` folder next to the component:
```
ComponentName/
__tests__/
main.test.tsx
some-flow.test.tsx
ComponentName.tsx
useComponentName.ts
```
**Start at page level:** Initially write integration tests at the "page" level. No need to write them for every small component.
```
/library/
__tests__/
main.test.tsx
searching-agents.test.tsx
agents-pagination.test.tsx
page.tsx
useLibraryPage.ts
```
Start with a `main.test.tsx` file and split into smaller files as it grows.
**What integration tests should do:**
1. Render a page or complex modal (e.g., `AgentPublishModal`)
2. Mock API requests via MSW
3. Assert UI scenarios via Testing Library
```tsx
// Example: Test page renders data from API
import { server } from "@/mocks/mock-server";
import { getDeleteV2DeleteStoreSubmissionMockHandler422 } from "@/app/api/__generated__/endpoints/store/store.msw";
test("shows error when submission fails", async () => {
// Override default handler to return error status
server.use(getDeleteV2DeleteStoreSubmissionMockHandler422());
render(<MarketplacePage />);
await screen.findByText("Featured Agents");
// ... assert error UI
});
```
**Tip:** Use `findBy...` methods most of the time—they wait for elements to appear, so async code won't cause flaky tests. The regular `getBy...` methods don't wait and error immediately.
### ✅ Unit Tests (Vitest + RTL)
**Use for:** Testing isolated components and utility functions.
- Pure utility functions (`lib/utils.ts`)
- Component rendering with various props
- Component state changes
- Custom hooks
**Location:** Co-located with the file: `Component.test.tsx` next to `Component.tsx`
```tsx
// Example: Test component renders correctly
render(<AgentCard title="My Agent" />);
expect(screen.getByText("My Agent")).toBeInTheDocument();
```
### ✅ Storybook Tests (Visual)
**Use for:** Design system, visual appearance, component documentation.
- Atoms (Button, Input, Badge)
- Molecules (Dialog, Card)
- Visual states (hover, disabled, loading)
- Responsive layouts
**Location:** Co-located: `Component.stories.tsx` next to `Component.tsx`
---
## Decision Flowchart
```
Does it need a REAL browser/backend?
├─ YES → E2E (Playwright)
└─ NO
└─ Does it involve API calls or complex state?
├─ YES → Integration (Vitest + RTL)
└─ NO
└─ Is it about visual appearance?
├─ YES → Storybook
└─ NO → Unit (Vitest + RTL)
```
---
## What NOT to Test
❌ Third-party library internals (Radix UI, React Query)
❌ CSS styling details (use Storybook)
❌ Simple prop-passing components with no logic
❌ TypeScript types
---
## File Organization
```
src/
├── components/
│ └── atoms/
│ └── Button/
│ ├── Button.tsx
│ ├── Button.test.tsx # Unit test
│ └── Button.stories.tsx # Visual test
├── app/
│ └── (platform)/
│ └── marketplace/
│ └── components/
│ └── MainMarketplacePage/
│ ├── __tests__/
│ │ ├── main.test.tsx # Integration test
│ │ └── search-agents.test.tsx # Integration test
│ ├── MainMarketplacePage.tsx
│ └── useMainMarketplacePage.ts
├── lib/
│ ├── utils.ts
│ └── utils.test.ts # Unit test
├── mocks/
│ ├── mock-handlers.ts # MSW handlers (auto-generated via Orval)
│ └── mock-server.ts # MSW server setup
└── tests/
├── integrations/
│ ├── test-utils.tsx # Testing utilities
│ └── vitest.setup.tsx # Integration test setup
└── *.spec.ts # E2E tests (Playwright) - centralized
```
---
## Priority Matrix
| Component Type | Test Priority | Recommended Test |
| ------------------- | ------------- | ---------------- |
| Pages/Features | **Highest** | Integration |
| Custom Hooks | High | Unit |
| Utility Functions | High | Unit |
| Organisms (complex) | High | Integration |
| Molecules | Medium | Unit + Storybook |
| Atoms | Medium | Storybook only\* |
\*Atoms are typically simple enough that Storybook visual tests suffice.
---
## MSW Mocking
API mocking is handled via MSW (Mock Service Worker). Handlers are auto-generated by Orval from the OpenAPI schema.
**Default behavior:** All client-side requests are intercepted and return 200 status with faker-generated data.
**Override for specific tests:** Use generated error handlers to test non-OK status scenarios:
```tsx
import { server } from "@/mocks/mock-server";
import { getDeleteV2DeleteStoreSubmissionMockHandler422 } from "@/app/api/__generated__/endpoints/store/store.msw";
test("shows error when deletion fails", async () => {
server.use(getDeleteV2DeleteStoreSubmissionMockHandler422());
render(<MyComponent />);
// ... assert error UI
});
```
**Generated handlers location:** `src/app/api/__generated__/endpoints/*/` - each endpoint has handlers for different status codes.
---
## Golden Rules
1. **Test behavior, not implementation** - Query by role/text, not class names
2. **One assertion per concept** - Tests should be focused
3. **Mock at boundaries** - Mock API calls, not internal functions
4. **Co-locate integration tests** - Keep `__tests__/` folder next to the component
5. **E2E is expensive** - Only for critical happy paths; prefer integration tests
6. **AI agents are good at writing integration tests** - Start with these when adding test coverage

View File

@@ -1,220 +1 @@
# Frontend Testing Rules 🧪
## Testing Types Overview
| Type | Tool | Speed | Purpose |
| --------------- | --------------------- | --------------- | -------------------------------- |
| **E2E** | Playwright | Slow (~5s/test) | Real browser, full user journeys |
| **Integration** | Vitest + RTL | Fast (~100ms) | Component + mocked API |
| **Unit** | Vitest + RTL | Fastest (~10ms) | Individual functions/components |
| **Visual** | Storybook + Chromatic | N/A | UI appearance, design system |
---
## When to Use Each
### ✅ E2E Tests (Playwright)
**Use for:** Critical user journeys that MUST work in a real browser.
- Authentication flows (login, signup, logout)
- Payment or sensitive transactions
- Flows requiring real browser APIs (clipboard, downloads)
- Cross-page navigation that must work end-to-end
**Location:** `src/tests/*.spec.ts` (centralized, as there will be fewer of them)
### ✅ Integration Tests (Vitest + RTL)
**Use for:** Testing components with their dependencies (API calls, state).
- Page-level behavior with mocked API responses
- Components that fetch data
- User interactions that trigger API calls
- Feature flows within a single page
**Location:** Place tests in a `__tests__` folder next to the component:
```
ComponentName/
__tests__/
main.test.tsx
some-flow.test.tsx
ComponentName.tsx
useComponentName.ts
```
**Start at page level:** Initially write integration tests at the "page" level. No need to write them for every small component.
```
/library/
__tests__/
main.test.tsx
searching-agents.test.tsx
agents-pagination.test.tsx
page.tsx
useLibraryPage.ts
```
Start with a `main.test.tsx` file and split into smaller files as it grows.
**What integration tests should do:**
1. Render a page or complex modal (e.g., `AgentPublishModal`)
2. Mock API requests via MSW
3. Assert UI scenarios via Testing Library
```tsx
// Example: Test page renders data from API
import { server } from "@/mocks/mock-server";
import { getDeleteV2DeleteStoreSubmissionMockHandler422 } from "@/app/api/__generated__/endpoints/store/store.msw";
test("shows error when submission fails", async () => {
// Override default handler to return error status
server.use(getDeleteV2DeleteStoreSubmissionMockHandler422());
render(<MarketplacePage />);
await screen.findByText("Featured Agents");
// ... assert error UI
});
```
**Tip:** Use `findBy...` methods most of the time—they wait for elements to appear, so async code won't cause flaky tests. The regular `getBy...` methods don't wait and error immediately.
### ✅ Unit Tests (Vitest + RTL)
**Use for:** Testing isolated components and utility functions.
- Pure utility functions (`lib/utils.ts`)
- Component rendering with various props
- Component state changes
- Custom hooks
**Location:** Co-located with the file: `Component.test.tsx` next to `Component.tsx`
```tsx
// Example: Test component renders correctly
render(<AgentCard title="My Agent" />);
expect(screen.getByText("My Agent")).toBeInTheDocument();
```
### ✅ Storybook Tests (Visual)
**Use for:** Design system, visual appearance, component documentation.
- Atoms (Button, Input, Badge)
- Molecules (Dialog, Card)
- Visual states (hover, disabled, loading)
- Responsive layouts
**Location:** Co-located: `Component.stories.tsx` next to `Component.tsx`
---
## Decision Flowchart
```
Does it need a REAL browser/backend?
├─ YES → E2E (Playwright)
└─ NO
└─ Does it involve API calls or complex state?
├─ YES → Integration (Vitest + RTL)
└─ NO
└─ Is it about visual appearance?
├─ YES → Storybook
└─ NO → Unit (Vitest + RTL)
```
---
## What NOT to Test
❌ Third-party library internals (Radix UI, React Query)
❌ CSS styling details (use Storybook)
❌ Simple prop-passing components with no logic
❌ TypeScript types
---
## File Organization
```
src/
├── components/
│ └── atoms/
│ └── Button/
│ ├── Button.tsx
│ ├── Button.test.tsx # Unit test
│ └── Button.stories.tsx # Visual test
├── app/
│ └── (platform)/
│ └── marketplace/
│ └── components/
│ └── MainMarketplacePage/
│ ├── __tests__/
│ │ ├── main.test.tsx # Integration test
│ │ └── search-agents.test.tsx # Integration test
│ ├── MainMarketplacePage.tsx
│ └── useMainMarketplacePage.ts
├── lib/
│ ├── utils.ts
│ └── utils.test.ts # Unit test
├── mocks/
│ ├── mock-handlers.ts # MSW handlers (auto-generated via Orval)
│ └── mock-server.ts # MSW server setup
└── tests/
├── integrations/
│ ├── test-utils.tsx # Testing utilities
│ └── vitest.setup.tsx # Integration test setup
└── *.spec.ts # E2E tests (Playwright) - centralized
```
---
## Priority Matrix
| Component Type | Test Priority | Recommended Test |
| ------------------- | ------------- | ---------------- |
| Pages/Features | **Highest** | Integration |
| Custom Hooks | High | Unit |
| Utility Functions | High | Unit |
| Organisms (complex) | High | Integration |
| Molecules | Medium | Unit + Storybook |
| Atoms | Medium | Storybook only\* |
\*Atoms are typically simple enough that Storybook visual tests suffice.
---
## MSW Mocking
API mocking is handled via MSW (Mock Service Worker). Handlers are auto-generated by Orval from the OpenAPI schema.
**Default behavior:** All client-side requests are intercepted and return 200 status with faker-generated data.
**Override for specific tests:** Use generated error handlers to test non-OK status scenarios:
```tsx
import { server } from "@/mocks/mock-server";
import { getDeleteV2DeleteStoreSubmissionMockHandler422 } from "@/app/api/__generated__/endpoints/store/store.msw";
test("shows error when deletion fails", async () => {
server.use(getDeleteV2DeleteStoreSubmissionMockHandler422());
render(<MyComponent />);
// ... assert error UI
});
```
**Generated handlers location:** `src/app/api/__generated__/endpoints/*/` - each endpoint has handlers for different status codes.
---
## Golden Rules
1. **Test behavior, not implementation** - Query by role/text, not class names
2. **One assertion per concept** - Tests should be focused
3. **Mock at boundaries** - Mock API calls, not internal functions
4. **Co-locate integration tests** - Keep `__tests__/` folder next to the component
5. **E2E is expensive** - Only for critical happy paths; prefer integration tests
6. **AI agents are good at writing integration tests** - Start with these when adding test coverage
@AGENTS.md

44
docs/AGENTS.md Normal file
View File

@@ -0,0 +1,44 @@
# Documentation Guidelines
## Block Documentation Manual Sections
When updating manual sections (`<!-- MANUAL: ... -->`) in block documentation files (e.g., `docs/integrations/basic.md`), follow these formats:
### How It Works Section
Provide a technical explanation of how the block functions:
- Describe the processing logic in 1-2 paragraphs
- Mention any validation, error handling, or edge cases
- Use code examples with backticks when helpful (e.g., `[[1, 2], [3, 4]]` becomes `[1, 2, 3, 4]`)
Example:
```markdown
<!-- MANUAL: how_it_works -->
The block iterates through each list in the input and extends a result list with all elements from each one. It processes lists in order, so `[[1, 2], [3, 4]]` becomes `[1, 2, 3, 4]`.
The block includes validation to ensure each item is actually a list. If a non-list value is encountered, the block outputs an error message instead of proceeding.
<!-- END MANUAL -->
```
### Use Case Section
Provide 3 practical use cases in this format:
- **Bold Heading**: Short one-sentence description
Example:
```markdown
<!-- MANUAL: use_case -->
**Paginated API Merging**: Combine results from multiple API pages into a single list for batch processing or display.
**Parallel Task Aggregation**: Merge outputs from parallel workflow branches that each produce a list of results.
**Multi-Source Data Collection**: Combine data collected from different sources (like multiple RSS feeds or API endpoints) into one unified list.
<!-- END MANUAL -->
```
### Style Guidelines
- Keep descriptions concise and action-oriented
- Focus on practical, real-world scenarios
- Use consistent terminology with other blocks
- Avoid overly technical jargon unless necessary

View File

@@ -1,44 +1 @@
# Documentation Guidelines
## Block Documentation Manual Sections
When updating manual sections (`<!-- MANUAL: ... -->`) in block documentation files (e.g., `docs/integrations/basic.md`), follow these formats:
### How It Works Section
Provide a technical explanation of how the block functions:
- Describe the processing logic in 1-2 paragraphs
- Mention any validation, error handling, or edge cases
- Use code examples with backticks when helpful (e.g., `[[1, 2], [3, 4]]` becomes `[1, 2, 3, 4]`)
Example:
```markdown
<!-- MANUAL: how_it_works -->
The block iterates through each list in the input and extends a result list with all elements from each one. It processes lists in order, so `[[1, 2], [3, 4]]` becomes `[1, 2, 3, 4]`.
The block includes validation to ensure each item is actually a list. If a non-list value is encountered, the block outputs an error message instead of proceeding.
<!-- END MANUAL -->
```
### Use Case Section
Provide 3 practical use cases in this format:
- **Bold Heading**: Short one-sentence description
Example:
```markdown
<!-- MANUAL: use_case -->
**Paginated API Merging**: Combine results from multiple API pages into a single list for batch processing or display.
**Parallel Task Aggregation**: Merge outputs from parallel workflow branches that each produce a list of results.
**Multi-Source Data Collection**: Combine data collected from different sources (like multiple RSS feeds or API endpoints) into one unified list.
<!-- END MANUAL -->
```
### Style Guidelines
- Keep descriptions concise and action-oriented
- Focus on practical, real-world scenarios
- Use consistent terminology with other blocks
- Avoid overly technical jargon unless necessary
@AGENTS.md

View File

@@ -62,6 +62,7 @@ Tool and block identifiers provided in `tools` and `blocks` are validated at run
| tools_exclude | Controls how the 'tools' list is interpreted. True (default): 'tools' is a deny-list — listed tools are blocked, all others are allowed. An empty 'tools' list means allow everything. False: 'tools' is an allow-list — only listed tools are permitted. | bool | No |
| blocks | Block identifiers to filter when the copilot uses run_block. Each entry can be: a block name (e.g. 'HTTP Request'), a full block UUID, or the first 8 hex characters of the UUID (e.g. 'c069dc6b'). Works with blocks_exclude. Leave empty to apply no block filter. | List[str] | No |
| blocks_exclude | Controls how the 'blocks' list is interpreted. True (default): 'blocks' is a deny-list — listed blocks are blocked, all others are allowed. An empty 'blocks' list means allow everything. False: 'blocks' is an allow-list — only listed blocks are permitted. | bool | No |
| dry_run | When enabled, run_block and run_agent tool calls in this autopilot session are forced to use dry-run simulation mode. No real API calls, side effects, or credits are consumed by those tools. Useful for testing agent wiring and previewing outputs. Only applies when creating a new session (session_id is empty). When reusing an existing session_id, the session's original dry_run setting is preserved. | bool | No |
### Outputs