Compare commits

...

19 Commits

Author SHA1 Message Date
Otto (AGPT)
e9afd9fa01 fix: cast OnboardingStep enum to text in funnel view
The completedSteps column is a platform."OnboardingStep" enum array.
UNNEST produces enum values that can't be compared directly to text
from the VALUES clause. Adding ::text cast fixes the type mismatch.
2026-03-13 11:55:37 +00:00
Zamil Majdy
ddb4f6e9de fix(analytics): address second batch of PR review comments
- user_onboarding_funnel: build complete 22-step grid with VALUES CTE
  so zero-completion steps are always present, fixing LAG comparisons
  against wrong predecessors; update docs to reflect all 22 steps
- users_activities: use COUNT(DISTINCT "id") for agent_count to avoid
  counting multiple version rows per graph; add COALESCE(..., 0) for
  agent_count, unique_agent_runs, agent_runs; update docs column list
  to include node_execution_incomplete and node_execution_review
- generate_views: update Step 3 comment to clarify NOLOGIN role needs
  WITH LOGIN PASSWORD not just WITH PASSWORD; add fail-fast validation
  for unknown --only view names with helpful error message
2026-03-12 00:47:55 +07:00
Zamil Majdy
f585d97928 fix(analytics): move new status columns to end of users_activities SELECT
CREATE OR REPLACE VIEW requires existing columns to stay in position.
Moving node_execution_incomplete and node_execution_review after
is_active_after_7d so the replacement doesn't shift existing columns.
2026-03-12 00:01:40 +07:00
Zamil Majdy
7d39234fdd fix(analytics): address PR review comments
- user_block_spending: use ->> instead of -> for JSONB field extraction
  before casting to int (avoids runtime cast errors)
- generate_views: create analytics_readonly as NOLOGIN to avoid a
  usable role with a known default password
- generate_views: percent-encode DB credentials in the URI builder so
  passwords with reserved chars (@, :, /) connect correctly
- graph_execution: remove WHERE filter on sensitive_action_safe_mode
  before DISTINCT ON so the latest LibraryAgent version always wins
  (fixes possibly_ai being sticky once any version had the flag set)
- retention_agent: use DISTINCT ON ordered by version DESC instead of
  MAX(name) so renamed agents resolve to their latest name
- retention_login_daily: add 90-day cohort_start filter to first_login
  CTE so the view matches its documented window
- user_onboarding_funnel: map the 8 missing OnboardingStep enum values
  (VISIT_COPILOT, RE_RUN_AGENT, SCHEDULE_AGENT, RUN_AGENTS, RUN_3_DAYS,
  TRIGGER_WEBHOOK, RUN_14_DAYS, RUN_AGENTS_100) to step_order 15-22
- users_activities: use updatedAt instead of createdAt for
  last_agent_save_time; add node_execution_incomplete and
  node_execution_review status columns
2026-03-11 23:48:42 +07:00
Zamil Majdy
6e9d4c4333 perf(analytics): fix fan-out in users_activities view
The original CTEs drove all joins from user_logins, causing a
O(users × executions × node_executions) fan-out that made the view
too heavy for Supabase to serve. Rewrote each CTE to aggregate its
own source table directly by userId, then LEFT JOIN the aggregates
in the final SELECT.
2026-03-11 23:39:14 +07:00
Zamil Majdy
8aad333a45 refactor(analytics): move generate_views.py to backend, add poetry run analytics-setup/analytics-views scripts 2026-03-11 16:23:29 +07:00
Zamil Majdy
856f0d980d fix(analytics): restrict analytics_readonly to analytics schema only via security_invoker=false views 2026-03-11 16:16:03 +07:00
Zamil Majdy
3c3aadd361 docs(analytics): add step-by-step quick start to generate_views.py docstring 2026-03-11 16:12:22 +07:00
Zamil Majdy
e87a693fdd feat(analytics): auto-load DB creds from backend/.env as fallback 2026-03-11 16:10:31 +07:00
Zamil Majdy
fe265c10d4 refactor(analytics): generate setup.sql via --setup flag, gitignore it 2026-03-11 16:01:52 +07:00
Zamil Majdy
5d00a94693 chore(analytics): remove auto-generated files, gitignore views.sql 2026-03-11 16:00:48 +07:00
Zamil Majdy
6e1605994d feat(analytics): add documented SQL views with generation script
Introduces an analytics/ layer that wraps production Postgres data in
safe, read-only views exposed under the analytics schema.

- 14 documented query files in queries/ (one per Looker data source)
  covering auth activities, user activity, execution metrics, onboarding
  funnel, and cohort retention (login + execution, weekly + daily)
- setup.sql — one-time schema creation and role/grant setup for the
  analytics_readonly role (auth, platform, analytics schemas)
- generate_views.py — reads queries/*.sql and applies
  CREATE OR REPLACE VIEW analytics.<name> to the database;
  supports --dry-run, --only, and --db-url flags
- views.sql — pre-generated combined reference output
- README.md — full setup, deployment, and integration guide

Looker, PostHog Data Warehouse, and Supabase MCP (for Otto) all
connect to the same analytics.* views instead of raw tables.
2026-03-11 15:36:27 +07:00
Zamil Majdy
15e3980d65 fix(frontend): buffer workspace file downloads to prevent truncation (#12349)
## Summary
- Workspace file downloads (images, CSVs, etc.) were silently truncated
(~10 KB lost from the end) when served through the Next.js proxy
- Root cause: `new NextResponse(response.body)` passes a
`ReadableStream` directly, which Next.js / Vercel silently truncates for
larger files
- Fix: fully buffer with `response.arrayBuffer()` before forwarding, and
set `Content-Length` from the actual buffer size
- Keeps the auth proxy intact — no signed URLs (which would be public
and expire, breaking chat history)

## Root cause verification
Confirmed locally on session `080f27f9-0379-4085-a67a-ee34cc40cd62`:
- Backend `write_workspace_file` logs **978,831 bytes** written
- Direct backend download (`curl
localhost:8006/api/workspace/files/.../download`): **978,831 bytes** 
- Browser download through Next.js proxy: **truncated** 

## Why not signed URLs?
- Signed URLs are effectively public — anyone with the link can download
the file (privacy concern)
- Signed URLs expire, but chat history persists — reopening a
conversation later would show broken downloads
- Buffering is fine: workspace files are capped at 100 MB, Vercel
function memory is 1 GB

## Related
- Discord thread: `#Truncated File Bug` channel
- Related PR #12319 (signed URL approach) — this fix is simpler and
preserves auth

## Test plan
- [ ] Download a workspace file (CSV, PNG, any type) through the copilot
UI
- [ ] Verify downloaded file size matches the original
- [ ] Verify PNGs open correctly and CSVs have all rows

cc @Swiftyos @uberdot @AdarshRawat1
2026-03-10 18:23:51 +00:00
Zamil Majdy
fe9eb2564b feat(copilot): HITL review for sensitive block execution (#12356)
## Summary
- Integrates existing Human-In-The-Loop (HITL) review infrastructure
into CoPilot's direct block execution (`run_block`) for blocks marked
with `is_sensitive_action=True`
- Removes the `PendingHumanReview → AgentGraphExecution` FK constraint
to support synthetic CoPilot session IDs (migration included)
- Adds `ReviewRequiredResponse` model + frontend `ReviewRequiredCard`
component to surface review status in the chat UI
- Auto-approval works within a CoPilot session: once a block is
approved, subsequent executions of the same block in the same session
are auto-approved (using `copilot-session-{session_id}` as
`graph_exec_id` and `copilot-node-{block_id}` as `node_id`)

## Test plan
- [x] All 11 `run_block_test.py` tests pass (3 new sensitive action
tests)
- [ ] Manual: Execute a block with `is_sensitive_action=True` in CoPilot
→ verify ReviewRequiredResponse is returned and rendered
- [ ] Manual: Approve in review panel → re-execute the same block →
verify auto-approval kicks in
- [ ] Manual: Verify non-sensitive blocks still execute without review
2026-03-10 18:20:11 +00:00
Otto
5641cdd3ca fix(backend): update test patches for validate_url → validate_url_host rename (#12358)
bfb843a renamed `validate_url` to `validate_url_host` in
`agent_browser`, `run_mcp_tool`, and MCP routes, but the corresponding
test files still patched the old name, causing `AttributeError` in CI.

Updates all mock patch targets and assertions across 3 test files:
- `agent_browser_test.py`
- `test_run_mcp_tool.py`  
- `mcp/test_routes.py`

---
Co-authored-by: Zamil Majdy (@majdyz) <zamil.majdy@agpt.co>
Co-authored-by: Reinier van der Leer (@Pwuts) <pwuts@agpt.co>
2026-03-10 17:22:11 +00:00
Otto
bfb843a56e Merge commit from fork
* Fix SSRF via user-controlled ollama_host field

Validate ollama_host against BLOCKED_IP_NETWORKS before passing to
ollama.AsyncClient(). The server-configured default (env: OLLAMA_HOST)
is allowed without validation; user-supplied values that differ are
checked for private/internal IP resolution.

Fixes GHSA-6jx2-4h7q-3fx3

* Generalize validate_ollama_host to validate_host; fix description line length

* Rename to validate_untrusted_host with whitelist parameter

* Apply PR suggestion: include whitelist in error message; run formatting

* Move whitelist check after URL normalization; match on netloc

* revert unrelated formatting changes

* Dedup validate_url and validate_untrusted_host; normalize whitelist

* Move _resolve_and_check_blocked after calling functions

* dedup and clean up

* make trusted_hostnames truly optional

---------

Co-authored-by: Reinier van der Leer <pwuts@agpt.co>
2026-03-10 15:51:58 +01:00
Abhimanyu Yadav
684845d946 fix(frontend/builder): handle discriminated unions and improve node layout (#12354)
## Summary
- **Discriminated union support (oneOf)**: Added a new `OneOfField`
component that properly
renders Pydantic discriminated unions. Hides the unusable parent object
handle, auto-populates
the discriminator value, shows a dropdown with variant titles (e.g.,
"Username" / "UserId"), and
filters out the internal discriminator field from the form.
Non-discriminated `oneOf` schemas
  fall back to existing `AnyOfField` behavior.
- **Collapsible object outputs**: Object-type outputs with nested keys
(e.g.,
`PersonLookupResponse.Url`, `PersonLookupResponse.profile`) are now
collapsed by default behind a
caret toggle. Nested keys show short names instead of the full
`Parent.Key` prefix.
- **Node layout cleanup**: Removed excessive bottom margin (`mb-6`) from
`FormRenderer`, hide the
Advanced toggle when no advanced fields exist, and add rounded bottom
corners on OUTPUT-type
  blocks.

<img width="440" height="427" alt="Screenshot 2026-03-10 at 11 31 55 AM"
src="https://github.com/user-attachments/assets/06cc5414-4e02-4371-bdeb-1695e7cb2c97"
/>
<img width="371" height="320" alt="Screenshot 2026-03-10 at 11 36 52 AM"
src="https://github.com/user-attachments/assets/1a55f87a-c602-4f4d-b91b-6e49f810e5d5"
/>

  ## Test plan
- [x] Add a Twitter Get User block — verify "Identifier" shows a
dropdown (Username/UserId) with
no unusable parent handle, discriminator field is hidden, and the block
can run without staying
  INCOMPLETE
- [x] Add any block with object outputs (e.g., PersonLookupResponse) —
verify nested keys are
  collapsed by default and expand on click with short labels
- [x] Verify blocks without advanced fields don't show the Advanced
toggle
- [x] Verify existing `anyOf` schemas (optional types, 3+ variant
unions) still render correctly
  - [x] Check OUTPUT-type blocks have rounded bottom corners

---------

Co-authored-by: Reinier van der Leer <pwuts@agpt.co>
Co-authored-by: eureka928 <meobius123@gmail.com>
2026-03-10 14:13:32 +00:00
Bently
6a6b23c2e1 fix(frontend): Remove unused Otto Server Action causing 107K+ errors (#12336)
## Summary

Fixes [OPEN-3025](https://linear.app/autogpt/issue/OPEN-3025) —
**107,571+ Server Action errors** in production

Removes the orphaned `askOtto` Server Action that was left behind after
the Otto chat widget removal in PR #12082.

## Problem

Next.js Server Actions that are never imported are excluded from the
server manifest. Old client bundles still reference the action ID,
causing "not found" errors.

**Sentry impact:**
- **BUILDER-3BN:** 107,571 events
- **BUILDER-729:** 285 events  
- **BUILDER-3QH:** 1,611 events
- **36+ users affected**

## Root Cause

1. **Mar 2025:** Otto widget added to `/build` page with `askOtto`
Server Action
2. **Feb 2026:** Otto widget removed (PR #12082), but `actions.ts` left
behind
3. **Result:** Dead code → not in manifest → errors

## Evidence

```bash
# Zero imports across frontend:
grep -r "askOtto" src/ --exclude="actions.ts"
# → No results

# Server manifest missing the action:
cat .next/server/server-reference-manifest.json
# → Only includes login/supabase actions, NOT build/actions
```

## Changes

-  Delete
`autogpt_platform/frontend/src/app/(platform)/build/actions.ts`

## Testing

1. Verify no imports of `askOtto` in codebase 
2. Check Sentry for error drop after deploy
3. Monitor for new "Server Action not found" errors

## Checklist

- [x] Dead code confirmed (zero imports)
- [x] Sentry issues documented
- [x] Clear commit message with context
2026-03-10 09:03:38 +00:00
Dream
d0a1d72e8a fix(frontend/builder): batch undo history for cascading operations (#12344)
## Summary

Fixes undo in the Builder not working correctly when deleting nodes.
When a node is deleted, React Flow fires `onNodesChange` (node removal)
and `onEdgesChange` (cascading edge cleanup) as separate callbacks —
each independently pushing to the undo history stack. This creates
intermediate states that break undo:

- Single undo restores a partial state (e.g. edges pointing to a deleted
node)
- Multiple undos required to fully restore the graph
- Redo also produces inconsistent states

Resolves #10999

### Changes 🏗️

- **`historyStore.ts`** — Added microtask-based batching to
`pushState()`. Multiple calls within the same synchronous execution
(same event loop tick) are coalesced into a single history entry,
keeping only the first pre-change snapshot. Uses `queueMicrotask` so all
cascading store updates from a single user action settle before the
history entry is committed.
- Reset `pendingState` in `initializeHistory()` and `clear()` to prevent
stale batched state from leaking across graph loads or navigation.

**Side benefit:** Copy/paste operations that add multiple nodes and
edges now also produce a single history entry instead of one per
node/edge.

### Checklist 📋

#### For code changes:
- [x] I have clearly listed my changes in the PR description
- [x] I have made a test plan
- [x] I have tested my changes according to the test plan:
  - [x] Place 3 blocks A, B, C and connect A→B→C
  - [x] Delete block C (removes node + cascading edge B→C)
  - [x] Delete connection A→B
  - [x] Undo — connection A→B restored (single undo, not multiple)
  - [x] Undo — block C and connection B→C restored
  - [x] Redo — block C removed again with its connections
- [x] Copy/paste multiple connected blocks — single undo reverts entire
paste

---------

Co-authored-by: Reinier van der Leer <pwuts@agpt.co>
Co-authored-by: Abhimanyu Yadav <122007096+Abhi1992002@users.noreply.github.com>
2026-03-10 04:55:07 +00:00
68 changed files with 3292 additions and 538 deletions

View File

@@ -0,0 +1,40 @@
-- =============================================================
-- View: analytics.auth_activities
-- Looker source alias: ds49 | Charts: 1
-- =============================================================
-- DESCRIPTION
-- Tracks authentication events (login, logout, SSO, password
-- reset, etc.) from Supabase's internal audit log.
-- Useful for monitoring sign-in patterns and detecting anomalies.
--
-- SOURCE TABLES
-- auth.audit_log_entries — Supabase internal auth event log
--
-- OUTPUT COLUMNS
-- created_at TIMESTAMPTZ When the auth event occurred
-- actor_id TEXT User ID who triggered the event
-- actor_via_sso TEXT Whether the action was via SSO ('true'/'false')
-- action TEXT Event type (e.g. 'login', 'logout', 'token_refreshed')
--
-- WINDOW
-- Rolling 90 days from current date
--
-- EXAMPLE QUERIES
-- -- Daily login counts
-- SELECT DATE_TRUNC('day', created_at) AS day, COUNT(*) AS logins
-- FROM analytics.auth_activities
-- WHERE action = 'login'
-- GROUP BY 1 ORDER BY 1;
--
-- -- SSO vs password login breakdown
-- SELECT actor_via_sso, COUNT(*) FROM analytics.auth_activities
-- WHERE action = 'login' GROUP BY 1;
-- =============================================================
SELECT
created_at,
payload->>'actor_id' AS actor_id,
payload->>'actor_via_sso' AS actor_via_sso,
payload->>'action' AS action
FROM auth.audit_log_entries
WHERE created_at >= NOW() - INTERVAL '90 days'

View File

@@ -0,0 +1,105 @@
-- =============================================================
-- View: analytics.graph_execution
-- Looker source alias: ds16 | Charts: 21
-- =============================================================
-- DESCRIPTION
-- One row per agent graph execution (last 90 days).
-- Unpacks the JSONB stats column into individual numeric columns
-- and normalises the executionStatus — runs that failed due to
-- insufficient credits are reclassified as 'NO_CREDITS' for
-- easier filtering. Error messages are scrubbed of IDs and URLs
-- to allow safe grouping.
--
-- SOURCE TABLES
-- platform.AgentGraphExecution — Execution records
-- platform.AgentGraph — Agent graph metadata (for name)
-- platform.LibraryAgent — To flag possibly-AI (safe-mode) agents
--
-- OUTPUT COLUMNS
-- id TEXT Execution UUID
-- agentGraphId TEXT Agent graph UUID
-- agentGraphVersion INT Graph version number
-- executionStatus TEXT COMPLETED | FAILED | NO_CREDITS | RUNNING | QUEUED | TERMINATED
-- createdAt TIMESTAMPTZ When the execution was queued
-- updatedAt TIMESTAMPTZ Last status update time
-- userId TEXT Owner user UUID
-- agentGraphName TEXT Human-readable agent name
-- cputime DECIMAL Total CPU seconds consumed
-- walltime DECIMAL Total wall-clock seconds
-- node_count DECIMAL Number of nodes in the graph
-- nodes_cputime DECIMAL CPU time across all nodes
-- nodes_walltime DECIMAL Wall time across all nodes
-- execution_cost DECIMAL Credit cost of this execution
-- correctness_score FLOAT AI correctness score (if available)
-- possibly_ai BOOLEAN True if agent has sensitive_action_safe_mode enabled
-- groupedErrorMessage TEXT Scrubbed error string (IDs/URLs replaced with wildcards)
--
-- WINDOW
-- Rolling 90 days (createdAt > CURRENT_DATE - 90 days)
--
-- EXAMPLE QUERIES
-- -- Daily execution counts by status
-- SELECT DATE_TRUNC('day', "createdAt") AS day, "executionStatus", COUNT(*)
-- FROM analytics.graph_execution
-- GROUP BY 1, 2 ORDER BY 1;
--
-- -- Average cost per execution by agent
-- SELECT "agentGraphName", AVG("execution_cost") AS avg_cost, COUNT(*) AS runs
-- FROM analytics.graph_execution
-- WHERE "executionStatus" = 'COMPLETED'
-- GROUP BY 1 ORDER BY avg_cost DESC;
--
-- -- Top error messages
-- SELECT "groupedErrorMessage", COUNT(*) AS occurrences
-- FROM analytics.graph_execution
-- WHERE "executionStatus" = 'FAILED'
-- GROUP BY 1 ORDER BY 2 DESC LIMIT 20;
-- =============================================================
SELECT
ge."id" AS id,
ge."agentGraphId" AS agentGraphId,
ge."agentGraphVersion" AS agentGraphVersion,
CASE
WHEN jsonb_exists(ge."stats"::jsonb, 'error')
AND (
(ge."stats"::jsonb->>'error') ILIKE '%insufficient balance%'
OR (ge."stats"::jsonb->>'error') ILIKE '%you have no credits left%'
)
THEN 'NO_CREDITS'
ELSE CAST(ge."executionStatus" AS TEXT)
END AS executionStatus,
ge."createdAt" AS createdAt,
ge."updatedAt" AS updatedAt,
ge."userId" AS userId,
g."name" AS agentGraphName,
(ge."stats"::jsonb->>'cputime')::decimal AS cputime,
(ge."stats"::jsonb->>'walltime')::decimal AS walltime,
(ge."stats"::jsonb->>'node_count')::decimal AS node_count,
(ge."stats"::jsonb->>'nodes_cputime')::decimal AS nodes_cputime,
(ge."stats"::jsonb->>'nodes_walltime')::decimal AS nodes_walltime,
(ge."stats"::jsonb->>'cost')::decimal AS execution_cost,
(ge."stats"::jsonb->>'correctness_score')::float AS correctness_score,
COALESCE(la.possibly_ai, FALSE) AS possibly_ai,
REGEXP_REPLACE(
REGEXP_REPLACE(
TRIM(BOTH '"' FROM ge."stats"::jsonb->>'error'),
'(https?://)([A-Za-z0-9.-]+)(:[0-9]+)?(/[^\s]*)?',
'\1\2/...', 'gi'
),
'[a-zA-Z0-9_:-]*\d[a-zA-Z0-9_:-]*', '*', 'g'
) AS groupedErrorMessage
FROM platform."AgentGraphExecution" ge
LEFT JOIN platform."AgentGraph" g
ON ge."agentGraphId" = g."id"
AND ge."agentGraphVersion" = g."version"
LEFT JOIN (
SELECT DISTINCT ON ("userId", "agentGraphId")
"userId", "agentGraphId",
("settings"::jsonb->>'sensitive_action_safe_mode')::boolean AS possibly_ai
FROM platform."LibraryAgent"
WHERE "isDeleted" = FALSE
AND "isArchived" = FALSE
ORDER BY "userId", "agentGraphId", "agentGraphVersion" DESC
) la ON la."userId" = ge."userId" AND la."agentGraphId" = ge."agentGraphId"
WHERE ge."createdAt" > CURRENT_DATE - INTERVAL '90 days'

View File

@@ -0,0 +1,101 @@
-- =============================================================
-- View: analytics.node_block_execution
-- Looker source alias: ds14 | Charts: 11
-- =============================================================
-- DESCRIPTION
-- One row per node (block) execution (last 90 days).
-- Unpacks stats JSONB and joins to identify which block type
-- was run. For failed nodes, joins the error output and
-- scrubs it for safe grouping.
--
-- SOURCE TABLES
-- platform.AgentNodeExecution — Node execution records
-- platform.AgentNode — Node → block mapping
-- platform.AgentBlock — Block name/ID
-- platform.AgentNodeExecutionInputOutput — Error output values
--
-- OUTPUT COLUMNS
-- id TEXT Node execution UUID
-- agentGraphExecutionId TEXT Parent graph execution UUID
-- agentNodeId TEXT Node UUID within the graph
-- executionStatus TEXT COMPLETED | FAILED | QUEUED | RUNNING | TERMINATED
-- addedTime TIMESTAMPTZ When the node was queued
-- queuedTime TIMESTAMPTZ When it entered the queue
-- startedTime TIMESTAMPTZ When execution started
-- endedTime TIMESTAMPTZ When execution finished
-- inputSize BIGINT Input payload size in bytes
-- outputSize BIGINT Output payload size in bytes
-- walltime NUMERIC Wall-clock seconds for this node
-- cputime NUMERIC CPU seconds for this node
-- llmRetryCount INT Number of LLM retries
-- llmCallCount INT Number of LLM API calls made
-- inputTokenCount BIGINT LLM input tokens consumed
-- outputTokenCount BIGINT LLM output tokens produced
-- blockName TEXT Human-readable block name (e.g. 'OpenAIBlock')
-- blockId TEXT Block UUID
-- groupedErrorMessage TEXT Scrubbed error (IDs/URLs wildcarded)
-- errorMessage TEXT Raw error output (only set when FAILED)
--
-- WINDOW
-- Rolling 90 days (addedTime > CURRENT_DATE - 90 days)
--
-- EXAMPLE QUERIES
-- -- Most-used blocks by execution count
-- SELECT "blockName", COUNT(*) AS executions,
-- COUNT(*) FILTER (WHERE "executionStatus"='FAILED') AS failures
-- FROM analytics.node_block_execution
-- GROUP BY 1 ORDER BY executions DESC LIMIT 20;
--
-- -- Average LLM token usage per block
-- SELECT "blockName",
-- AVG("inputTokenCount") AS avg_input_tokens,
-- AVG("outputTokenCount") AS avg_output_tokens
-- FROM analytics.node_block_execution
-- WHERE "llmCallCount" > 0
-- GROUP BY 1 ORDER BY avg_input_tokens DESC;
--
-- -- Top failure reasons
-- SELECT "blockName", "groupedErrorMessage", COUNT(*) AS count
-- FROM analytics.node_block_execution
-- WHERE "executionStatus" = 'FAILED'
-- GROUP BY 1, 2 ORDER BY count DESC LIMIT 20;
-- =============================================================
SELECT
ne."id" AS id,
ne."agentGraphExecutionId" AS agentGraphExecutionId,
ne."agentNodeId" AS agentNodeId,
CAST(ne."executionStatus" AS TEXT) AS executionStatus,
ne."addedTime" AS addedTime,
ne."queuedTime" AS queuedTime,
ne."startedTime" AS startedTime,
ne."endedTime" AS endedTime,
(ne."stats"::jsonb->>'input_size')::bigint AS inputSize,
(ne."stats"::jsonb->>'output_size')::bigint AS outputSize,
(ne."stats"::jsonb->>'walltime')::numeric AS walltime,
(ne."stats"::jsonb->>'cputime')::numeric AS cputime,
(ne."stats"::jsonb->>'llm_retry_count')::int AS llmRetryCount,
(ne."stats"::jsonb->>'llm_call_count')::int AS llmCallCount,
(ne."stats"::jsonb->>'input_token_count')::bigint AS inputTokenCount,
(ne."stats"::jsonb->>'output_token_count')::bigint AS outputTokenCount,
b."name" AS blockName,
b."id" AS blockId,
REGEXP_REPLACE(
REGEXP_REPLACE(
TRIM(BOTH '"' FROM eio."data"::text),
'(https?://)([A-Za-z0-9.-]+)(:[0-9]+)?(/[^\s]*)?',
'\1\2/...', 'gi'
),
'[a-zA-Z0-9_:-]*\d[a-zA-Z0-9_:-]*', '*', 'g'
) AS groupedErrorMessage,
eio."data" AS errorMessage
FROM platform."AgentNodeExecution" ne
LEFT JOIN platform."AgentNode" nd
ON ne."agentNodeId" = nd."id"
LEFT JOIN platform."AgentBlock" b
ON nd."agentBlockId" = b."id"
LEFT JOIN platform."AgentNodeExecutionInputOutput" eio
ON eio."referencedByOutputExecId" = ne."id"
AND eio."name" = 'error'
AND ne."executionStatus" = 'FAILED'
WHERE ne."addedTime" > CURRENT_DATE - INTERVAL '90 days'

View File

@@ -0,0 +1,97 @@
-- =============================================================
-- View: analytics.retention_agent
-- Looker source alias: ds35 | Charts: 2
-- =============================================================
-- DESCRIPTION
-- Weekly cohort retention broken down per individual agent.
-- Cohort = week of a user's first use of THAT specific agent.
-- Tells you which agents keep users coming back vs. one-shot
-- use. Only includes cohorts from the last 180 days.
--
-- SOURCE TABLES
-- platform.AgentGraphExecution — Execution records (user × agent × time)
-- platform.AgentGraph — Agent names
--
-- OUTPUT COLUMNS
-- agent_id TEXT Agent graph UUID
-- agent_label TEXT 'AgentName [first8chars]'
-- agent_label_n TEXT 'AgentName [first8chars] (n=total_users)'
-- cohort_week_start DATE Week users first ran this agent
-- cohort_label TEXT ISO week label
-- cohort_label_n TEXT ISO week label with cohort size
-- user_lifetime_week INT Weeks since first use of this agent
-- cohort_users BIGINT Users in this cohort for this agent
-- active_users BIGINT Users who ran the agent again in week k
-- retention_rate FLOAT active_users / cohort_users
-- cohort_users_w0 BIGINT cohort_users only at week 0 (safe to SUM)
-- agent_total_users BIGINT Total users across all cohorts for this agent
--
-- EXAMPLE QUERIES
-- -- Best-retained agents at week 2
-- SELECT agent_label, AVG(retention_rate) AS w2_retention
-- FROM analytics.retention_agent
-- WHERE user_lifetime_week = 2 AND cohort_users >= 10
-- GROUP BY 1 ORDER BY w2_retention DESC LIMIT 10;
--
-- -- Agents with most unique users
-- SELECT DISTINCT agent_label, agent_total_users
-- FROM analytics.retention_agent
-- ORDER BY agent_total_users DESC LIMIT 20;
-- =============================================================
WITH params AS (SELECT 12::int AS max_weeks, (CURRENT_DATE - INTERVAL '180 days') AS cohort_start),
events AS (
SELECT e."userId"::text AS user_id, e."agentGraphId" AS agent_id,
e."createdAt"::timestamptz AS created_at,
DATE_TRUNC('week', e."createdAt")::date AS week_start
FROM platform."AgentGraphExecution" e
),
first_use AS (
SELECT user_id, agent_id, MIN(created_at) AS first_use_at,
DATE_TRUNC('week', MIN(created_at))::date AS cohort_week_start
FROM events GROUP BY 1,2
HAVING MIN(created_at) >= (SELECT cohort_start FROM params)
),
activity_weeks AS (SELECT DISTINCT user_id, agent_id, week_start FROM events),
user_week_age AS (
SELECT aw.user_id, aw.agent_id, fu.cohort_week_start,
((aw.week_start - DATE_TRUNC('week',fu.first_use_at)::date)/7)::int AS user_lifetime_week
FROM activity_weeks aw JOIN first_use fu USING (user_id, agent_id)
WHERE aw.week_start >= DATE_TRUNC('week',fu.first_use_at)::date
),
active_counts AS (
SELECT agent_id, cohort_week_start, user_lifetime_week, COUNT(DISTINCT user_id) AS active_users
FROM user_week_age WHERE user_lifetime_week >= 0 GROUP BY 1,2,3
),
cohort_sizes AS (
SELECT agent_id, cohort_week_start, COUNT(DISTINCT user_id) AS cohort_users FROM first_use GROUP BY 1,2
),
cohort_caps AS (
SELECT cs.agent_id, cs.cohort_week_start, cs.cohort_users,
LEAST((SELECT max_weeks FROM params),
GREATEST(0,((DATE_TRUNC('week',CURRENT_DATE)::date-cs.cohort_week_start)/7)::int)) AS cap_weeks
FROM cohort_sizes cs
),
grid AS (
SELECT cc.agent_id, cc.cohort_week_start, gs AS user_lifetime_week, cc.cohort_users
FROM cohort_caps cc CROSS JOIN LATERAL generate_series(0, cc.cap_weeks) gs
),
agent_names AS (SELECT DISTINCT ON (g."id") g."id" AS agent_id, g."name" AS agent_name FROM platform."AgentGraph" g ORDER BY g."id", g."version" DESC),
agent_total_users AS (SELECT agent_id, SUM(cohort_users) AS agent_total_users FROM cohort_sizes GROUP BY 1)
SELECT
g.agent_id,
COALESCE(an.agent_name,'(unnamed)')||' ['||LEFT(g.agent_id::text,8)||']' AS agent_label,
COALESCE(an.agent_name,'(unnamed)')||' ['||LEFT(g.agent_id::text,8)||'] (n='||COALESCE(atu.agent_total_users,0)||')' AS agent_label_n,
g.cohort_week_start,
TO_CHAR(g.cohort_week_start,'IYYY-"W"IW') AS cohort_label,
TO_CHAR(g.cohort_week_start,'IYYY-"W"IW')||' (n='||g.cohort_users||')' AS cohort_label_n,
g.user_lifetime_week, g.cohort_users,
COALESCE(ac.active_users,0) AS active_users,
COALESCE(ac.active_users,0)::float / NULLIF(g.cohort_users,0) AS retention_rate,
CASE WHEN g.user_lifetime_week=0 THEN g.cohort_users ELSE 0 END AS cohort_users_w0,
COALESCE(atu.agent_total_users,0) AS agent_total_users
FROM grid g
LEFT JOIN active_counts ac ON ac.agent_id=g.agent_id AND ac.cohort_week_start=g.cohort_week_start AND ac.user_lifetime_week=g.user_lifetime_week
LEFT JOIN agent_names an ON an.agent_id=g.agent_id
LEFT JOIN agent_total_users atu ON atu.agent_id=g.agent_id
ORDER BY agent_label, g.cohort_week_start, g.user_lifetime_week;

View File

@@ -0,0 +1,81 @@
-- =============================================================
-- View: analytics.retention_execution_daily
-- Looker source alias: ds111 | Charts: 1
-- =============================================================
-- DESCRIPTION
-- Daily cohort retention based on agent executions.
-- Cohort anchor = day of user's FIRST ever execution.
-- Only includes cohorts from the last 90 days, up to day 30.
-- Great for early engagement analysis (did users run another
-- agent the next day?).
--
-- SOURCE TABLES
-- platform.AgentGraphExecution — Execution records
--
-- OUTPUT COLUMNS
-- Same pattern as retention_login_daily.
-- cohort_day_start = day of first execution (not first login)
--
-- EXAMPLE QUERIES
-- -- Day-3 execution retention
-- SELECT cohort_label, retention_rate_bounded AS d3_retention
-- FROM analytics.retention_execution_daily
-- WHERE user_lifetime_day = 3 ORDER BY cohort_day_start;
-- =============================================================
WITH params AS (SELECT 30::int AS max_days, (CURRENT_DATE - INTERVAL '90 days') AS cohort_start),
events AS (
SELECT e."userId"::text AS user_id, e."createdAt"::timestamptz AS created_at,
DATE_TRUNC('day', e."createdAt")::date AS day_start
FROM platform."AgentGraphExecution" e WHERE e."userId" IS NOT NULL
),
first_exec AS (
SELECT user_id, MIN(created_at) AS first_exec_at,
DATE_TRUNC('day', MIN(created_at))::date AS cohort_day_start
FROM events GROUP BY 1
HAVING MIN(created_at) >= (SELECT cohort_start FROM params)
),
activity_days AS (SELECT DISTINCT user_id, day_start FROM events),
user_day_age AS (
SELECT ad.user_id, fe.cohort_day_start,
(ad.day_start - DATE_TRUNC('day',fe.first_exec_at)::date)::int AS user_lifetime_day
FROM activity_days ad JOIN first_exec fe USING (user_id)
WHERE ad.day_start >= DATE_TRUNC('day',fe.first_exec_at)::date
),
bounded_counts AS (
SELECT cohort_day_start, user_lifetime_day, COUNT(DISTINCT user_id) AS active_users_bounded
FROM user_day_age WHERE user_lifetime_day >= 0 GROUP BY 1,2
),
last_active AS (
SELECT cohort_day_start, user_id, MAX(user_lifetime_day) AS last_active_day FROM user_day_age GROUP BY 1,2
),
unbounded_counts AS (
SELECT la.cohort_day_start, gs AS user_lifetime_day, COUNT(*) AS retained_users_unbounded
FROM last_active la
CROSS JOIN LATERAL generate_series(0, LEAST(la.last_active_day,(SELECT max_days FROM params))) gs
GROUP BY 1,2
),
cohort_sizes AS (SELECT cohort_day_start, COUNT(DISTINCT user_id) AS cohort_users FROM first_exec GROUP BY 1),
cohort_caps AS (
SELECT cs.cohort_day_start, cs.cohort_users,
LEAST((SELECT max_days FROM params), GREATEST(0,(CURRENT_DATE-cs.cohort_day_start)::int)) AS cap_days
FROM cohort_sizes cs
),
grid AS (
SELECT cc.cohort_day_start, gs AS user_lifetime_day, cc.cohort_users
FROM cohort_caps cc CROSS JOIN LATERAL generate_series(0, cc.cap_days) gs
)
SELECT
g.cohort_day_start,
TO_CHAR(g.cohort_day_start,'YYYY-MM-DD') AS cohort_label,
TO_CHAR(g.cohort_day_start,'YYYY-MM-DD')||' (n='||g.cohort_users||')' AS cohort_label_n,
g.user_lifetime_day, g.cohort_users,
COALESCE(b.active_users_bounded,0) AS active_users_bounded,
COALESCE(u.retained_users_unbounded,0) AS retained_users_unbounded,
CASE WHEN g.cohort_users>0 THEN COALESCE(b.active_users_bounded,0)::float/g.cohort_users END AS retention_rate_bounded,
CASE WHEN g.cohort_users>0 THEN COALESCE(u.retained_users_unbounded,0)::float/g.cohort_users END AS retention_rate_unbounded,
CASE WHEN g.user_lifetime_day=0 THEN g.cohort_users ELSE 0 END AS cohort_users_d0
FROM grid g
LEFT JOIN bounded_counts b ON b.cohort_day_start=g.cohort_day_start AND b.user_lifetime_day=g.user_lifetime_day
LEFT JOIN unbounded_counts u ON u.cohort_day_start=g.cohort_day_start AND u.user_lifetime_day=g.user_lifetime_day
ORDER BY g.cohort_day_start, g.user_lifetime_day;

View File

@@ -0,0 +1,81 @@
-- =============================================================
-- View: analytics.retention_execution_weekly
-- Looker source alias: ds92 | Charts: 2
-- =============================================================
-- DESCRIPTION
-- Weekly cohort retention based on agent executions.
-- Cohort anchor = week of user's FIRST ever agent execution
-- (not first login). Only includes cohorts from the last 180 days.
-- Useful when you care about product engagement, not just visits.
--
-- SOURCE TABLES
-- platform.AgentGraphExecution — Execution records
--
-- OUTPUT COLUMNS
-- Same pattern as retention_login_weekly.
-- cohort_week_start = week of first execution (not first login)
--
-- EXAMPLE QUERIES
-- -- Week-2 execution retention
-- SELECT cohort_label, retention_rate_bounded
-- FROM analytics.retention_execution_weekly
-- WHERE user_lifetime_week = 2 ORDER BY cohort_week_start;
-- =============================================================
WITH params AS (SELECT 12::int AS max_weeks, (CURRENT_DATE - INTERVAL '180 days') AS cohort_start),
events AS (
SELECT e."userId"::text AS user_id, e."createdAt"::timestamptz AS created_at,
DATE_TRUNC('week', e."createdAt")::date AS week_start
FROM platform."AgentGraphExecution" e WHERE e."userId" IS NOT NULL
),
first_exec AS (
SELECT user_id, MIN(created_at) AS first_exec_at,
DATE_TRUNC('week', MIN(created_at))::date AS cohort_week_start
FROM events GROUP BY 1
HAVING MIN(created_at) >= (SELECT cohort_start FROM params)
),
activity_weeks AS (SELECT DISTINCT user_id, week_start FROM events),
user_week_age AS (
SELECT aw.user_id, fe.cohort_week_start,
((aw.week_start - DATE_TRUNC('week',fe.first_exec_at)::date)/7)::int AS user_lifetime_week
FROM activity_weeks aw JOIN first_exec fe USING (user_id)
WHERE aw.week_start >= DATE_TRUNC('week',fe.first_exec_at)::date
),
bounded_counts AS (
SELECT cohort_week_start, user_lifetime_week, COUNT(DISTINCT user_id) AS active_users_bounded
FROM user_week_age WHERE user_lifetime_week >= 0 GROUP BY 1,2
),
last_active AS (
SELECT cohort_week_start, user_id, MAX(user_lifetime_week) AS last_active_week FROM user_week_age GROUP BY 1,2
),
unbounded_counts AS (
SELECT la.cohort_week_start, gs AS user_lifetime_week, COUNT(*) AS retained_users_unbounded
FROM last_active la
CROSS JOIN LATERAL generate_series(0, LEAST(la.last_active_week,(SELECT max_weeks FROM params))) gs
GROUP BY 1,2
),
cohort_sizes AS (SELECT cohort_week_start, COUNT(DISTINCT user_id) AS cohort_users FROM first_exec GROUP BY 1),
cohort_caps AS (
SELECT cs.cohort_week_start, cs.cohort_users,
LEAST((SELECT max_weeks FROM params),
GREATEST(0,((DATE_TRUNC('week',CURRENT_DATE)::date-cs.cohort_week_start)/7)::int)) AS cap_weeks
FROM cohort_sizes cs
),
grid AS (
SELECT cc.cohort_week_start, gs AS user_lifetime_week, cc.cohort_users
FROM cohort_caps cc CROSS JOIN LATERAL generate_series(0, cc.cap_weeks) gs
)
SELECT
g.cohort_week_start,
TO_CHAR(g.cohort_week_start,'IYYY-"W"IW') AS cohort_label,
TO_CHAR(g.cohort_week_start,'IYYY-"W"IW')||' (n='||g.cohort_users||')' AS cohort_label_n,
g.user_lifetime_week, g.cohort_users,
COALESCE(b.active_users_bounded,0) AS active_users_bounded,
COALESCE(u.retained_users_unbounded,0) AS retained_users_unbounded,
CASE WHEN g.cohort_users>0 THEN COALESCE(b.active_users_bounded,0)::float/g.cohort_users END AS retention_rate_bounded,
CASE WHEN g.cohort_users>0 THEN COALESCE(u.retained_users_unbounded,0)::float/g.cohort_users END AS retention_rate_unbounded,
CASE WHEN g.user_lifetime_week=0 THEN g.cohort_users ELSE 0 END AS cohort_users_w0
FROM grid g
LEFT JOIN bounded_counts b ON b.cohort_week_start=g.cohort_week_start AND b.user_lifetime_week=g.user_lifetime_week
LEFT JOIN unbounded_counts u ON u.cohort_week_start=g.cohort_week_start AND u.user_lifetime_week=g.user_lifetime_week
ORDER BY g.cohort_week_start, g.user_lifetime_week;

View File

@@ -0,0 +1,94 @@
-- =============================================================
-- View: analytics.retention_login_daily
-- Looker source alias: ds112 | Charts: 1
-- =============================================================
-- DESCRIPTION
-- Daily cohort retention based on login sessions.
-- Same logic as retention_login_weekly but at day granularity,
-- showing up to day 30 for cohorts from the last 90 days.
-- Useful for analysing early activation (days 1-7) in detail.
--
-- SOURCE TABLES
-- auth.sessions — Login session records
--
-- OUTPUT COLUMNS (same pattern as retention_login_weekly)
-- cohort_day_start DATE First day the cohort logged in
-- cohort_label TEXT Date string (e.g. '2025-03-01')
-- cohort_label_n TEXT Date + cohort size (e.g. '2025-03-01 (n=12)')
-- user_lifetime_day INT Days since first login (0 = signup day)
-- cohort_users BIGINT Total users in cohort
-- active_users_bounded BIGINT Users active on exactly day k
-- retained_users_unbounded BIGINT Users active any time on/after day k
-- retention_rate_bounded FLOAT bounded / cohort_users
-- retention_rate_unbounded FLOAT unbounded / cohort_users
-- cohort_users_d0 BIGINT cohort_users only at day 0, else 0 (safe to SUM)
--
-- EXAMPLE QUERIES
-- -- Day-1 retention rate (came back next day)
-- SELECT cohort_label, retention_rate_bounded AS d1_retention
-- FROM analytics.retention_login_daily
-- WHERE user_lifetime_day = 1 ORDER BY cohort_day_start;
--
-- -- Average retention curve across all cohorts
-- SELECT user_lifetime_day,
-- SUM(active_users_bounded)::float / NULLIF(SUM(cohort_users_d0), 0) AS avg_retention
-- FROM analytics.retention_login_daily
-- GROUP BY 1 ORDER BY 1;
-- =============================================================
WITH params AS (SELECT 30::int AS max_days, (CURRENT_DATE - INTERVAL '90 days')::date AS cohort_start),
events AS (
SELECT s.user_id::text AS user_id, s.created_at::timestamptz AS created_at,
DATE_TRUNC('day', s.created_at)::date AS day_start
FROM auth.sessions s WHERE s.user_id IS NOT NULL
),
first_login AS (
SELECT user_id, MIN(created_at) AS first_login_time,
DATE_TRUNC('day', MIN(created_at))::date AS cohort_day_start
FROM events GROUP BY 1
HAVING MIN(created_at) >= (SELECT cohort_start FROM params)
),
activity_days AS (SELECT DISTINCT user_id, day_start FROM events),
user_day_age AS (
SELECT ad.user_id, fl.cohort_day_start,
(ad.day_start - DATE_TRUNC('day', fl.first_login_time)::date)::int AS user_lifetime_day
FROM activity_days ad JOIN first_login fl USING (user_id)
WHERE ad.day_start >= DATE_TRUNC('day', fl.first_login_time)::date
),
bounded_counts AS (
SELECT cohort_day_start, user_lifetime_day, COUNT(DISTINCT user_id) AS active_users_bounded
FROM user_day_age WHERE user_lifetime_day >= 0 GROUP BY 1,2
),
last_active AS (
SELECT cohort_day_start, user_id, MAX(user_lifetime_day) AS last_active_day FROM user_day_age GROUP BY 1,2
),
unbounded_counts AS (
SELECT la.cohort_day_start, gs AS user_lifetime_day, COUNT(*) AS retained_users_unbounded
FROM last_active la
CROSS JOIN LATERAL generate_series(0, LEAST(la.last_active_day,(SELECT max_days FROM params))) gs
GROUP BY 1,2
),
cohort_sizes AS (SELECT cohort_day_start, COUNT(DISTINCT user_id) AS cohort_users FROM first_login GROUP BY 1),
cohort_caps AS (
SELECT cs.cohort_day_start, cs.cohort_users,
LEAST((SELECT max_days FROM params), GREATEST(0,(CURRENT_DATE-cs.cohort_day_start)::int)) AS cap_days
FROM cohort_sizes cs
),
grid AS (
SELECT cc.cohort_day_start, gs AS user_lifetime_day, cc.cohort_users
FROM cohort_caps cc CROSS JOIN LATERAL generate_series(0, cc.cap_days) gs
)
SELECT
g.cohort_day_start,
TO_CHAR(g.cohort_day_start,'YYYY-MM-DD') AS cohort_label,
TO_CHAR(g.cohort_day_start,'YYYY-MM-DD')||' (n='||g.cohort_users||')' AS cohort_label_n,
g.user_lifetime_day, g.cohort_users,
COALESCE(b.active_users_bounded,0) AS active_users_bounded,
COALESCE(u.retained_users_unbounded,0) AS retained_users_unbounded,
CASE WHEN g.cohort_users>0 THEN COALESCE(b.active_users_bounded,0)::float/g.cohort_users END AS retention_rate_bounded,
CASE WHEN g.cohort_users>0 THEN COALESCE(u.retained_users_unbounded,0)::float/g.cohort_users END AS retention_rate_unbounded,
CASE WHEN g.user_lifetime_day=0 THEN g.cohort_users ELSE 0 END AS cohort_users_d0
FROM grid g
LEFT JOIN bounded_counts b ON b.cohort_day_start=g.cohort_day_start AND b.user_lifetime_day=g.user_lifetime_day
LEFT JOIN unbounded_counts u ON u.cohort_day_start=g.cohort_day_start AND u.user_lifetime_day=g.user_lifetime_day
ORDER BY g.cohort_day_start, g.user_lifetime_day;

View File

@@ -0,0 +1,96 @@
-- =============================================================
-- View: analytics.retention_login_onboarded_weekly
-- Looker source alias: ds101 | Charts: 2
-- =============================================================
-- DESCRIPTION
-- Weekly cohort retention from login sessions, restricted to
-- users who "onboarded" — defined as running at least one
-- agent within 365 days of their first login.
-- Filters out users who signed up but never activated,
-- giving a cleaner view of engaged-user retention.
--
-- SOURCE TABLES
-- auth.sessions — Login session records
-- platform.AgentGraphExecution — Used to identify onboarders
--
-- OUTPUT COLUMNS
-- Same as retention_login_weekly (cohort_week_start, user_lifetime_week,
-- retention_rate_bounded, retention_rate_unbounded, etc.)
-- Only difference: cohort is filtered to onboarded users only.
--
-- EXAMPLE QUERIES
-- -- Compare week-4 retention: all users vs onboarded only
-- SELECT 'all_users' AS segment, AVG(retention_rate_bounded) AS w4_retention
-- FROM analytics.retention_login_weekly WHERE user_lifetime_week = 4
-- UNION ALL
-- SELECT 'onboarded', AVG(retention_rate_bounded)
-- FROM analytics.retention_login_onboarded_weekly WHERE user_lifetime_week = 4;
-- =============================================================
WITH params AS (SELECT 12::int AS max_weeks, 365::int AS onboarding_window_days),
events AS (
SELECT s.user_id::text AS user_id, s.created_at::timestamptz AS created_at,
DATE_TRUNC('week', s.created_at)::date AS week_start
FROM auth.sessions s WHERE s.user_id IS NOT NULL
),
first_login_all AS (
SELECT user_id, MIN(created_at) AS first_login_time,
DATE_TRUNC('week', MIN(created_at))::date AS cohort_week_start
FROM events GROUP BY 1
),
onboarders AS (
SELECT fl.user_id FROM first_login_all fl
WHERE EXISTS (
SELECT 1 FROM platform."AgentGraphExecution" e
WHERE e."userId"::text = fl.user_id
AND e."createdAt" >= fl.first_login_time
AND e."createdAt" < fl.first_login_time
+ make_interval(days => (SELECT onboarding_window_days FROM params))
)
),
first_login AS (SELECT * FROM first_login_all WHERE user_id IN (SELECT user_id FROM onboarders)),
activity_weeks AS (SELECT DISTINCT user_id, week_start FROM events),
user_week_age AS (
SELECT aw.user_id, fl.cohort_week_start,
((aw.week_start - DATE_TRUNC('week',fl.first_login_time)::date)/7)::int AS user_lifetime_week
FROM activity_weeks aw JOIN first_login fl USING (user_id)
WHERE aw.week_start >= DATE_TRUNC('week',fl.first_login_time)::date
),
bounded_counts AS (
SELECT cohort_week_start, user_lifetime_week, COUNT(DISTINCT user_id) AS active_users_bounded
FROM user_week_age WHERE user_lifetime_week >= 0 GROUP BY 1,2
),
last_active AS (
SELECT cohort_week_start, user_id, MAX(user_lifetime_week) AS last_active_week FROM user_week_age GROUP BY 1,2
),
unbounded_counts AS (
SELECT la.cohort_week_start, gs AS user_lifetime_week, COUNT(*) AS retained_users_unbounded
FROM last_active la
CROSS JOIN LATERAL generate_series(0, LEAST(la.last_active_week,(SELECT max_weeks FROM params))) gs
GROUP BY 1,2
),
cohort_sizes AS (SELECT cohort_week_start, COUNT(DISTINCT user_id) AS cohort_users FROM first_login GROUP BY 1),
cohort_caps AS (
SELECT cs.cohort_week_start, cs.cohort_users,
LEAST((SELECT max_weeks FROM params),
GREATEST(0,((DATE_TRUNC('week',CURRENT_DATE)::date-cs.cohort_week_start)/7)::int)) AS cap_weeks
FROM cohort_sizes cs
),
grid AS (
SELECT cc.cohort_week_start, gs AS user_lifetime_week, cc.cohort_users
FROM cohort_caps cc CROSS JOIN LATERAL generate_series(0, cc.cap_weeks) gs
)
SELECT
g.cohort_week_start,
TO_CHAR(g.cohort_week_start,'IYYY-"W"IW') AS cohort_label,
TO_CHAR(g.cohort_week_start,'IYYY-"W"IW')||' (n='||g.cohort_users||')' AS cohort_label_n,
g.user_lifetime_week, g.cohort_users,
COALESCE(b.active_users_bounded,0) AS active_users_bounded,
COALESCE(u.retained_users_unbounded,0) AS retained_users_unbounded,
CASE WHEN g.cohort_users>0 THEN COALESCE(b.active_users_bounded,0)::float/g.cohort_users END AS retention_rate_bounded,
CASE WHEN g.cohort_users>0 THEN COALESCE(u.retained_users_unbounded,0)::float/g.cohort_users END AS retention_rate_unbounded,
CASE WHEN g.user_lifetime_week=0 THEN g.cohort_users ELSE 0 END AS cohort_users_w0
FROM grid g
LEFT JOIN bounded_counts b ON b.cohort_week_start=g.cohort_week_start AND b.user_lifetime_week=g.user_lifetime_week
LEFT JOIN unbounded_counts u ON u.cohort_week_start=g.cohort_week_start AND u.user_lifetime_week=g.user_lifetime_week
ORDER BY g.cohort_week_start, g.user_lifetime_week;

View File

@@ -0,0 +1,103 @@
-- =============================================================
-- View: analytics.retention_login_weekly
-- Looker source alias: ds83 | Charts: 2
-- =============================================================
-- DESCRIPTION
-- Weekly cohort retention based on login sessions.
-- Users are grouped by the ISO week of their first ever login.
-- For each cohort × lifetime-week combination, outputs both:
-- - bounded rate: % active in exactly that week
-- - unbounded rate: % who were ever active on or after that week
-- Weeks are capped to the cohort's actual age (no future data points).
--
-- SOURCE TABLES
-- auth.sessions — Login session records
--
-- HOW TO READ THE OUTPUT
-- cohort_week_start The Monday of the week users first logged in
-- user_lifetime_week 0 = signup week, 1 = one week later, etc.
-- retention_rate_bounded = active_users_bounded / cohort_users
-- retention_rate_unbounded = retained_users_unbounded / cohort_users
--
-- OUTPUT COLUMNS
-- cohort_week_start DATE First day of the cohort's signup week
-- cohort_label TEXT ISO week label (e.g. '2025-W01')
-- cohort_label_n TEXT ISO week label with cohort size (e.g. '2025-W01 (n=42)')
-- user_lifetime_week INT Weeks since first login (0 = signup week)
-- cohort_users BIGINT Total users in this cohort (denominator)
-- active_users_bounded BIGINT Users active in exactly week k
-- retained_users_unbounded BIGINT Users active any time on/after week k
-- retention_rate_bounded FLOAT bounded active / cohort_users
-- retention_rate_unbounded FLOAT unbounded retained / cohort_users
-- cohort_users_w0 BIGINT cohort_users only at week 0, else 0 (safe to SUM in pivot tables)
--
-- EXAMPLE QUERIES
-- -- Week-1 retention rate per cohort
-- SELECT cohort_label, retention_rate_bounded AS w1_retention
-- FROM analytics.retention_login_weekly
-- WHERE user_lifetime_week = 1
-- ORDER BY cohort_week_start;
--
-- -- Overall average retention curve (all cohorts combined)
-- SELECT user_lifetime_week,
-- SUM(active_users_bounded)::float / NULLIF(SUM(cohort_users_w0), 0) AS avg_retention
-- FROM analytics.retention_login_weekly
-- GROUP BY 1 ORDER BY 1;
-- =============================================================
WITH params AS (SELECT 12::int AS max_weeks),
events AS (
SELECT s.user_id::text AS user_id, s.created_at::timestamptz AS created_at,
DATE_TRUNC('week', s.created_at)::date AS week_start
FROM auth.sessions s WHERE s.user_id IS NOT NULL
),
first_login AS (
SELECT user_id, MIN(created_at) AS first_login_time,
DATE_TRUNC('week', MIN(created_at))::date AS cohort_week_start
FROM events GROUP BY 1
),
activity_weeks AS (SELECT DISTINCT user_id, week_start FROM events),
user_week_age AS (
SELECT aw.user_id, fl.cohort_week_start,
((aw.week_start - DATE_TRUNC('week', fl.first_login_time)::date) / 7)::int AS user_lifetime_week
FROM activity_weeks aw JOIN first_login fl USING (user_id)
WHERE aw.week_start >= DATE_TRUNC('week', fl.first_login_time)::date
),
bounded_counts AS (
SELECT cohort_week_start, user_lifetime_week, COUNT(DISTINCT user_id) AS active_users_bounded
FROM user_week_age WHERE user_lifetime_week >= 0 GROUP BY 1,2
),
last_active AS (
SELECT cohort_week_start, user_id, MAX(user_lifetime_week) AS last_active_week FROM user_week_age GROUP BY 1,2
),
unbounded_counts AS (
SELECT la.cohort_week_start, gs AS user_lifetime_week, COUNT(*) AS retained_users_unbounded
FROM last_active la
CROSS JOIN LATERAL generate_series(0, LEAST(la.last_active_week,(SELECT max_weeks FROM params))) gs
GROUP BY 1,2
),
cohort_sizes AS (SELECT cohort_week_start, COUNT(DISTINCT user_id) AS cohort_users FROM first_login GROUP BY 1),
cohort_caps AS (
SELECT cs.cohort_week_start, cs.cohort_users,
LEAST((SELECT max_weeks FROM params),
GREATEST(0,((DATE_TRUNC('week',CURRENT_DATE)::date - cs.cohort_week_start)/7)::int)) AS cap_weeks
FROM cohort_sizes cs
),
grid AS (
SELECT cc.cohort_week_start, gs AS user_lifetime_week, cc.cohort_users
FROM cohort_caps cc CROSS JOIN LATERAL generate_series(0, cc.cap_weeks) gs
)
SELECT
g.cohort_week_start,
TO_CHAR(g.cohort_week_start,'IYYY-"W"IW') AS cohort_label,
TO_CHAR(g.cohort_week_start,'IYYY-"W"IW')||' (n='||g.cohort_users||')' AS cohort_label_n,
g.user_lifetime_week, g.cohort_users,
COALESCE(b.active_users_bounded,0) AS active_users_bounded,
COALESCE(u.retained_users_unbounded,0) AS retained_users_unbounded,
CASE WHEN g.cohort_users>0 THEN COALESCE(b.active_users_bounded,0)::float/g.cohort_users END AS retention_rate_bounded,
CASE WHEN g.cohort_users>0 THEN COALESCE(u.retained_users_unbounded,0)::float/g.cohort_users END AS retention_rate_unbounded,
CASE WHEN g.user_lifetime_week=0 THEN g.cohort_users ELSE 0 END AS cohort_users_w0
FROM grid g
LEFT JOIN bounded_counts b ON b.cohort_week_start=g.cohort_week_start AND b.user_lifetime_week=g.user_lifetime_week
LEFT JOIN unbounded_counts u ON u.cohort_week_start=g.cohort_week_start AND u.user_lifetime_week=g.user_lifetime_week
ORDER BY g.cohort_week_start, g.user_lifetime_week

View File

@@ -0,0 +1,71 @@
-- =============================================================
-- View: analytics.user_block_spending
-- Looker source alias: ds6 | Charts: 5
-- =============================================================
-- DESCRIPTION
-- One row per credit transaction (last 90 days).
-- Shows how users spend credits broken down by block type,
-- LLM provider and model. Joins node execution stats for
-- token-level detail.
--
-- SOURCE TABLES
-- platform.CreditTransaction — Credit debit/credit records
-- platform.AgentNodeExecution — Node execution stats (for token counts)
--
-- OUTPUT COLUMNS
-- transactionKey TEXT Unique transaction identifier
-- userId TEXT User who was charged
-- amount DECIMAL Credit amount (positive = credit, negative = debit)
-- negativeAmount DECIMAL amount * -1 (convenience for spend charts)
-- transactionType TEXT Transaction type (e.g. 'USAGE', 'REFUND', 'TOP_UP')
-- transactionTime TIMESTAMPTZ When the transaction was recorded
-- blockId TEXT Block UUID that triggered the spend
-- blockName TEXT Human-readable block name
-- llm_provider TEXT LLM provider (e.g. 'openai', 'anthropic')
-- llm_model TEXT Model name (e.g. 'gpt-4o', 'claude-3-5-sonnet')
-- node_exec_id TEXT Linked node execution UUID
-- llm_call_count INT LLM API calls made in that execution
-- llm_retry_count INT LLM retries in that execution
-- llm_input_token_count INT Input tokens consumed
-- llm_output_token_count INT Output tokens produced
--
-- WINDOW
-- Rolling 90 days (createdAt > CURRENT_DATE - 90 days)
--
-- EXAMPLE QUERIES
-- -- Total spend per user (last 90 days)
-- SELECT "userId", SUM("negativeAmount") AS total_spent
-- FROM analytics.user_block_spending
-- WHERE "transactionType" = 'USAGE'
-- GROUP BY 1 ORDER BY total_spent DESC;
--
-- -- Spend by LLM provider + model
-- SELECT "llm_provider", "llm_model",
-- SUM("negativeAmount") AS total_cost,
-- SUM("llm_input_token_count") AS input_tokens,
-- SUM("llm_output_token_count") AS output_tokens
-- FROM analytics.user_block_spending
-- WHERE "llm_provider" IS NOT NULL
-- GROUP BY 1, 2 ORDER BY total_cost DESC;
-- =============================================================
SELECT
c."transactionKey" AS transactionKey,
c."userId" AS userId,
c."amount" AS amount,
c."amount" * -1 AS negativeAmount,
c."type" AS transactionType,
c."createdAt" AS transactionTime,
c.metadata->>'block_id' AS blockId,
c.metadata->>'block' AS blockName,
c.metadata->'input'->'credentials'->>'provider' AS llm_provider,
c.metadata->'input'->>'model' AS llm_model,
c.metadata->>'node_exec_id' AS node_exec_id,
(ne."stats"->>'llm_call_count')::int AS llm_call_count,
(ne."stats"->>'llm_retry_count')::int AS llm_retry_count,
(ne."stats"->>'input_token_count')::int AS llm_input_token_count,
(ne."stats"->>'output_token_count')::int AS llm_output_token_count
FROM platform."CreditTransaction" c
LEFT JOIN platform."AgentNodeExecution" ne
ON (c.metadata->>'node_exec_id') = ne."id"::text
WHERE c."createdAt" > CURRENT_DATE - INTERVAL '90 days'

View File

@@ -0,0 +1,45 @@
-- =============================================================
-- View: analytics.user_onboarding
-- Looker source alias: ds68 | Charts: 3
-- =============================================================
-- DESCRIPTION
-- One row per user onboarding record. Contains the user's
-- stated usage reason, selected integrations, completed
-- onboarding steps and optional first agent selection.
-- Full history (no date filter) since onboarding happens
-- once per user.
--
-- SOURCE TABLES
-- platform.UserOnboarding — Onboarding state per user
--
-- OUTPUT COLUMNS
-- id TEXT Onboarding record UUID
-- createdAt TIMESTAMPTZ When onboarding started
-- updatedAt TIMESTAMPTZ Last update to onboarding state
-- usageReason TEXT Why user signed up (e.g. 'work', 'personal')
-- integrations TEXT[] Array of integration names the user selected
-- userId TEXT User UUID
-- completedSteps TEXT[] Array of onboarding step enums completed
-- selectedStoreListingVersionId TEXT First marketplace agent the user chose (if any)
--
-- EXAMPLE QUERIES
-- -- Usage reason breakdown
-- SELECT "usageReason", COUNT(*) FROM analytics.user_onboarding GROUP BY 1;
--
-- -- Completion rate per step
-- SELECT step, COUNT(*) AS users_completed
-- FROM analytics.user_onboarding
-- CROSS JOIN LATERAL UNNEST("completedSteps") AS step
-- GROUP BY 1 ORDER BY users_completed DESC;
-- =============================================================
SELECT
id,
"createdAt",
"updatedAt",
"usageReason",
integrations,
"userId",
"completedSteps",
"selectedStoreListingVersionId"
FROM platform."UserOnboarding"

View File

@@ -0,0 +1,100 @@
-- =============================================================
-- View: analytics.user_onboarding_funnel
-- Looker source alias: ds74 | Charts: 1
-- =============================================================
-- DESCRIPTION
-- Pre-aggregated onboarding funnel showing how many users
-- completed each step and the drop-off percentage from the
-- previous step. One row per onboarding step (all 22 steps
-- always present, even with 0 completions — prevents sparse
-- gaps from making LAG compare the wrong predecessors).
--
-- SOURCE TABLES
-- platform.UserOnboarding — Onboarding records with completedSteps array
--
-- OUTPUT COLUMNS
-- step TEXT Onboarding step enum name (e.g. 'WELCOME', 'CONGRATS')
-- step_order INT Numeric position in the funnel (1=first, 22=last)
-- users_completed BIGINT Distinct users who completed this step
-- pct_from_prev NUMERIC % of users from the previous step who reached this one
--
-- STEP ORDER
-- 1 WELCOME 9 MARKETPLACE_VISIT 17 SCHEDULE_AGENT
-- 2 USAGE_REASON 10 MARKETPLACE_ADD_AGENT 18 RUN_AGENTS
-- 3 INTEGRATIONS 11 MARKETPLACE_RUN_AGENT 19 RUN_3_DAYS
-- 4 AGENT_CHOICE 12 BUILDER_OPEN 20 TRIGGER_WEBHOOK
-- 5 AGENT_NEW_RUN 13 BUILDER_SAVE_AGENT 21 RUN_14_DAYS
-- 6 AGENT_INPUT 14 BUILDER_RUN_AGENT 22 RUN_AGENTS_100
-- 7 CONGRATS 15 VISIT_COPILOT
-- 8 GET_RESULTS 16 RE_RUN_AGENT
--
-- WINDOW
-- Users who started onboarding in the last 90 days
--
-- EXAMPLE QUERIES
-- -- Full funnel
-- SELECT * FROM analytics.user_onboarding_funnel ORDER BY step_order;
--
-- -- Biggest drop-off point
-- SELECT step, pct_from_prev FROM analytics.user_onboarding_funnel
-- ORDER BY pct_from_prev ASC LIMIT 3;
-- =============================================================
WITH all_steps AS (
-- Complete ordered grid of all 22 steps so zero-completion steps
-- are always present, keeping LAG comparisons correct.
SELECT step_name, step_order
FROM (VALUES
('WELCOME', 1),
('USAGE_REASON', 2),
('INTEGRATIONS', 3),
('AGENT_CHOICE', 4),
('AGENT_NEW_RUN', 5),
('AGENT_INPUT', 6),
('CONGRATS', 7),
('GET_RESULTS', 8),
('MARKETPLACE_VISIT', 9),
('MARKETPLACE_ADD_AGENT', 10),
('MARKETPLACE_RUN_AGENT', 11),
('BUILDER_OPEN', 12),
('BUILDER_SAVE_AGENT', 13),
('BUILDER_RUN_AGENT', 14),
('VISIT_COPILOT', 15),
('RE_RUN_AGENT', 16),
('SCHEDULE_AGENT', 17),
('RUN_AGENTS', 18),
('RUN_3_DAYS', 19),
('TRIGGER_WEBHOOK', 20),
('RUN_14_DAYS', 21),
('RUN_AGENTS_100', 22)
) AS t(step_name, step_order)
),
raw AS (
SELECT
u."userId",
step_txt::text AS step
FROM platform."UserOnboarding" u
CROSS JOIN LATERAL UNNEST(u."completedSteps") AS step_txt
WHERE u."createdAt" >= CURRENT_DATE - INTERVAL '90 days'
),
step_counts AS (
SELECT step, COUNT(DISTINCT "userId") AS users_completed
FROM raw GROUP BY step
),
funnel AS (
SELECT
a.step_name AS step,
a.step_order,
COALESCE(sc.users_completed, 0) AS users_completed,
ROUND(
100.0 * COALESCE(sc.users_completed, 0)
/ NULLIF(
LAG(COALESCE(sc.users_completed, 0)) OVER (ORDER BY a.step_order),
0
),
2
) AS pct_from_prev
FROM all_steps a
LEFT JOIN step_counts sc ON sc.step = a.step_name
)
SELECT * FROM funnel ORDER BY step_order

View File

@@ -0,0 +1,41 @@
-- =============================================================
-- View: analytics.user_onboarding_integration
-- Looker source alias: ds75 | Charts: 1
-- =============================================================
-- DESCRIPTION
-- Pre-aggregated count of users who selected each integration
-- during onboarding. One row per integration type, sorted
-- by popularity.
--
-- SOURCE TABLES
-- platform.UserOnboarding — integrations array column
--
-- OUTPUT COLUMNS
-- integration TEXT Integration name (e.g. 'github', 'slack', 'notion')
-- users_with_integration BIGINT Distinct users who selected this integration
--
-- WINDOW
-- Users who started onboarding in the last 90 days
--
-- EXAMPLE QUERIES
-- -- Full integration popularity ranking
-- SELECT * FROM analytics.user_onboarding_integration;
--
-- -- Top 5 integrations
-- SELECT * FROM analytics.user_onboarding_integration LIMIT 5;
-- =============================================================
WITH exploded AS (
SELECT
u."userId" AS user_id,
UNNEST(u."integrations") AS integration
FROM platform."UserOnboarding" u
WHERE u."createdAt" >= CURRENT_DATE - INTERVAL '90 days'
)
SELECT
integration,
COUNT(DISTINCT user_id) AS users_with_integration
FROM exploded
WHERE integration IS NOT NULL AND integration <> ''
GROUP BY integration
ORDER BY users_with_integration DESC

View File

@@ -0,0 +1,145 @@
-- =============================================================
-- View: analytics.users_activities
-- Looker source alias: ds56 | Charts: 5
-- =============================================================
-- DESCRIPTION
-- One row per user with lifetime activity summary.
-- Joins login sessions with agent graphs, executions and
-- node-level runs to give a full picture of how engaged
-- each user is. Includes a convenience flag for 7-day
-- activation (did the user return at least 7 days after
-- their first login?).
--
-- SOURCE TABLES
-- auth.sessions — Login/session records
-- platform.AgentGraph — Graphs (agents) built by the user
-- platform.AgentGraphExecution — Agent run history
-- platform.AgentNodeExecution — Individual block execution history
--
-- PERFORMANCE NOTE
-- Each CTE aggregates its own table independently by userId.
-- This avoids the fan-out that occurs when driving every join
-- from user_logins across the two largest tables
-- (AgentGraphExecution and AgentNodeExecution).
--
-- OUTPUT COLUMNS
-- user_id TEXT Supabase user UUID
-- first_login_time TIMESTAMPTZ First ever session created_at
-- last_login_time TIMESTAMPTZ Most recent session created_at
-- last_visit_time TIMESTAMPTZ Max of last refresh or login
-- last_agent_save_time TIMESTAMPTZ Last time user saved an agent graph
-- agent_count BIGINT Number of distinct active graphs built (0 if none)
-- first_agent_run_time TIMESTAMPTZ First ever graph execution
-- last_agent_run_time TIMESTAMPTZ Most recent graph execution
-- unique_agent_runs BIGINT Distinct agent graphs ever run (0 if none)
-- agent_runs BIGINT Total graph execution count (0 if none)
-- node_execution_count BIGINT Total node executions across all runs
-- node_execution_failed BIGINT Node executions with FAILED status
-- node_execution_completed BIGINT Node executions with COMPLETED status
-- node_execution_terminated BIGINT Node executions with TERMINATED status
-- node_execution_queued BIGINT Node executions with QUEUED status
-- node_execution_running BIGINT Node executions with RUNNING status
-- is_active_after_7d INT 1=returned after day 7, 0=did not, NULL=too early to tell
-- node_execution_incomplete BIGINT Node executions with INCOMPLETE status
-- node_execution_review BIGINT Node executions with REVIEW status
--
-- EXAMPLE QUERIES
-- -- Users who ran at least one agent and returned after 7 days
-- SELECT COUNT(*) FROM analytics.users_activities
-- WHERE agent_runs > 0 AND is_active_after_7d = 1;
--
-- -- Top 10 most active users by agent runs
-- SELECT user_id, agent_runs, node_execution_count
-- FROM analytics.users_activities
-- ORDER BY agent_runs DESC LIMIT 10;
--
-- -- 7-day activation rate
-- SELECT
-- SUM(CASE WHEN is_active_after_7d = 1 THEN 1 ELSE 0 END)::float
-- / NULLIF(COUNT(CASE WHEN is_active_after_7d IS NOT NULL THEN 1 END), 0)
-- AS activation_rate
-- FROM analytics.users_activities;
-- =============================================================
WITH user_logins AS (
SELECT
user_id::text AS user_id,
MIN(created_at) AS first_login_time,
MAX(created_at) AS last_login_time,
GREATEST(
MAX(refreshed_at)::timestamptz,
MAX(created_at)::timestamptz
) AS last_visit_time
FROM auth.sessions
GROUP BY user_id
),
user_agents AS (
-- Aggregate AgentGraph directly by userId (no fan-out from user_logins)
SELECT
"userId"::text AS user_id,
MAX("updatedAt") AS last_agent_save_time,
COUNT(DISTINCT "id") AS agent_count
FROM platform."AgentGraph"
WHERE "isActive"
GROUP BY "userId"
),
user_graph_runs AS (
-- Aggregate AgentGraphExecution directly by userId
SELECT
"userId"::text AS user_id,
MIN("createdAt") AS first_agent_run_time,
MAX("createdAt") AS last_agent_run_time,
COUNT(DISTINCT "agentGraphId") AS unique_agent_runs,
COUNT("id") AS agent_runs
FROM platform."AgentGraphExecution"
GROUP BY "userId"
),
user_node_runs AS (
-- Aggregate AgentNodeExecution directly; resolve userId via a
-- single join to AgentGraphExecution instead of fanning out from
-- user_logins through both large tables.
SELECT
g."userId"::text AS user_id,
COUNT(*) AS node_execution_count,
COUNT(*) FILTER (WHERE n."executionStatus" = 'FAILED') AS node_execution_failed,
COUNT(*) FILTER (WHERE n."executionStatus" = 'COMPLETED') AS node_execution_completed,
COUNT(*) FILTER (WHERE n."executionStatus" = 'TERMINATED') AS node_execution_terminated,
COUNT(*) FILTER (WHERE n."executionStatus" = 'QUEUED') AS node_execution_queued,
COUNT(*) FILTER (WHERE n."executionStatus" = 'RUNNING') AS node_execution_running,
COUNT(*) FILTER (WHERE n."executionStatus" = 'INCOMPLETE') AS node_execution_incomplete,
COUNT(*) FILTER (WHERE n."executionStatus" = 'REVIEW') AS node_execution_review
FROM platform."AgentNodeExecution" n
JOIN platform."AgentGraphExecution" g
ON g."id" = n."agentGraphExecutionId"
GROUP BY g."userId"
)
SELECT
ul.user_id,
ul.first_login_time,
ul.last_login_time,
ul.last_visit_time,
ua.last_agent_save_time,
COALESCE(ua.agent_count, 0) AS agent_count,
gr.first_agent_run_time,
gr.last_agent_run_time,
COALESCE(gr.unique_agent_runs, 0) AS unique_agent_runs,
COALESCE(gr.agent_runs, 0) AS agent_runs,
COALESCE(nr.node_execution_count, 0) AS node_execution_count,
COALESCE(nr.node_execution_failed, 0) AS node_execution_failed,
COALESCE(nr.node_execution_completed, 0) AS node_execution_completed,
COALESCE(nr.node_execution_terminated, 0) AS node_execution_terminated,
COALESCE(nr.node_execution_queued, 0) AS node_execution_queued,
COALESCE(nr.node_execution_running, 0) AS node_execution_running,
CASE
WHEN ul.first_login_time < NOW() - INTERVAL '7 days'
AND ul.last_visit_time >= ul.first_login_time + INTERVAL '7 days' THEN 1
WHEN ul.first_login_time < NOW() - INTERVAL '7 days'
AND ul.last_visit_time < ul.first_login_time + INTERVAL '7 days' THEN 0
ELSE NULL
END AS is_active_after_7d,
COALESCE(nr.node_execution_incomplete, 0) AS node_execution_incomplete,
COALESCE(nr.node_execution_review, 0) AS node_execution_review
FROM user_logins ul
LEFT JOIN user_agents ua ON ul.user_id = ua.user_id
LEFT JOIN user_graph_runs gr ON ul.user_id = gr.user_id
LEFT JOIN user_node_runs nr ON ul.user_id = nr.user_id

View File

@@ -638,7 +638,7 @@ async def test_process_review_action_auto_approve_creates_auto_approval_records(
# Mock get_node_executions to return node_id mapping
mock_get_node_executions = mocker.patch(
"backend.data.execution.get_node_executions"
"backend.api.features.executions.review.routes.get_node_executions"
)
mock_node_exec = mocker.Mock(spec=NodeExecutionResult)
mock_node_exec.node_exec_id = "test_node_123"
@@ -936,7 +936,7 @@ async def test_process_review_action_auto_approve_only_applies_to_approved_revie
# Mock get_node_executions to return node_id mapping
mock_get_node_executions = mocker.patch(
"backend.data.execution.get_node_executions"
"backend.api.features.executions.review.routes.get_node_executions"
)
mock_node_exec = mocker.Mock(spec=NodeExecutionResult)
mock_node_exec.node_exec_id = "node_exec_approved"
@@ -1148,7 +1148,7 @@ async def test_process_review_action_per_review_auto_approve_granularity(
# Mock get_node_executions to return batch node data
mock_get_node_executions = mocker.patch(
"backend.data.execution.get_node_executions"
"backend.api.features.executions.review.routes.get_node_executions"
)
# Create mock node executions for each review
mock_node_execs = []

View File

@@ -6,10 +6,15 @@ import autogpt_libs.auth as autogpt_auth_lib
from fastapi import APIRouter, HTTPException, Query, Security, status
from prisma.enums import ReviewStatus
from backend.copilot.constants import (
is_copilot_synthetic_id,
parse_node_id_from_exec_id,
)
from backend.data.execution import (
ExecutionContext,
ExecutionStatus,
get_graph_execution_meta,
get_node_executions,
)
from backend.data.graph import get_graph_settings
from backend.data.human_review import (
@@ -36,6 +41,38 @@ router = APIRouter(
)
async def _resolve_node_ids(
node_exec_ids: list[str],
graph_exec_id: str,
is_copilot: bool,
) -> dict[str, str]:
"""Resolve node_exec_id -> node_id for auto-approval records.
CoPilot synthetic IDs encode node_id in the format "{node_id}:{random}".
Graph executions look up node_id from NodeExecution records.
"""
if not node_exec_ids:
return {}
if is_copilot:
return {neid: parse_node_id_from_exec_id(neid) for neid in node_exec_ids}
node_execs = await get_node_executions(
graph_exec_id=graph_exec_id, include_exec_data=False
)
node_exec_map = {ne.node_exec_id: ne.node_id for ne in node_execs}
result = {}
for neid in node_exec_ids:
if neid in node_exec_map:
result[neid] = node_exec_map[neid]
else:
logger.error(
f"Failed to resolve node_id for {neid}: Node execution not found."
)
return result
@router.get(
"/pending",
summary="Get Pending Reviews",
@@ -110,14 +147,16 @@ async def list_pending_reviews_for_execution(
"""
# Verify user owns the graph execution before returning reviews
graph_exec = await get_graph_execution_meta(
user_id=user_id, execution_id=graph_exec_id
)
if not graph_exec:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Graph execution #{graph_exec_id} not found",
# (CoPilot synthetic IDs don't have graph execution records)
if not is_copilot_synthetic_id(graph_exec_id):
graph_exec = await get_graph_execution_meta(
user_id=user_id, execution_id=graph_exec_id
)
if not graph_exec:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Graph execution #{graph_exec_id} not found",
)
return await get_pending_reviews_for_execution(graph_exec_id, user_id)
@@ -160,30 +199,26 @@ async def process_review_action(
)
graph_exec_id = next(iter(graph_exec_ids))
is_copilot = is_copilot_synthetic_id(graph_exec_id)
# Validate execution status before processing reviews
graph_exec_meta = await get_graph_execution_meta(
user_id=user_id, execution_id=graph_exec_id
)
if not graph_exec_meta:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Graph execution #{graph_exec_id} not found",
)
# Only allow processing reviews if execution is paused for review
# or incomplete (partial execution with some reviews already processed)
if graph_exec_meta.status not in (
ExecutionStatus.REVIEW,
ExecutionStatus.INCOMPLETE,
):
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail=f"Cannot process reviews while execution status is {graph_exec_meta.status}. "
f"Reviews can only be processed when execution is paused (REVIEW status). "
f"Current status: {graph_exec_meta.status}",
# Validate execution status for graph executions (skip for CoPilot synthetic IDs)
if not is_copilot:
graph_exec_meta = await get_graph_execution_meta(
user_id=user_id, execution_id=graph_exec_id
)
if not graph_exec_meta:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Graph execution #{graph_exec_id} not found",
)
if graph_exec_meta.status not in (
ExecutionStatus.REVIEW,
ExecutionStatus.INCOMPLETE,
):
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail=f"Cannot process reviews while execution status is {graph_exec_meta.status}",
)
# Build review decisions map and track which reviews requested auto-approval
# Auto-approved reviews use original data (no modifications allowed)
@@ -236,7 +271,7 @@ async def process_review_action(
)
return (node_id, False)
# Collect node_exec_ids that need auto-approval
# Collect node_exec_ids that need auto-approval and resolve their node_ids
node_exec_ids_needing_auto_approval = [
node_exec_id
for node_exec_id, review_result in updated_reviews.items()
@@ -244,29 +279,16 @@ async def process_review_action(
and auto_approve_requests.get(node_exec_id, False)
]
# Batch-fetch node executions to get node_ids
node_id_map = await _resolve_node_ids(
node_exec_ids_needing_auto_approval, graph_exec_id, is_copilot
)
# Deduplicate by node_id — one auto-approval per node
nodes_needing_auto_approval: dict[str, Any] = {}
if node_exec_ids_needing_auto_approval:
from backend.data.execution import get_node_executions
node_execs = await get_node_executions(
graph_exec_id=graph_exec_id, include_exec_data=False
)
node_exec_map = {node_exec.node_exec_id: node_exec for node_exec in node_execs}
for node_exec_id in node_exec_ids_needing_auto_approval:
node_exec = node_exec_map.get(node_exec_id)
if node_exec:
review_result = updated_reviews[node_exec_id]
# Use the first approved review for this node (deduplicate by node_id)
if node_exec.node_id not in nodes_needing_auto_approval:
nodes_needing_auto_approval[node_exec.node_id] = review_result
else:
logger.error(
f"Failed to create auto-approval record for {node_exec_id}: "
f"Node execution not found. This may indicate a race condition "
f"or data inconsistency."
)
for node_exec_id in node_exec_ids_needing_auto_approval:
node_id = node_id_map.get(node_exec_id)
if node_id and node_id not in nodes_needing_auto_approval:
nodes_needing_auto_approval[node_id] = updated_reviews[node_exec_id]
# Execute all auto-approval creations in parallel (deduplicated by node_id)
auto_approval_results = await asyncio.gather(
@@ -281,13 +303,11 @@ async def process_review_action(
auto_approval_failed_count = 0
for result in auto_approval_results:
if isinstance(result, Exception):
# Unexpected exception during auto-approval creation
auto_approval_failed_count += 1
logger.error(
f"Unexpected exception during auto-approval creation: {result}"
)
elif isinstance(result, tuple) and len(result) == 2 and not result[1]:
# Auto-approval creation failed (returned False)
auto_approval_failed_count += 1
# Count results
@@ -302,22 +322,20 @@ async def process_review_action(
if review.status == ReviewStatus.REJECTED
)
# Resume execution only if ALL pending reviews for this execution have been processed
if updated_reviews:
# Resume graph execution only for real graph executions (not CoPilot)
# CoPilot sessions are resumed by the LLM retrying run_block with review_id
if not is_copilot and updated_reviews:
still_has_pending = await has_pending_reviews_for_graph_exec(graph_exec_id)
if not still_has_pending:
# Get the graph_id from any processed review
first_review = next(iter(updated_reviews.values()))
try:
# Fetch user and settings to build complete execution context
user = await get_user_by_id(user_id)
settings = await get_graph_settings(
user_id=user_id, graph_id=first_review.graph_id
)
# Preserve user's timezone preference when resuming execution
user_timezone = (
user.timezone if user.timezone != USER_TIMEZONE_NOT_SET else "UTC"
)

View File

@@ -24,7 +24,7 @@ from backend.blocks.mcp.oauth import MCPOAuthHandler
from backend.data.model import OAuth2Credentials
from backend.integrations.creds_manager import IntegrationCredentialsManager
from backend.integrations.providers import ProviderName
from backend.util.request import HTTPClientError, Requests, validate_url
from backend.util.request import HTTPClientError, Requests, validate_url_host
from backend.util.settings import Settings
logger = logging.getLogger(__name__)
@@ -80,7 +80,7 @@ async def discover_tools(
"""
# Validate URL to prevent SSRF — blocks loopback and private IP ranges.
try:
await validate_url(request.server_url, trusted_origins=[])
await validate_url_host(request.server_url)
except ValueError as e:
raise fastapi.HTTPException(status_code=400, detail=f"Invalid server URL: {e}")
@@ -167,7 +167,7 @@ async def mcp_oauth_login(
"""
# Validate URL to prevent SSRF — blocks loopback and private IP ranges.
try:
await validate_url(request.server_url, trusted_origins=[])
await validate_url_host(request.server_url)
except ValueError as e:
raise fastapi.HTTPException(status_code=400, detail=f"Invalid server URL: {e}")
@@ -187,7 +187,7 @@ async def mcp_oauth_login(
# Validate the auth server URL from metadata to prevent SSRF.
try:
await validate_url(auth_server_url, trusted_origins=[])
await validate_url_host(auth_server_url)
except ValueError as e:
raise fastapi.HTTPException(
status_code=400,
@@ -234,7 +234,7 @@ async def mcp_oauth_login(
if registration_endpoint:
# Validate the registration endpoint to prevent SSRF via metadata.
try:
await validate_url(registration_endpoint, trusted_origins=[])
await validate_url_host(registration_endpoint)
except ValueError:
pass # Skip registration, fall back to default client_id
else:
@@ -429,7 +429,7 @@ async def mcp_store_token(
# Validate URL to prevent SSRF — blocks loopback and private IP ranges.
try:
await validate_url(request.server_url, trusted_origins=[])
await validate_url_host(request.server_url)
except ValueError as e:
raise fastapi.HTTPException(status_code=400, detail=f"Invalid server URL: {e}")

View File

@@ -32,9 +32,9 @@ async def client():
@pytest.fixture(autouse=True)
def _bypass_ssrf_validation():
"""Bypass validate_url in all route tests (test URLs don't resolve)."""
"""Bypass validate_url_host in all route tests (test URLs don't resolve)."""
with patch(
"backend.api.features.mcp.routes.validate_url",
"backend.api.features.mcp.routes.validate_url_host",
new_callable=AsyncMock,
):
yield
@@ -521,12 +521,12 @@ class TestStoreToken:
class TestSSRFValidation:
"""Verify that validate_url is enforced on all endpoints."""
"""Verify that validate_url_host is enforced on all endpoints."""
@pytest.mark.asyncio(loop_scope="session")
async def test_discover_tools_ssrf_blocked(self, client):
with patch(
"backend.api.features.mcp.routes.validate_url",
"backend.api.features.mcp.routes.validate_url_host",
new_callable=AsyncMock,
side_effect=ValueError("blocked loopback"),
):
@@ -541,7 +541,7 @@ class TestSSRFValidation:
@pytest.mark.asyncio(loop_scope="session")
async def test_oauth_login_ssrf_blocked(self, client):
with patch(
"backend.api.features.mcp.routes.validate_url",
"backend.api.features.mcp.routes.validate_url_host",
new_callable=AsyncMock,
side_effect=ValueError("blocked private IP"),
):
@@ -556,7 +556,7 @@ class TestSSRFValidation:
@pytest.mark.asyncio(loop_scope="session")
async def test_store_token_ssrf_blocked(self, client):
with patch(
"backend.api.features.mcp.routes.validate_url",
"backend.api.features.mcp.routes.validate_url_host",
new_callable=AsyncMock,
side_effect=ValueError("blocked loopback"),
):

View File

@@ -624,6 +624,7 @@ class Block(ABC, Generic[BlockSchemaInputType, BlockSchemaOutputType]):
graph_id: str,
graph_version: int,
execution_context: "ExecutionContext",
is_graph_execution: bool = True,
**kwargs,
) -> tuple[bool, BlockInput]:
"""
@@ -652,6 +653,7 @@ class Block(ABC, Generic[BlockSchemaInputType, BlockSchemaOutputType]):
graph_version=graph_version,
block_name=self.name,
editable=True,
is_graph_execution=is_graph_execution,
)
if decision is None:

View File

@@ -126,7 +126,7 @@ class PrintToConsoleBlock(Block):
output_schema=PrintToConsoleBlock.Output,
test_input={"text": "Hello, World!"},
is_sensitive_action=True,
disabled=True, # Disabled per Nick Tindle's request (OPEN-3000)
disabled=True,
test_output=[
("output", "Hello, World!"),
("status", "printed"),

View File

@@ -67,6 +67,7 @@ class HITLReviewHelper:
graph_version: int,
block_name: str = "Block",
editable: bool = False,
is_graph_execution: bool = True,
) -> Optional[ReviewResult]:
"""
Handle a review request for a block that requires human review.
@@ -143,10 +144,11 @@ class HITLReviewHelper:
logger.info(
f"Block {block_name} pausing execution for node {node_exec_id} - awaiting human review"
)
await HITLReviewHelper.update_node_execution_status(
exec_id=node_exec_id,
status=ExecutionStatus.REVIEW,
)
if is_graph_execution:
await HITLReviewHelper.update_node_execution_status(
exec_id=node_exec_id,
status=ExecutionStatus.REVIEW,
)
return None # Signal that execution should pause
# Mark review as processed if not already done
@@ -168,6 +170,7 @@ class HITLReviewHelper:
graph_version: int,
block_name: str = "Block",
editable: bool = False,
is_graph_execution: bool = True,
) -> Optional[ReviewDecision]:
"""
Handle a review request and return the decision in a single call.
@@ -197,6 +200,7 @@ class HITLReviewHelper:
graph_version=graph_version,
block_name=block_name,
editable=editable,
is_graph_execution=is_graph_execution,
)
if review_result is None:

View File

@@ -17,7 +17,7 @@ from backend.blocks.jina._auth import (
from backend.blocks.search import GetRequest
from backend.data.model import SchemaField
from backend.util.exceptions import BlockExecutionError
from backend.util.request import HTTPClientError, HTTPServerError, validate_url
from backend.util.request import HTTPClientError, HTTPServerError, validate_url_host
class SearchTheWebBlock(Block, GetRequest):
@@ -112,7 +112,7 @@ class ExtractWebsiteContentBlock(Block, GetRequest):
) -> BlockOutput:
if input_data.raw_content:
try:
parsed_url, _, _ = await validate_url(input_data.url, [])
parsed_url, _, _ = await validate_url_host(input_data.url)
url = parsed_url.geturl()
except ValueError as e:
yield "error", f"Invalid URL: {e}"

View File

@@ -34,8 +34,11 @@ from backend.util import json
from backend.util.clients import OPENROUTER_BASE_URL
from backend.util.logging import TruncatedLogger
from backend.util.prompt import compress_context, estimate_token_count
from backend.util.request import validate_url_host
from backend.util.settings import Settings
from backend.util.text import TextFormatter
settings = Settings()
logger = TruncatedLogger(logging.getLogger(__name__), "[LLM-Block]")
fmt = TextFormatter(autoescape=False)
@@ -805,6 +808,11 @@ async def llm_call(
if tools:
raise ValueError("Ollama does not support tools.")
# Validate user-provided Ollama host to prevent SSRF etc.
await validate_url_host(
ollama_host, trusted_hostnames=[settings.config.ollama_host]
)
client = ollama.AsyncClient(host=ollama_host)
sys_messages = [p["content"] for p in prompt if p["role"] == "system"]
usr_messages = [p["content"] for p in prompt if p["role"] != "system"]

View File

@@ -6,6 +6,32 @@
COPILOT_ERROR_PREFIX = "[__COPILOT_ERROR_f7a1__]" # Renders as ErrorCard
COPILOT_SYSTEM_PREFIX = "[__COPILOT_SYSTEM_e3b0__]" # Renders as system info message
# Prefix for all synthetic IDs generated by CoPilot block execution.
# Used to distinguish CoPilot-generated records from real graph execution records
# in PendingHumanReview and other tables.
COPILOT_SYNTHETIC_ID_PREFIX = "copilot-"
# Sub-prefixes for session-scoped and node-scoped synthetic IDs.
COPILOT_SESSION_PREFIX = f"{COPILOT_SYNTHETIC_ID_PREFIX}session-"
COPILOT_NODE_PREFIX = f"{COPILOT_SYNTHETIC_ID_PREFIX}node-"
# Separator used in synthetic node_exec_id to encode node_id.
# Format: "{node_id}:{random_hex}" — extract node_id via rsplit(":", 1)[0]
COPILOT_NODE_EXEC_ID_SEPARATOR = ":"
# Compaction notice messages shown to users.
COMPACTION_DONE_MSG = "Earlier messages were summarized to fit within context limits."
COMPACTION_TOOL_NAME = "context_compaction"
def is_copilot_synthetic_id(id_value: str) -> bool:
"""Check if an ID is a CoPilot synthetic ID (not from a real graph execution)."""
return id_value.startswith(COPILOT_SYNTHETIC_ID_PREFIX)
def parse_node_id_from_exec_id(node_exec_id: str) -> str:
"""Extract node_id from a synthetic node_exec_id.
Format: "{node_id}:{random_hex}" → returns "{node_id}".
"""
return node_exec_id.rsplit(COPILOT_NODE_EXEC_ID_SEPARATOR, 1)[0]

View File

@@ -12,6 +12,7 @@ from .agent_browser import BrowserActTool, BrowserNavigateTool, BrowserScreensho
from .agent_output import AgentOutputTool
from .base import BaseTool
from .bash_exec import BashExecTool
from .continue_run_block import ContinueRunBlockTool
from .create_agent import CreateAgentTool
from .customize_agent import CustomizeAgentTool
from .edit_agent import EditAgentTool
@@ -68,6 +69,7 @@ TOOL_REGISTRY: dict[str, BaseTool] = {
"move_agents_to_folder": MoveAgentsToFolderTool(),
"run_agent": RunAgentTool(),
"run_block": RunBlockTool(),
"continue_run_block": ContinueRunBlockTool(),
"run_mcp_tool": RunMCPToolTool(),
"get_mcp_guide": GetMCPGuideTool(),
"view_agent_output": AgentOutputTool(),

View File

@@ -33,7 +33,7 @@ import tempfile
from typing import Any
from backend.copilot.model import ChatSession
from backend.util.request import validate_url
from backend.util.request import validate_url_host
from .base import BaseTool
from .models import (
@@ -235,7 +235,7 @@ async def _restore_browser_state(
if url:
# Validate the saved URL to prevent SSRF via stored redirect targets.
try:
await validate_url(url, trusted_origins=[])
await validate_url_host(url)
except ValueError:
logger.warning(
"[browser] State restore: blocked SSRF URL %s", url[:200]
@@ -473,7 +473,7 @@ class BrowserNavigateTool(BaseTool):
)
try:
await validate_url(url, trusted_origins=[])
await validate_url_host(url)
except ValueError as e:
return ErrorResponse(
message=str(e),

View File

@@ -68,17 +68,18 @@ def _run_result(rc: int = 0, stdout: str = "", stderr: str = "") -> tuple:
# ---------------------------------------------------------------------------
# SSRF protection via shared validate_url (backend.util.request)
# SSRF protection via shared validate_url_host (backend.util.request)
# ---------------------------------------------------------------------------
# Patch target: validate_url is imported directly into agent_browser's module scope.
_VALIDATE_URL = "backend.copilot.tools.agent_browser.validate_url"
# Patch target: validate_url_host is imported directly into agent_browser's
# module scope.
_VALIDATE_URL = "backend.copilot.tools.agent_browser.validate_url_host"
class TestSsrfViaValidateUrl:
"""Verify that browser_navigate uses validate_url for SSRF protection.
"""Verify that browser_navigate uses validate_url_host for SSRF protection.
We mock validate_url itself (not the low-level socket) so these tests
We mock validate_url_host itself (not the low-level socket) so these tests
exercise the integration point, not the internals of request.py
(which has its own thorough test suite in request_test.py).
"""
@@ -89,7 +90,7 @@ class TestSsrfViaValidateUrl:
@pytest.mark.asyncio
async def test_blocked_ip_returns_blocked_url_error(self):
"""validate_url raises ValueError → tool returns blocked_url ErrorResponse."""
"""validate_url_host raises ValueError → tool returns blocked_url ErrorResponse."""
with patch(_VALIDATE_URL, new_callable=AsyncMock) as mock_validate:
mock_validate.side_effect = ValueError(
"Access to blocked IP 10.0.0.1 is not allowed."
@@ -124,8 +125,8 @@ class TestSsrfViaValidateUrl:
assert result.error == "blocked_url"
@pytest.mark.asyncio
async def test_validate_url_called_with_empty_trusted_origins(self):
"""Confirms no trusted-origins bypass is granted — all URLs are validated."""
async def test_validate_url_host_called_without_trusted_hostnames(self):
"""Confirms no trusted-hostnames bypass is granted — all URLs are validated."""
with patch(_VALIDATE_URL, new_callable=AsyncMock) as mock_validate:
mock_validate.return_value = (object(), False, ["1.2.3.4"])
with patch(
@@ -143,7 +144,7 @@ class TestSsrfViaValidateUrl:
session=self.session,
url="https://example.com",
)
mock_validate.assert_called_once_with("https://example.com", trusted_origins=[])
mock_validate.assert_called_once_with("https://example.com")
# ---------------------------------------------------------------------------

View File

@@ -0,0 +1,157 @@
"""Tool for continuing block execution after human review approval."""
import logging
from typing import Any
from prisma.enums import ReviewStatus
from backend.blocks import get_block
from backend.copilot.constants import (
COPILOT_NODE_PREFIX,
COPILOT_SESSION_PREFIX,
parse_node_id_from_exec_id,
)
from backend.copilot.model import ChatSession
from backend.data.db_accessors import review_db
from .base import BaseTool
from .helpers import execute_block, resolve_block_credentials
from .models import ErrorResponse, ToolResponseBase
logger = logging.getLogger(__name__)
class ContinueRunBlockTool(BaseTool):
"""Tool for continuing a block execution after human review approval."""
@property
def name(self) -> str:
return "continue_run_block"
@property
def description(self) -> str:
return (
"Continue executing a block after human review approval. "
"Use this after a run_block call returned review_required. "
"Pass the review_id from the review_required response. "
"The block will execute with the original pre-approved input data."
)
@property
def parameters(self) -> dict[str, Any]:
return {
"type": "object",
"properties": {
"review_id": {
"type": "string",
"description": (
"The review_id from a previous review_required response. "
"This resumes execution with the pre-approved input data."
),
},
},
"required": ["review_id"],
}
@property
def requires_auth(self) -> bool:
return True
async def _execute(
self,
user_id: str | None,
session: ChatSession,
**kwargs,
) -> ToolResponseBase:
review_id = (
kwargs.get("review_id", "").strip() if kwargs.get("review_id") else ""
)
session_id = session.session_id
if not review_id:
return ErrorResponse(
message="Please provide a review_id", session_id=session_id
)
if not user_id:
return ErrorResponse(
message="Authentication required", session_id=session_id
)
# Look up and validate the review record via adapter
reviews = await review_db().get_reviews_by_node_exec_ids([review_id], user_id)
review = reviews.get(review_id)
if not review:
return ErrorResponse(
message=(
f"Review '{review_id}' not found or already executed. "
"It may have been consumed by a previous continue_run_block call."
),
session_id=session_id,
)
# Validate the review belongs to this session
expected_graph_exec_id = f"{COPILOT_SESSION_PREFIX}{session_id}"
if review.graph_exec_id != expected_graph_exec_id:
return ErrorResponse(
message="Review does not belong to this session.",
session_id=session_id,
)
if review.status == ReviewStatus.WAITING:
return ErrorResponse(
message="Review has not been approved yet. "
"Please wait for the user to approve the review first.",
session_id=session_id,
)
if review.status == ReviewStatus.REJECTED:
return ErrorResponse(
message="Review was rejected. The block will not execute.",
session_id=session_id,
)
# Extract block_id from review_id: copilot-node-{block_id}:{random_hex}
block_id = parse_node_id_from_exec_id(review_id).removeprefix(
COPILOT_NODE_PREFIX
)
block = get_block(block_id)
if not block:
return ErrorResponse(
message=f"Block '{block_id}' not found", session_id=session_id
)
input_data: dict[str, Any] = (
review.payload if isinstance(review.payload, dict) else {}
)
logger.info(
f"Continuing block {block.name} ({block_id}) for user {user_id} "
f"with review_id={review_id}"
)
matched_creds, missing_creds = await resolve_block_credentials(
user_id, block, input_data
)
if missing_creds:
return ErrorResponse(
message=f"Block '{block.name}' requires credentials that are not configured.",
session_id=session_id,
)
result = await execute_block(
block=block,
block_id=block_id,
input_data=input_data,
user_id=user_id,
session_id=session_id,
node_exec_id=review_id,
matched_credentials=matched_creds,
)
# Delete review record after successful execution (one-time use)
if result.type != "error":
await review_db().delete_review_by_node_exec_id(review_id, user_id)
return result

View File

@@ -0,0 +1,186 @@
"""Tests for ContinueRunBlockTool."""
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from prisma.enums import ReviewStatus
from ._test_data import make_session
from .continue_run_block import ContinueRunBlockTool
from .models import BlockOutputResponse, ErrorResponse
_TEST_USER_ID = "test-user-continue"
def _make_review_model(
node_exec_id: str,
status: ReviewStatus = ReviewStatus.APPROVED,
payload: dict | None = None,
graph_exec_id: str = "",
):
"""Create a mock PendingHumanReviewModel."""
mock = MagicMock()
mock.node_exec_id = node_exec_id
mock.status = status
mock.payload = payload or {"text": "hello"}
mock.graph_exec_id = graph_exec_id
return mock
class TestContinueRunBlock:
@pytest.mark.asyncio(loop_scope="session")
async def test_missing_review_id_returns_error(self):
tool = ContinueRunBlockTool()
session = make_session(user_id=_TEST_USER_ID)
response = await tool._execute(
user_id=_TEST_USER_ID,
session=session,
review_id="",
)
assert isinstance(response, ErrorResponse)
assert "review_id" in response.message
@pytest.mark.asyncio(loop_scope="session")
async def test_review_not_found_returns_error(self):
tool = ContinueRunBlockTool()
session = make_session(user_id=_TEST_USER_ID)
mock_db = MagicMock()
mock_db.get_reviews_by_node_exec_ids = AsyncMock(return_value={})
with patch(
"backend.copilot.tools.continue_run_block.review_db",
return_value=mock_db,
):
response = await tool._execute(
user_id=_TEST_USER_ID,
session=session,
review_id="copilot-node-some-block:abc12345",
)
assert isinstance(response, ErrorResponse)
assert "not found" in response.message
@pytest.mark.asyncio(loop_scope="session")
async def test_waiting_review_returns_error(self):
tool = ContinueRunBlockTool()
session = make_session(user_id=_TEST_USER_ID)
review_id = "copilot-node-some-block:abc12345"
graph_exec_id = f"copilot-session-{session.session_id}"
review = _make_review_model(
review_id, status=ReviewStatus.WAITING, graph_exec_id=graph_exec_id
)
mock_db = MagicMock()
mock_db.get_reviews_by_node_exec_ids = AsyncMock(
return_value={review_id: review}
)
with patch(
"backend.copilot.tools.continue_run_block.review_db",
return_value=mock_db,
):
response = await tool._execute(
user_id=_TEST_USER_ID,
session=session,
review_id=review_id,
)
assert isinstance(response, ErrorResponse)
assert "not been approved" in response.message
@pytest.mark.asyncio(loop_scope="session")
async def test_rejected_review_returns_error(self):
tool = ContinueRunBlockTool()
session = make_session(user_id=_TEST_USER_ID)
review_id = "copilot-node-some-block:abc12345"
graph_exec_id = f"copilot-session-{session.session_id}"
review = _make_review_model(
review_id, status=ReviewStatus.REJECTED, graph_exec_id=graph_exec_id
)
mock_db = MagicMock()
mock_db.get_reviews_by_node_exec_ids = AsyncMock(
return_value={review_id: review}
)
with patch(
"backend.copilot.tools.continue_run_block.review_db",
return_value=mock_db,
):
response = await tool._execute(
user_id=_TEST_USER_ID,
session=session,
review_id=review_id,
)
assert isinstance(response, ErrorResponse)
assert "rejected" in response.message.lower()
@pytest.mark.asyncio(loop_scope="session")
async def test_approved_review_executes_block(self):
tool = ContinueRunBlockTool()
session = make_session(user_id=_TEST_USER_ID)
review_id = "copilot-node-delete-branch-id:abc12345"
graph_exec_id = f"copilot-session-{session.session_id}"
input_data = {"repo_url": "https://github.com/test/repo", "branch": "main"}
review = _make_review_model(
review_id,
status=ReviewStatus.APPROVED,
payload=input_data,
graph_exec_id=graph_exec_id,
)
mock_block = MagicMock()
mock_block.name = "Delete Branch"
async def mock_execute(data, **kwargs):
yield "result", "Branch deleted"
mock_block.execute = mock_execute
mock_block.input_schema.get_credentials_fields_info.return_value = []
mock_workspace_db = MagicMock()
mock_workspace_db.get_or_create_workspace = AsyncMock(
return_value=MagicMock(id="test-workspace-id")
)
mock_db = MagicMock()
mock_db.get_reviews_by_node_exec_ids = AsyncMock(
return_value={review_id: review}
)
mock_db.delete_review_by_node_exec_id = AsyncMock(return_value=1)
with (
patch(
"backend.copilot.tools.continue_run_block.review_db",
return_value=mock_db,
),
patch(
"backend.copilot.tools.continue_run_block.get_block",
return_value=mock_block,
),
patch(
"backend.copilot.tools.helpers.workspace_db",
return_value=mock_workspace_db,
),
patch(
"backend.copilot.tools.helpers.match_credentials_to_requirements",
return_value=({}, []),
),
):
response = await tool._execute(
user_id=_TEST_USER_ID,
session=session,
review_id=review_id,
)
assert isinstance(response, BlockOutputResponse)
assert response.success is True
assert response.block_name == "Delete Branch"
# Verify review was deleted (one-time use)
mock_db.delete_review_by_node_exec_id.assert_called_once_with(
review_id, _TEST_USER_ID
)

View File

@@ -1,7 +1,24 @@
"""Shared helpers for chat tools."""
import logging
from collections import defaultdict
from typing import Any
from pydantic_core import PydanticUndefined
from backend.blocks._base import AnyBlockSchema
from backend.copilot.constants import COPILOT_NODE_PREFIX, COPILOT_SESSION_PREFIX
from backend.data.db_accessors import workspace_db
from backend.data.execution import ExecutionContext
from backend.data.model import CredentialsFieldInfo, CredentialsMetaInput
from backend.integrations.creds_manager import IntegrationCredentialsManager
from backend.util.exceptions import BlockError
from .models import BlockOutputResponse, ErrorResponse, ToolResponseBase
from .utils import match_credentials_to_requirements
logger = logging.getLogger(__name__)
def get_inputs_from_schema(
input_schema: dict[str, Any],
@@ -27,3 +44,159 @@ def get_inputs_from_schema(
for name, schema in properties.items()
if name not in exclude
]
async def execute_block(
*,
block: AnyBlockSchema,
block_id: str,
input_data: dict[str, Any],
user_id: str,
session_id: str,
node_exec_id: str,
matched_credentials: dict[str, CredentialsMetaInput],
sensitive_action_safe_mode: bool = False,
) -> ToolResponseBase:
"""Execute a block with full context setup, credential injection, and error handling.
This is the shared execution path used by both ``run_block`` (after review
check) and ``continue_run_block`` (after approval).
Returns:
BlockOutputResponse on success, ErrorResponse on failure.
"""
try:
workspace = await workspace_db().get_or_create_workspace(user_id)
synthetic_graph_id = f"{COPILOT_SESSION_PREFIX}{session_id}"
synthetic_node_id = f"{COPILOT_NODE_PREFIX}{block_id}"
execution_context = ExecutionContext(
user_id=user_id,
graph_id=synthetic_graph_id,
graph_exec_id=synthetic_graph_id,
graph_version=1,
node_id=synthetic_node_id,
node_exec_id=node_exec_id,
workspace_id=workspace.id,
session_id=session_id,
sensitive_action_safe_mode=sensitive_action_safe_mode,
)
exec_kwargs: dict[str, Any] = {
"user_id": user_id,
"execution_context": execution_context,
"workspace_id": workspace.id,
"graph_exec_id": synthetic_graph_id,
"node_exec_id": node_exec_id,
"node_id": synthetic_node_id,
"graph_version": 1,
"graph_id": synthetic_graph_id,
}
# Inject credentials
creds_manager = IntegrationCredentialsManager()
for field_name, cred_meta in matched_credentials.items():
if field_name not in input_data:
input_data[field_name] = cred_meta.model_dump()
actual_credentials = await creds_manager.get(
user_id, cred_meta.id, lock=False
)
if actual_credentials:
exec_kwargs[field_name] = actual_credentials
else:
return ErrorResponse(
message=f"Failed to retrieve credentials for {field_name}",
session_id=session_id,
)
# Execute the block and collect outputs
outputs: dict[str, list[Any]] = defaultdict(list)
async for output_name, output_data in block.execute(
input_data,
**exec_kwargs,
):
outputs[output_name].append(output_data)
return BlockOutputResponse(
message=f"Block '{block.name}' executed successfully",
block_id=block_id,
block_name=block.name,
outputs=dict(outputs),
success=True,
session_id=session_id,
)
except BlockError as e:
logger.warning(f"Block execution failed: {e}")
return ErrorResponse(
message=f"Block execution failed: {e}",
error=str(e),
session_id=session_id,
)
except Exception as e:
logger.error(f"Unexpected error executing block: {e}", exc_info=True)
return ErrorResponse(
message=f"Failed to execute block: {str(e)}",
error=str(e),
session_id=session_id,
)
async def resolve_block_credentials(
user_id: str,
block: AnyBlockSchema,
input_data: dict[str, Any] | None = None,
) -> tuple[dict[str, CredentialsMetaInput], list[CredentialsMetaInput]]:
"""Resolve credentials for a block by matching user's available credentials.
Handles discriminated credentials (e.g. provider selection based on model).
Returns:
(matched_credentials, missing_credentials)
"""
input_data = input_data or {}
requirements = _resolve_discriminated_credentials(block, input_data)
if not requirements:
return {}, []
return await match_credentials_to_requirements(user_id, requirements)
def _resolve_discriminated_credentials(
block: AnyBlockSchema,
input_data: dict[str, Any],
) -> dict[str, CredentialsFieldInfo]:
"""Resolve credential requirements, applying discriminator logic where needed."""
credentials_fields_info = block.input_schema.get_credentials_fields_info()
if not credentials_fields_info:
return {}
resolved: dict[str, CredentialsFieldInfo] = {}
for field_name, field_info in credentials_fields_info.items():
effective_field_info = field_info
if field_info.discriminator and field_info.discriminator_mapping:
discriminator_value = input_data.get(field_info.discriminator)
if discriminator_value is None:
field = block.input_schema.model_fields.get(field_info.discriminator)
if field and field.default is not PydanticUndefined:
discriminator_value = field.default
if (
discriminator_value
and discriminator_value in field_info.discriminator_mapping
):
effective_field_info = field_info.discriminate(discriminator_value)
effective_field_info.discriminator_values.add(discriminator_value)
logger.debug(
f"Discriminated provider for {field_name}: "
f"{discriminator_value} -> {effective_field_info.provider}"
)
resolved[field_name] = effective_field_info
return resolved

View File

@@ -39,6 +39,7 @@ class ResponseType(str, Enum):
BLOCK_LIST = "block_list"
BLOCK_DETAILS = "block_details"
BLOCK_OUTPUT = "block_output"
REVIEW_REQUIRED = "review_required"
# MCP
MCP_GUIDE = "mcp_guide"
@@ -458,6 +459,21 @@ class BlockOutputResponse(ToolResponseBase):
success: bool = True
class ReviewRequiredResponse(ToolResponseBase):
"""Response when a block requires human review before execution."""
type: ResponseType = ResponseType.REVIEW_REQUIRED
block_id: str
block_name: str
review_id: str = Field(description="The review ID for tracking approval status")
graph_exec_id: str = Field(
description="The graph execution ID for fetching review status"
)
input_data: dict[str, Any] = Field(
description="The input data that requires review"
)
class WebFetchResponse(ToolResponseBase):
"""Response for web_fetch tool."""

View File

@@ -534,7 +534,9 @@ class RunAgentTool(BaseTool):
return ExecutionStartedResponse(
message=(
f"Agent '{library_agent.name}' is awaiting human review. "
f"Check at {library_agent_link}."
f"The user can approve or reject inline. After approval, "
f"the execution resumes automatically. Use view_agent_output "
f"with execution_id='{execution.id}' to check the result."
),
session_id=session_id,
execution_id=execution.id,

View File

@@ -2,38 +2,34 @@
import logging
import uuid
from collections import defaultdict
from typing import Any
from pydantic_core import PydanticUndefined
from backend.blocks import BlockType, get_block
from backend.blocks._base import AnyBlockSchema
from backend.copilot.constants import (
COPILOT_NODE_EXEC_ID_SEPARATOR,
COPILOT_NODE_PREFIX,
COPILOT_SESSION_PREFIX,
)
from backend.copilot.model import ChatSession
from backend.data.db_accessors import workspace_db
from backend.data.db_accessors import review_db
from backend.data.execution import ExecutionContext
from backend.data.model import CredentialsFieldInfo, CredentialsMetaInput
from backend.integrations.creds_manager import IntegrationCredentialsManager
from backend.util.exceptions import BlockError
from .base import BaseTool
from .find_block import COPILOT_EXCLUDED_BLOCK_IDS, COPILOT_EXCLUDED_BLOCK_TYPES
from .helpers import get_inputs_from_schema
from .helpers import execute_block, get_inputs_from_schema, resolve_block_credentials
from .models import (
BlockDetails,
BlockDetailsResponse,
BlockOutputResponse,
ErrorResponse,
InputValidationErrorResponse,
ReviewRequiredResponse,
SetupInfo,
SetupRequirementsResponse,
ToolResponseBase,
UserReadiness,
)
from .utils import (
build_missing_credentials_from_field_info,
match_credentials_to_requirements,
)
from .utils import build_missing_credentials_from_field_info
logger = logging.getLogger(__name__)
@@ -52,7 +48,9 @@ class RunBlockTool(BaseTool):
"IMPORTANT: You MUST call find_block first to get the block's 'id' - "
"do NOT guess or make up block IDs. "
"On first attempt (without input_data), returns detailed schema showing "
"required inputs and outputs. Then call again with proper input_data to execute."
"required inputs and outputs. Then call again with proper input_data to execute. "
"If a block requires human review, use continue_run_block with the "
"review_id after the user approves."
)
@property
@@ -166,11 +164,10 @@ class RunBlockTool(BaseTool):
logger.info(f"Executing block {block.name} ({block_id}) for user {user_id}")
creds_manager = IntegrationCredentialsManager()
(
matched_credentials,
missing_credentials,
) = await self._resolve_block_credentials(user_id, block, input_data)
) = await resolve_block_credentials(user_id, block, input_data)
# Get block schemas for details/validation
try:
@@ -279,169 +276,97 @@ class RunBlockTool(BaseTool):
user_authenticated=True,
)
try:
# Get or create user's workspace for CoPilot file operations
workspace = await workspace_db().get_or_create_workspace(user_id)
# Generate synthetic IDs for CoPilot context.
# Encode node_id in node_exec_id so it can be extracted later
# (e.g. for auto-approve, where we need node_id but have no NodeExecution row).
synthetic_graph_id = f"{COPILOT_SESSION_PREFIX}{session.session_id}"
synthetic_node_id = f"{COPILOT_NODE_PREFIX}{block_id}"
# Generate synthetic IDs for CoPilot context
# Each chat session is treated as its own agent with one continuous run
# This means:
# - graph_id (agent) = session (memories scoped to session when limit_to_agent=True)
# - graph_exec_id (run) = session (memories scoped to session when limit_to_run=True)
# - node_exec_id = unique per block execution
synthetic_graph_id = f"copilot-session-{session.session_id}"
synthetic_graph_exec_id = f"copilot-session-{session.session_id}"
synthetic_node_id = f"copilot-node-{block_id}"
synthetic_node_exec_id = (
f"copilot-{session.session_id}-{uuid.uuid4().hex[:8]}"
)
# Create unified execution context with all required fields
execution_context = ExecutionContext(
# Execution identity
user_id=user_id,
graph_id=synthetic_graph_id,
graph_exec_id=synthetic_graph_exec_id,
graph_version=1, # Versions are 1-indexed
node_id=synthetic_node_id,
node_exec_id=synthetic_node_exec_id,
# Workspace with session scoping
workspace_id=workspace.id,
session_id=session.session_id,
)
# Prepare kwargs for block execution
# Keep individual kwargs for backwards compatibility with existing blocks
exec_kwargs: dict[str, Any] = {
"user_id": user_id,
"execution_context": execution_context,
# Legacy: individual kwargs for blocks not yet using execution_context
"workspace_id": workspace.id,
"graph_exec_id": synthetic_graph_exec_id,
"node_exec_id": synthetic_node_exec_id,
"node_id": synthetic_node_id,
"graph_version": 1, # Versions are 1-indexed
"graph_id": synthetic_graph_id,
}
for field_name, cred_meta in matched_credentials.items():
# Inject metadata into input_data (for validation)
if field_name not in input_data:
input_data[field_name] = cred_meta.model_dump()
# Fetch actual credentials and pass as kwargs (for execution)
actual_credentials = await creds_manager.get(
user_id, cred_meta.id, lock=False
)
if actual_credentials:
exec_kwargs[field_name] = actual_credentials
else:
return ErrorResponse(
message=f"Failed to retrieve credentials for {field_name}",
session_id=session_id,
)
# Execute the block and collect outputs
outputs: dict[str, list[Any]] = defaultdict(list)
async for output_name, output_data in block.execute(
input_data,
**exec_kwargs,
):
outputs[output_name].append(output_data)
return BlockOutputResponse(
message=f"Block '{block.name}' executed successfully",
# Check for an existing WAITING review for this block with the same input.
# If the LLM retries run_block with identical input, we reuse the existing
# review instead of creating duplicates. Different inputs = new execution.
existing_reviews = await review_db().get_pending_reviews_for_execution(
synthetic_graph_id, user_id
)
existing_review = next(
(
r
for r in existing_reviews
if r.node_id == synthetic_node_id
and r.status.value == "WAITING"
and r.payload == input_data
),
None,
)
if existing_review:
return ReviewRequiredResponse(
message=(
f"Block '{block.name}' requires human review. "
f"After the user approves, call continue_run_block with "
f"review_id='{existing_review.node_exec_id}' to execute."
),
session_id=session_id,
block_id=block_id,
block_name=block.name,
outputs=dict(outputs),
success=True,
session_id=session_id,
review_id=existing_review.node_exec_id,
graph_exec_id=synthetic_graph_id,
input_data=input_data,
)
except BlockError as e:
logger.warning(f"Block execution failed: {e}")
return ErrorResponse(
message=f"Block execution failed: {e}",
error=str(e),
session_id=session_id,
)
except Exception as e:
logger.error(f"Unexpected error executing block: {e}", exc_info=True)
return ErrorResponse(
message=f"Failed to execute block: {str(e)}",
error=str(e),
synthetic_node_exec_id = (
f"{synthetic_node_id}{COPILOT_NODE_EXEC_ID_SEPARATOR}"
f"{uuid.uuid4().hex[:8]}"
)
# Check for HITL review before execution.
# This creates the review record in the DB for CoPilot flows.
review_context = ExecutionContext(
user_id=user_id,
graph_id=synthetic_graph_id,
graph_exec_id=synthetic_graph_id,
graph_version=1,
node_id=synthetic_node_id,
node_exec_id=synthetic_node_exec_id,
sensitive_action_safe_mode=True,
)
should_pause, input_data = await block.is_block_exec_need_review(
input_data,
user_id=user_id,
node_id=synthetic_node_id,
node_exec_id=synthetic_node_exec_id,
graph_exec_id=synthetic_graph_id,
graph_id=synthetic_graph_id,
graph_version=1,
execution_context=review_context,
is_graph_execution=False,
)
if should_pause:
return ReviewRequiredResponse(
message=(
f"Block '{block.name}' requires human review. "
f"After the user approves, call continue_run_block with "
f"review_id='{synthetic_node_exec_id}' to execute."
),
session_id=session_id,
block_id=block_id,
block_name=block.name,
review_id=synthetic_node_exec_id,
graph_exec_id=synthetic_graph_id,
input_data=input_data,
)
async def _resolve_block_credentials(
self,
user_id: str,
block: AnyBlockSchema,
input_data: dict[str, Any] | None = None,
) -> tuple[dict[str, CredentialsMetaInput], list[CredentialsMetaInput]]:
"""
Resolve credentials for a block by matching user's available credentials.
Args:
user_id: User ID
block: Block to resolve credentials for
input_data: Input data for the block (used to determine provider via discriminator)
Returns:
tuple of (matched_credentials, missing_credentials) - matched credentials
are used for block execution, missing ones indicate setup requirements.
"""
input_data = input_data or {}
requirements = self._resolve_discriminated_credentials(block, input_data)
if not requirements:
return {}, []
return await match_credentials_to_requirements(user_id, requirements)
return await execute_block(
block=block,
block_id=block_id,
input_data=input_data,
user_id=user_id,
session_id=session_id,
node_exec_id=synthetic_node_exec_id,
matched_credentials=matched_credentials,
)
def _get_inputs_list(self, block: AnyBlockSchema) -> list[dict[str, Any]]:
"""Extract non-credential inputs from block schema."""
schema = block.input_schema.jsonschema()
credentials_fields = set(block.input_schema.get_credentials_fields().keys())
return get_inputs_from_schema(schema, exclude_fields=credentials_fields)
def _resolve_discriminated_credentials(
self,
block: AnyBlockSchema,
input_data: dict[str, Any],
) -> dict[str, CredentialsFieldInfo]:
"""Resolve credential requirements, applying discriminator logic where needed."""
credentials_fields_info = block.input_schema.get_credentials_fields_info()
if not credentials_fields_info:
return {}
resolved: dict[str, CredentialsFieldInfo] = {}
for field_name, field_info in credentials_fields_info.items():
effective_field_info = field_info
if field_info.discriminator and field_info.discriminator_mapping:
discriminator_value = input_data.get(field_info.discriminator)
if discriminator_value is None:
field = block.input_schema.model_fields.get(
field_info.discriminator
)
if field and field.default is not PydanticUndefined:
discriminator_value = field.default
if (
discriminator_value
and discriminator_value in field_info.discriminator_mapping
):
effective_field_info = field_info.discriminate(discriminator_value)
# For host-scoped credentials, add the discriminator value
# (e.g., URL) so _credential_is_for_host can match it
effective_field_info.discriminator_values.add(discriminator_value)
logger.debug(
f"Discriminated provider for {field_name}: "
f"{discriminator_value} -> {effective_field_info.provider}"
)
resolved[field_name] = effective_field_info
return resolved

View File

@@ -12,6 +12,7 @@ from .models import (
BlockOutputResponse,
ErrorResponse,
InputValidationErrorResponse,
ReviewRequiredResponse,
)
from .run_block import RunBlockTool
@@ -27,9 +28,16 @@ def make_mock_block(
mock.name = name
mock.block_type = block_type
mock.disabled = disabled
mock.is_sensitive_action = False
mock.input_schema = MagicMock()
mock.input_schema.jsonschema.return_value = {"properties": {}, "required": []}
mock.input_schema.get_credentials_fields_info.return_value = []
mock.input_schema.get_credentials_fields_info.return_value = {}
mock.input_schema.get_credentials_fields.return_value = {}
async def _no_review(input_data, **kwargs):
return False, input_data
mock.is_block_exec_need_review = _no_review
return mock
@@ -46,6 +54,7 @@ def make_mock_block_with_schema(
mock.name = name
mock.block_type = BlockType.STANDARD
mock.disabled = False
mock.is_sensitive_action = False
mock.description = f"Test block: {name}"
input_schema = {
@@ -63,6 +72,12 @@ def make_mock_block_with_schema(
mock.output_schema = MagicMock()
mock.output_schema.jsonschema.return_value = output_schema
# Default: no review needed, pass through input_data unchanged
async def _no_review(input_data, **kwargs):
return False, input_data
mock.is_block_exec_need_review = _no_review
return mock
@@ -126,9 +141,15 @@ class TestRunBlockFiltering:
"standard-id", "HTTP Request", BlockType.STANDARD
)
with patch(
"backend.copilot.tools.run_block.get_block",
return_value=standard_block,
with (
patch(
"backend.copilot.tools.run_block.get_block",
return_value=standard_block,
),
patch(
"backend.copilot.tools.helpers.match_credentials_to_requirements",
return_value=({}, []),
),
):
tool = RunBlockTool()
response = await tool._execute(
@@ -154,12 +175,7 @@ class TestRunBlockInputValidation:
@pytest.mark.asyncio(loop_scope="session")
async def test_unknown_input_fields_are_rejected(self):
"""run_block rejects unknown input fields instead of silently ignoring them.
Scenario: The AI Text Generator block has a field called 'model' (for LLM model
selection), but the LLM calling the tool guesses wrong and sends 'LLM_Model'
instead. The block should reject the request and return the valid schema.
"""
"""run_block rejects unknown input fields instead of silently ignoring them."""
session = make_session(user_id=_TEST_USER_ID)
mock_block = make_mock_block_with_schema(
@@ -182,27 +198,31 @@ class TestRunBlockInputValidation:
output_properties={"response": {"type": "string"}},
)
with patch(
"backend.copilot.tools.run_block.get_block",
return_value=mock_block,
with (
patch(
"backend.copilot.tools.run_block.get_block",
return_value=mock_block,
),
patch(
"backend.copilot.tools.helpers.match_credentials_to_requirements",
return_value=({}, []),
),
):
tool = RunBlockTool()
# Provide 'prompt' (correct) but 'LLM_Model' instead of 'model' (wrong key)
response = await tool._execute(
user_id=_TEST_USER_ID,
session=session,
block_id="ai-text-gen-id",
input_data={
"prompt": "Write a haiku about coding",
"LLM_Model": "claude-opus-4-6", # WRONG KEY - should be 'model'
"LLM_Model": "claude-opus-4-6",
},
)
assert isinstance(response, InputValidationErrorResponse)
assert "LLM_Model" in response.unrecognized_fields
assert "Block was not executed" in response.message
assert "inputs" in response.model_dump() # valid schema included
assert "inputs" in response.model_dump()
@pytest.mark.asyncio(loop_scope="session")
async def test_multiple_wrong_keys_are_all_reported(self):
@@ -221,21 +241,26 @@ class TestRunBlockInputValidation:
required_fields=["prompt"],
)
with patch(
"backend.copilot.tools.run_block.get_block",
return_value=mock_block,
with (
patch(
"backend.copilot.tools.run_block.get_block",
return_value=mock_block,
),
patch(
"backend.copilot.tools.helpers.match_credentials_to_requirements",
return_value=({}, []),
),
):
tool = RunBlockTool()
response = await tool._execute(
user_id=_TEST_USER_ID,
session=session,
block_id="ai-text-gen-id",
input_data={
"prompt": "Hello", # correct
"llm_model": "claude-opus-4-6", # WRONG - should be 'model'
"system_prompt": "Be helpful", # WRONG - should be 'sys_prompt'
"retries": 5, # WRONG - should be 'retry'
"prompt": "Hello",
"llm_model": "claude-opus-4-6",
"system_prompt": "Be helpful",
"retries": 5,
},
)
@@ -262,23 +287,26 @@ class TestRunBlockInputValidation:
required_fields=["prompt"],
)
with patch(
"backend.copilot.tools.run_block.get_block",
return_value=mock_block,
with (
patch(
"backend.copilot.tools.run_block.get_block",
return_value=mock_block,
),
patch(
"backend.copilot.tools.helpers.match_credentials_to_requirements",
return_value=({}, []),
),
):
tool = RunBlockTool()
# 'prompt' is missing AND 'LLM_Model' is an unknown field
response = await tool._execute(
user_id=_TEST_USER_ID,
session=session,
block_id="ai-text-gen-id",
input_data={
"LLM_Model": "claude-opus-4-6", # wrong key, and 'prompt' is missing
"LLM_Model": "claude-opus-4-6",
},
)
# Unknown fields are caught first
assert isinstance(response, InputValidationErrorResponse)
assert "LLM_Model" in response.unrecognized_fields
@@ -313,7 +341,11 @@ class TestRunBlockInputValidation:
return_value=mock_block,
),
patch(
"backend.copilot.tools.run_block.workspace_db",
"backend.copilot.tools.helpers.match_credentials_to_requirements",
return_value=({}, []),
),
patch(
"backend.copilot.tools.helpers.workspace_db",
return_value=mock_workspace_db,
),
):
@@ -325,7 +357,7 @@ class TestRunBlockInputValidation:
block_id="ai-text-gen-id",
input_data={
"prompt": "Write a haiku",
"model": "gpt-4o-mini", # correct field name
"model": "gpt-4o-mini",
},
)
@@ -347,20 +379,191 @@ class TestRunBlockInputValidation:
required_fields=["prompt"],
)
with patch(
"backend.copilot.tools.run_block.get_block",
return_value=mock_block,
with (
patch(
"backend.copilot.tools.run_block.get_block",
return_value=mock_block,
),
patch(
"backend.copilot.tools.helpers.match_credentials_to_requirements",
return_value=({}, []),
),
):
tool = RunBlockTool()
# Only provide valid optional field, missing required 'prompt'
response = await tool._execute(
user_id=_TEST_USER_ID,
session=session,
block_id="ai-text-gen-id",
input_data={
"model": "gpt-4o-mini", # valid but optional
"model": "gpt-4o-mini",
},
)
assert isinstance(response, BlockDetailsResponse)
class TestRunBlockSensitiveAction:
"""Tests for sensitive action HITL review in RunBlockTool.
run_block calls is_block_exec_need_review() explicitly before execution.
When review is needed (should_pause=True), ReviewRequiredResponse is returned.
"""
@pytest.mark.asyncio(loop_scope="session")
async def test_sensitive_block_paused_returns_review_required(self):
"""When is_block_exec_need_review returns should_pause=True, ReviewRequiredResponse is returned."""
session = make_session(user_id=_TEST_USER_ID)
input_data = {
"repo_url": "https://github.com/test/repo",
"branch": "feature-branch",
}
mock_block = make_mock_block_with_schema(
block_id="delete-branch-id",
name="Delete Branch",
input_properties={
"repo_url": {"type": "string"},
"branch": {"type": "string"},
},
required_fields=["repo_url", "branch"],
)
mock_block.is_sensitive_action = True
mock_block.is_block_exec_need_review = AsyncMock(
return_value=(True, input_data)
)
with (
patch(
"backend.copilot.tools.run_block.get_block",
return_value=mock_block,
),
patch(
"backend.copilot.tools.helpers.match_credentials_to_requirements",
return_value=({}, []),
),
):
tool = RunBlockTool()
response = await tool._execute(
user_id=_TEST_USER_ID,
session=session,
block_id="delete-branch-id",
input_data=input_data,
)
assert isinstance(response, ReviewRequiredResponse)
assert "requires human review" in response.message
assert "continue_run_block" in response.message
assert response.block_name == "Delete Branch"
@pytest.mark.asyncio(loop_scope="session")
async def test_sensitive_block_executes_after_approval(self):
"""After approval (should_pause=False), sensitive blocks execute and return outputs."""
session = make_session(user_id=_TEST_USER_ID)
input_data = {
"repo_url": "https://github.com/test/repo",
"branch": "feature-branch",
}
mock_block = make_mock_block_with_schema(
block_id="delete-branch-id",
name="Delete Branch",
input_properties={
"repo_url": {"type": "string"},
"branch": {"type": "string"},
},
required_fields=["repo_url", "branch"],
)
mock_block.is_sensitive_action = True
mock_block.is_block_exec_need_review = AsyncMock(
return_value=(False, input_data)
)
async def mock_execute(input_data, **kwargs):
yield "result", "Branch deleted successfully"
mock_block.execute = mock_execute
mock_workspace_db = MagicMock()
mock_workspace_db.get_or_create_workspace = AsyncMock(
return_value=MagicMock(id="test-workspace-id")
)
with (
patch(
"backend.copilot.tools.run_block.get_block",
return_value=mock_block,
),
patch(
"backend.copilot.tools.helpers.match_credentials_to_requirements",
return_value=({}, []),
),
patch(
"backend.copilot.tools.helpers.workspace_db",
return_value=mock_workspace_db,
),
):
tool = RunBlockTool()
response = await tool._execute(
user_id=_TEST_USER_ID,
session=session,
block_id="delete-branch-id",
input_data=input_data,
)
assert isinstance(response, BlockOutputResponse)
assert response.success is True
@pytest.mark.asyncio(loop_scope="session")
async def test_non_sensitive_block_executes_normally(self):
"""Non-sensitive blocks skip review and execute directly."""
session = make_session(user_id=_TEST_USER_ID)
input_data = {"url": "https://example.com"}
mock_block = make_mock_block_with_schema(
block_id="http-request-id",
name="HTTP Request",
input_properties={
"url": {"type": "string"},
},
required_fields=["url"],
)
mock_block.is_sensitive_action = False
mock_block.is_block_exec_need_review = AsyncMock(
return_value=(False, input_data)
)
async def mock_execute(input_data, **kwargs):
yield "response", {"status": 200}
mock_block.execute = mock_execute
mock_workspace_db = MagicMock()
mock_workspace_db.get_or_create_workspace = AsyncMock(
return_value=MagicMock(id="test-workspace-id")
)
with (
patch(
"backend.copilot.tools.run_block.get_block",
return_value=mock_block,
),
patch(
"backend.copilot.tools.helpers.match_credentials_to_requirements",
return_value=({}, []),
),
patch(
"backend.copilot.tools.helpers.workspace_db",
return_value=mock_workspace_db,
),
):
tool = RunBlockTool()
response = await tool._execute(
user_id=_TEST_USER_ID,
session=session,
block_id="http-request-id",
input_data=input_data,
)
assert isinstance(response, BlockOutputResponse)
assert response.success is True

View File

@@ -14,7 +14,7 @@ from backend.blocks.mcp.helpers import (
)
from backend.copilot.model import ChatSession
from backend.copilot.tools.utils import build_missing_credentials_from_field_info
from backend.util.request import HTTPClientError, validate_url
from backend.util.request import HTTPClientError, validate_url_host
from .base import BaseTool
from .models import (
@@ -144,7 +144,7 @@ class RunMCPToolTool(BaseTool):
# Validate URL to prevent SSRF — blocks loopback and private IP ranges
try:
await validate_url(server_url, trusted_origins=[])
await validate_url_host(server_url)
except ValueError as e:
msg = str(e)
if "Unable to resolve" in msg or "No IP addresses" in msg:

View File

@@ -65,9 +65,8 @@ async def test_run_block_returns_details_when_no_input_provided():
return_value=http_block,
):
# Mock credentials check to return no missing credentials
with patch.object(
RunBlockTool,
"_resolve_block_credentials",
with patch(
"backend.copilot.tools.run_block.resolve_block_credentials",
new_callable=AsyncMock,
return_value=({}, []), # (matched_credentials, missing_credentials)
):
@@ -123,9 +122,8 @@ async def test_run_block_returns_details_when_only_credentials_provided():
"backend.copilot.tools.run_block.get_block",
return_value=mock,
):
with patch.object(
RunBlockTool,
"_resolve_block_credentials",
with patch(
"backend.copilot.tools.run_block.resolve_block_credentials",
new_callable=AsyncMock,
return_value=(
{

View File

@@ -100,7 +100,7 @@ async def test_ssrf_blocked_url_returns_error():
session = make_session(_USER_ID)
with patch(
"backend.copilot.tools.run_mcp_tool.validate_url",
"backend.copilot.tools.run_mcp_tool.validate_url_host",
new_callable=AsyncMock,
side_effect=ValueError("blocked loopback"),
):
@@ -138,7 +138,7 @@ async def test_non_dict_tool_arguments_returns_error():
session = make_session(_USER_ID)
with patch(
"backend.copilot.tools.run_mcp_tool.validate_url",
"backend.copilot.tools.run_mcp_tool.validate_url_host",
new_callable=AsyncMock,
):
with patch(
@@ -171,7 +171,7 @@ async def test_discover_tools_returns_discovered_response():
mock_tools = _make_tool_list("fetch", "search")
with patch(
"backend.copilot.tools.run_mcp_tool.validate_url", new_callable=AsyncMock
"backend.copilot.tools.run_mcp_tool.validate_url_host", new_callable=AsyncMock
):
with patch(
"backend.copilot.tools.run_mcp_tool.auto_lookup_mcp_credential",
@@ -208,7 +208,7 @@ async def test_discover_tools_with_credentials():
mock_tools = _make_tool_list("push_notification")
with patch(
"backend.copilot.tools.run_mcp_tool.validate_url", new_callable=AsyncMock
"backend.copilot.tools.run_mcp_tool.validate_url_host", new_callable=AsyncMock
):
with patch(
"backend.copilot.tools.run_mcp_tool.auto_lookup_mcp_credential",
@@ -249,7 +249,7 @@ async def test_execute_tool_returns_output_response():
text_result = "# Example Domain\nThis domain is for examples."
with patch(
"backend.copilot.tools.run_mcp_tool.validate_url", new_callable=AsyncMock
"backend.copilot.tools.run_mcp_tool.validate_url_host", new_callable=AsyncMock
):
with patch(
"backend.copilot.tools.run_mcp_tool.auto_lookup_mcp_credential",
@@ -285,7 +285,7 @@ async def test_execute_tool_parses_json_result():
session = make_session(_USER_ID)
with patch(
"backend.copilot.tools.run_mcp_tool.validate_url", new_callable=AsyncMock
"backend.copilot.tools.run_mcp_tool.validate_url_host", new_callable=AsyncMock
):
with patch(
"backend.copilot.tools.run_mcp_tool.auto_lookup_mcp_credential",
@@ -320,7 +320,7 @@ async def test_execute_tool_image_content():
session = make_session(_USER_ID)
with patch(
"backend.copilot.tools.run_mcp_tool.validate_url", new_callable=AsyncMock
"backend.copilot.tools.run_mcp_tool.validate_url_host", new_callable=AsyncMock
):
with patch(
"backend.copilot.tools.run_mcp_tool.auto_lookup_mcp_credential",
@@ -359,7 +359,7 @@ async def test_execute_tool_resource_content():
session = make_session(_USER_ID)
with patch(
"backend.copilot.tools.run_mcp_tool.validate_url", new_callable=AsyncMock
"backend.copilot.tools.run_mcp_tool.validate_url_host", new_callable=AsyncMock
):
with patch(
"backend.copilot.tools.run_mcp_tool.auto_lookup_mcp_credential",
@@ -399,7 +399,7 @@ async def test_execute_tool_multi_item_content():
session = make_session(_USER_ID)
with patch(
"backend.copilot.tools.run_mcp_tool.validate_url", new_callable=AsyncMock
"backend.copilot.tools.run_mcp_tool.validate_url_host", new_callable=AsyncMock
):
with patch(
"backend.copilot.tools.run_mcp_tool.auto_lookup_mcp_credential",
@@ -437,7 +437,7 @@ async def test_execute_tool_empty_content_returns_none():
session = make_session(_USER_ID)
with patch(
"backend.copilot.tools.run_mcp_tool.validate_url", new_callable=AsyncMock
"backend.copilot.tools.run_mcp_tool.validate_url_host", new_callable=AsyncMock
):
with patch(
"backend.copilot.tools.run_mcp_tool.auto_lookup_mcp_credential",
@@ -470,7 +470,7 @@ async def test_execute_tool_returns_error_on_tool_failure():
session = make_session(_USER_ID)
with patch(
"backend.copilot.tools.run_mcp_tool.validate_url", new_callable=AsyncMock
"backend.copilot.tools.run_mcp_tool.validate_url_host", new_callable=AsyncMock
):
with patch(
"backend.copilot.tools.run_mcp_tool.auto_lookup_mcp_credential",
@@ -512,7 +512,7 @@ async def test_auth_required_without_creds_returns_setup_requirements():
session = make_session(_USER_ID)
with patch(
"backend.copilot.tools.run_mcp_tool.validate_url", new_callable=AsyncMock
"backend.copilot.tools.run_mcp_tool.validate_url_host", new_callable=AsyncMock
):
with patch(
"backend.copilot.tools.run_mcp_tool.auto_lookup_mcp_credential",
@@ -555,7 +555,7 @@ async def test_auth_error_with_existing_creds_returns_error():
mock_creds.access_token = SecretStr("stale-token")
with patch(
"backend.copilot.tools.run_mcp_tool.validate_url", new_callable=AsyncMock
"backend.copilot.tools.run_mcp_tool.validate_url_host", new_callable=AsyncMock
):
with patch(
"backend.copilot.tools.run_mcp_tool.auto_lookup_mcp_credential",
@@ -589,7 +589,7 @@ async def test_mcp_client_error_returns_error_response():
session = make_session(_USER_ID)
with patch(
"backend.copilot.tools.run_mcp_tool.validate_url", new_callable=AsyncMock
"backend.copilot.tools.run_mcp_tool.validate_url_host", new_callable=AsyncMock
):
with patch(
"backend.copilot.tools.run_mcp_tool.auto_lookup_mcp_credential",
@@ -621,7 +621,7 @@ async def test_unexpected_exception_returns_generic_error():
session = make_session(_USER_ID)
with patch(
"backend.copilot.tools.run_mcp_tool.validate_url", new_callable=AsyncMock
"backend.copilot.tools.run_mcp_tool.validate_url_host", new_callable=AsyncMock
):
with patch(
"backend.copilot.tools.run_mcp_tool.auto_lookup_mcp_credential",
@@ -719,7 +719,7 @@ async def test_credential_lookup_normalizes_trailing_slash():
url_with_slash = "https://mcp.example.com/mcp/"
with patch(
"backend.copilot.tools.run_mcp_tool.validate_url", new_callable=AsyncMock
"backend.copilot.tools.run_mcp_tool.validate_url_host", new_callable=AsyncMock
):
with patch(
"backend.copilot.tools.run_mcp_tool.auto_lookup_mcp_credential",

View File

@@ -116,3 +116,16 @@ def workspace_db():
workspace_db = get_database_manager_async_client()
return workspace_db
def review_db():
if db.is_connected():
from backend.data import human_review as _review_db
review_db = _review_db
else:
from backend.util.clients import get_database_manager_async_client
review_db = get_database_manager_async_client()
return review_db

View File

@@ -79,7 +79,10 @@ from backend.data.graph import (
from backend.data.human_review import (
cancel_pending_reviews_for_execution,
check_approval,
delete_review_by_node_exec_id,
get_or_create_human_review,
get_pending_reviews_for_execution,
get_reviews_by_node_exec_ids,
has_pending_reviews_for_graph_exec,
update_review_processed_status,
)
@@ -246,7 +249,10 @@ class DatabaseManager(AppService):
# ============ Human In The Loop ============ #
cancel_pending_reviews_for_execution = _(cancel_pending_reviews_for_execution)
check_approval = _(check_approval)
delete_review_by_node_exec_id = _(delete_review_by_node_exec_id)
get_or_create_human_review = _(get_or_create_human_review)
get_pending_reviews_for_execution = _(get_pending_reviews_for_execution)
get_reviews_by_node_exec_ids = _(get_reviews_by_node_exec_ids)
has_pending_reviews_for_graph_exec = _(has_pending_reviews_for_graph_exec)
update_review_processed_status = _(update_review_processed_status)
@@ -433,7 +439,10 @@ class DatabaseManagerAsyncClient(AppServiceClient):
# ============ Human In The Loop ============ #
cancel_pending_reviews_for_execution = d.cancel_pending_reviews_for_execution
check_approval = d.check_approval
delete_review_by_node_exec_id = d.delete_review_by_node_exec_id
get_or_create_human_review = d.get_or_create_human_review
get_pending_reviews_for_execution = d.get_pending_reviews_for_execution
get_reviews_by_node_exec_ids = d.get_reviews_by_node_exec_ids
update_review_processed_status = d.update_review_processed_status
# ============ User Comms ============ #

View File

@@ -17,6 +17,10 @@ from backend.api.features.executions.review.model import (
PendingHumanReviewModel,
SafeJsonData,
)
from backend.copilot.constants import (
is_copilot_synthetic_id,
parse_node_id_from_exec_id,
)
from backend.data.execution import get_graph_execution_meta
from backend.util.json import SafeJson
@@ -123,11 +127,13 @@ async def create_auto_approval_record(
Raises:
ValueError: If the graph execution doesn't belong to the user
"""
# Validate that the graph execution belongs to this user (defense in depth)
graph_exec = await get_graph_execution_meta(
# Validate ownership: if a graph execution record exists, it must belong
# to this user. Non-graph executions (e.g. CoPilot) won't have a record.
if not is_copilot_synthetic_id(
graph_exec_id
) and not await get_graph_execution_meta(
user_id=user_id, execution_id=graph_exec_id
)
if not graph_exec:
):
raise ValueError(
f"Graph execution {graph_exec_id} not found or doesn't belong to user {user_id}"
)
@@ -265,7 +271,7 @@ async def get_pending_review_by_node_exec_id(
async def get_reviews_by_node_exec_ids(
node_exec_ids: list[str], user_id: str
) -> dict[str, "PendingHumanReviewModel"]:
) -> dict[str, PendingHumanReviewModel]:
"""
Get multiple reviews by their node execution IDs regardless of status.
@@ -292,21 +298,26 @@ async def get_reviews_by_node_exec_ids(
if not reviews:
return {}
# Batch fetch all node executions to avoid N+1 queries
node_exec_ids_to_fetch = [review.nodeExecId for review in reviews]
node_execs = await AgentNodeExecution.prisma().find_many(
where={"id": {"in": node_exec_ids_to_fetch}},
include={"Node": True},
)
# Create mapping from node_exec_id to node_id
node_exec_id_to_node_id = {
node_exec.id: node_exec.agentNodeId for node_exec in node_execs
# Split into synthetic (CoPilot) and real IDs for different resolution paths
synthetic_ids = {
r.nodeExecId for r in reviews if is_copilot_synthetic_id(r.nodeExecId)
}
real_ids = [r.nodeExecId for r in reviews if r.nodeExecId not in synthetic_ids]
# Batch fetch real node executions to avoid N+1 queries
node_exec_id_to_node_id: dict[str, str] = {}
if real_ids:
node_execs = await AgentNodeExecution.prisma().find_many(
where={"id": {"in": real_ids}},
)
node_exec_id_to_node_id = {ne.id: ne.agentNodeId for ne in node_execs}
result = {}
for review in reviews:
node_id = node_exec_id_to_node_id.get(review.nodeExecId, review.nodeExecId)
if review.nodeExecId in synthetic_ids:
node_id = parse_node_id_from_exec_id(review.nodeExecId)
else:
node_id = node_exec_id_to_node_id.get(review.nodeExecId, review.nodeExecId)
result[review.nodeExecId] = PendingHumanReviewModel.from_db(
review, node_id=node_id
)
@@ -331,6 +342,19 @@ async def has_pending_reviews_for_graph_exec(graph_exec_id: str) -> bool:
return count > 0
async def _resolve_node_id(node_exec_id: str, get_node_execution) -> str:
"""Resolve node_id from a node_exec_id.
For CoPilot synthetic IDs (e.g. copilot-node-block-id:abc12345),
extract the node_id portion (copilot-node-block-id).
For real graph executions, look up the NodeExecution record.
"""
if is_copilot_synthetic_id(node_exec_id):
return parse_node_id_from_exec_id(node_exec_id)
node_exec = await get_node_execution(node_exec_id)
return node_exec.node_id if node_exec else node_exec_id
async def get_pending_reviews_for_user(
user_id: str, page: int = 1, page_size: int = 25
) -> list["PendingHumanReviewModel"]:
@@ -361,8 +385,7 @@ async def get_pending_reviews_for_user(
# Fetch node_id for each review from NodeExecution
result = []
for review in reviews:
node_exec = await get_node_execution(review.nodeExecId)
node_id = node_exec.node_id if node_exec else review.nodeExecId
node_id = await _resolve_node_id(review.nodeExecId, get_node_execution)
result.append(PendingHumanReviewModel.from_db(review, node_id=node_id))
return result
@@ -370,7 +393,7 @@ async def get_pending_reviews_for_user(
async def get_pending_reviews_for_execution(
graph_exec_id: str, user_id: str
) -> list["PendingHumanReviewModel"]:
) -> list[PendingHumanReviewModel]:
"""
Get all pending reviews for a specific graph execution.
@@ -396,8 +419,7 @@ async def get_pending_reviews_for_execution(
# Fetch node_id for each review from NodeExecution
result = []
for review in reviews:
node_exec = await get_node_execution(review.nodeExecId)
node_id = node_exec.node_id if node_exec else review.nodeExecId
node_id = await _resolve_node_id(review.nodeExecId, get_node_execution)
result.append(PendingHumanReviewModel.from_db(review, node_id=node_id))
return result
@@ -509,8 +531,12 @@ async def process_all_reviews_for_execution(
result = {}
for review in all_result_reviews:
node_exec = await get_node_execution(review.nodeExecId)
node_id = node_exec.node_id if node_exec else review.nodeExecId
if is_copilot_synthetic_id(review.nodeExecId):
# CoPilot synthetic node_exec_ids encode node_id as "{node_id}:{random}"
node_id = parse_node_id_from_exec_id(review.nodeExecId)
else:
node_exec = await get_node_execution(review.nodeExecId)
node_id = node_exec.node_id if node_exec else review.nodeExecId
result[review.nodeExecId] = PendingHumanReviewModel.from_db(
review, node_id=node_id
)
@@ -564,3 +590,21 @@ async def cancel_pending_reviews_for_execution(graph_exec_id: str, user_id: str)
},
)
return result
async def delete_review_by_node_exec_id(node_exec_id: str, user_id: str) -> int:
"""Delete a review record by node execution ID after it has been consumed.
Used by CoPilot's continue_run_block to clean up one-time-use review records
after successful execution.
Args:
node_exec_id: The node execution ID of the review to delete
user_id: User ID for authorization
Returns:
Number of records deleted
"""
return await PendingHumanReview.prisma().delete_many(
where={"nodeExecId": node_exec_id, "userId": user_id}
)

View File

@@ -144,76 +144,106 @@ async def _resolve_host(hostname: str) -> list[str]:
return ip_addresses
async def validate_url(
url: str, trusted_origins: list[str]
async def validate_url_host(
url: str, trusted_hostnames: Optional[list[str]] = None
) -> tuple[URL, bool, list[str]]:
"""
Validates the URL to prevent SSRF attacks by ensuring it does not point
to a private, link-local, or otherwise blocked IP address — unless
Validates a (URL's) host string to prevent SSRF attacks by ensuring it does not
point to a private, link-local, or otherwise blocked IP address — unless
the hostname is explicitly trusted.
Hosts in `trusted_hostnames` are permitted without checks.
All other hosts are resolved and checked against `BLOCKED_IP_NETWORKS`.
Params:
url: A hostname, netloc, or URL to validate.
If no scheme is included, `http://` is assumed.
trusted_hostnames: A list of hostnames that don't require validation.
Raises:
ValueError:
- if the URL has a disallowed URL scheme
- if the URL/host string can't be parsed
- if the hostname contains invalid or unsupported (non-ASCII) characters
- if the host resolves to a blocked IP
Returns:
str: The validated, canonicalized, parsed URL
is_trusted: Boolean indicating if the hostname is in trusted_origins
ip_addresses: List of IP addresses for the host; empty if the host is trusted
1. The validated, canonicalized, parsed host/URL,
with hostname ASCII-safe encoding
2. Whether the host is trusted (based on the passed `trusted_hostnames`).
3. List of resolved IP addresses for the host; empty if the host is trusted.
"""
parsed = parse_url(url)
# Check scheme
if parsed.scheme not in ALLOWED_SCHEMES:
raise ValueError(
f"Scheme '{parsed.scheme}' is not allowed. Only HTTP/HTTPS are supported."
f"URL scheme '{parsed.scheme}' is not allowed; allowed schemes: "
f"{', '.join(ALLOWED_SCHEMES)}"
)
# Validate and IDNA encode hostname
if not parsed.hostname:
raise ValueError("Invalid URL: No hostname found.")
raise ValueError(f"Invalid host/URL; no host in parse result: {url}")
# IDNA encode to prevent Unicode domain attacks
try:
ascii_hostname = idna.encode(parsed.hostname).decode("ascii")
except idna.IDNAError:
raise ValueError("Invalid hostname with unsupported characters.")
raise ValueError(f"Hostname '{parsed.hostname}' has unsupported characters")
# Check hostname characters
if not HOSTNAME_REGEX.match(ascii_hostname):
raise ValueError("Hostname contains invalid characters.")
raise ValueError(f"Hostname '{parsed.hostname}' has unsupported characters")
# Check if hostname is trusted
is_trusted = ascii_hostname in trusted_origins
# If not trusted, validate IP addresses
ip_addresses: list[str] = []
if not is_trusted:
# Resolve all IP addresses for the hostname
ip_addresses = await _resolve_host(ascii_hostname)
# Block any IP address that belongs to a blocked range
for ip_str in ip_addresses:
if _is_ip_blocked(ip_str):
raise ValueError(
f"Access to blocked or private IP address {ip_str} "
f"for hostname {ascii_hostname} is not allowed."
)
# Reconstruct the netloc with IDNA-encoded hostname and preserve port
netloc = ascii_hostname
if parsed.port:
netloc = f"{ascii_hostname}:{parsed.port}"
return (
URL(
parsed.scheme,
netloc,
quote(parsed.path, safe="/%:@"),
parsed.params,
parsed.query,
parsed.fragment,
),
is_trusted,
ip_addresses,
# Re-create parsed URL object with IDNA-encoded hostname
parsed = URL(
parsed.scheme,
(ascii_hostname if parsed.port is None else f"{ascii_hostname}:{parsed.port}"),
quote(parsed.path, safe="/%:@"),
parsed.params,
parsed.query,
parsed.fragment,
)
is_trusted = trusted_hostnames and any(
matches_allowed_host(parsed, allowed)
for allowed in (
# Normalize + parse allowlist entries the same way for consistent matching
parse_url(w)
for w in trusted_hostnames
)
)
if is_trusted:
return parsed, True, []
# If not allowlisted, go ahead with host resolution and IP target check
return parsed, False, await _resolve_and_check_blocked(ascii_hostname)
def matches_allowed_host(url: URL, allowed: URL) -> bool:
if url.hostname != allowed.hostname:
return False
# Allow any port if not explicitly specified in the allowlist
if allowed.port is None:
return True
return url.port == allowed.port
async def _resolve_and_check_blocked(hostname: str) -> list[str]:
"""
Resolves hostname to IPs and raises ValueError if any resolve to
a blocked network. Returns the list of resolved IP addresses.
"""
ip_addresses = await _resolve_host(hostname)
for ip_str in ip_addresses:
if _is_ip_blocked(ip_str):
raise ValueError(
f"Access to blocked or private IP address {ip_str} "
f"for hostname {hostname} is not allowed."
)
return ip_addresses
def parse_url(url: str) -> URL:
"""Canonicalizes and parses a URL string."""
@@ -352,7 +382,7 @@ class Requests:
):
self.trusted_origins = []
for url in trusted_origins or []:
hostname = urlparse(url).hostname
hostname = parse_url(url).netloc # {host}[:{port}]
if not hostname:
raise ValueError(f"Invalid URL: Unable to determine hostname of {url}")
self.trusted_origins.append(hostname)
@@ -450,7 +480,7 @@ class Requests:
data = form
# Validate URL and get trust status
parsed_url, is_trusted, ip_addresses = await validate_url(
parsed_url, is_trusted, ip_addresses = await validate_url_host(
url, self.trusted_origins
)
@@ -503,7 +533,6 @@ class Requests:
json=json,
**kwargs,
) as response:
if self.raise_for_status:
try:
response.raise_for_status()

View File

@@ -1,7 +1,7 @@
import pytest
from aiohttp import web
from backend.util.request import pin_url, validate_url
from backend.util.request import pin_url, validate_url_host
@pytest.mark.parametrize(
@@ -60,9 +60,9 @@ async def test_validate_url_no_dns_rebinding(
):
if should_raise:
with pytest.raises(ValueError):
await validate_url(raw_url, trusted_origins)
await validate_url_host(raw_url, trusted_origins)
else:
validated_url, _, _ = await validate_url(raw_url, trusted_origins)
validated_url, _, _ = await validate_url_host(raw_url, trusted_origins)
assert validated_url.geturl() == expected_value
@@ -101,10 +101,10 @@ async def test_dns_rebinding_fix(
if expect_error:
# If any IP is blocked, we expect a ValueError
with pytest.raises(ValueError):
url, _, ip_addresses = await validate_url(hostname, [])
url, _, ip_addresses = await validate_url_host(hostname)
pin_url(url, ip_addresses)
else:
url, _, ip_addresses = await validate_url(hostname, [])
url, _, ip_addresses = await validate_url_host(hostname)
pinned_url = pin_url(url, ip_addresses).geturl()
# The pinned_url should contain the first valid IP
assert pinned_url.startswith("http://") or pinned_url.startswith("https://")

View File

@@ -89,6 +89,10 @@ class Config(UpdateTrackingModel["Config"], BaseSettings):
le=500,
description="Thread pool size for FastAPI sync operations. All sync endpoints and dependencies automatically use this pool. Higher values support more concurrent sync operations but use more memory.",
)
ollama_host: str = Field(
default="localhost:11434",
description="Default Ollama host; exempted from SSRF checks.",
)
pyro_host: str = Field(
default="localhost",
description="The default hostname of the Pyro server.",

View File

@@ -0,0 +1,246 @@
#!/usr/bin/env python3
"""
AutoGPT Analytics — View Generator
====================================
Reads every .sql file in analytics/queries/ and registers it as a
CREATE OR REPLACE VIEW in the analytics schema.
Quick start (from autogpt_platform/backend/):
Step 1 — one-time setup (creates schema, role, grants):
poetry run analytics-setup
Step 2 — create / refresh all 14 analytics views:
poetry run analytics-views
Both commands auto-detect credentials from .env (DB_* vars).
Use --db-url to override.
Step 3 (optional) — enable login and set a password for the read-only
role so external tools (Supabase MCP, PostHog Data Warehouse) can connect.
The role is created as NOLOGIN, so you must grant LOGIN at the same time.
Run in Supabase SQL Editor:
ALTER ROLE analytics_readonly WITH LOGIN PASSWORD 'your-password';
Usage
-----
poetry run analytics-setup # apply setup to DB
poetry run analytics-setup --dry-run # print setup SQL only
poetry run analytics-views # apply all views to DB
poetry run analytics-views --dry-run # print all view SQL only
poetry run analytics-views --only graph_execution,retention_login_weekly
Environment variables
---------------------
DATABASE_URL Postgres connection string (checked before .env)
Notes
-----
- .env DB_* vars are read automatically as a fallback.
- Safe to re-run: uses CREATE OR REPLACE VIEW.
- Looker, PostHog Data Warehouse, and Supabase MCP all read from the
same analytics.* views — no raw tables exposed.
"""
import argparse
import os
import sys
from pathlib import Path
from urllib.parse import quote
QUERIES_DIR = Path(__file__).parent.parent / "analytics" / "queries"
ENV_FILE = Path(__file__).parent / ".env"
SCHEMA = "analytics"
SETUP_SQL = """\
-- =============================================================
-- AutoGPT Analytics Schema Setup
-- Run ONCE as the postgres superuser (e.g. via Supabase SQL Editor).
-- After this, run: poetry run analytics-views
-- =============================================================
-- 1. Create the analytics schema
CREATE SCHEMA IF NOT EXISTS analytics;
-- 2. Create the read-only role (skip if already exists)
DO $$
BEGIN
IF NOT EXISTS (SELECT FROM pg_roles WHERE rolname = 'analytics_readonly') THEN
CREATE ROLE analytics_readonly NOLOGIN;
END IF;
END
$$;
-- 3. Analytics schema grants only.
-- Views use security_invoker = false so they execute as their
-- owner (postgres). analytics_readonly never needs direct access
-- to the platform or auth schemas.
GRANT USAGE ON SCHEMA analytics TO analytics_readonly;
GRANT SELECT ON ALL TABLES IN SCHEMA analytics TO analytics_readonly;
ALTER DEFAULT PRIVILEGES IN SCHEMA analytics
GRANT SELECT ON TABLES TO analytics_readonly;
"""
def load_db_url_from_env() -> str | None:
"""Read DB_* vars from .env and build a psycopg2 connection string."""
if not ENV_FILE.exists():
return None
env: dict[str, str] = {}
for line in ENV_FILE.read_text().splitlines():
line = line.strip()
if not line or line.startswith("#") or "=" not in line:
continue
key, _, value = line.partition("=")
env[key.strip()] = value.strip().strip('"').strip("'")
host = env.get("DB_HOST", "localhost")
port = env.get("DB_PORT", "5432")
user = env.get("DB_USER", "postgres")
password = env.get("DB_PASS", "")
dbname = env.get("DB_NAME", "postgres")
if not password:
return None
return (
"postgresql://"
f"{quote(user, safe='')}:{quote(password, safe='')}"
f"@{host}:{port}/{quote(dbname, safe='')}"
)
def get_db_url(args: argparse.Namespace) -> str | None:
return args.db_url or os.environ.get("DATABASE_URL") or load_db_url_from_env()
def connect(db_url: str):
try:
import psycopg2
except ImportError:
print("psycopg2 not found. Run: poetry install", file=sys.stderr)
sys.exit(1)
return psycopg2.connect(db_url)
def run_sql(db_url: str, statements: list[tuple[str, str]]) -> None:
"""Execute a list of (label, sql) pairs in a single transaction."""
conn = connect(db_url)
conn.autocommit = False
cur = conn.cursor()
try:
for label, sql in statements:
print(f" {label} ...", end=" ")
cur.execute(sql)
print("OK")
conn.commit()
print(f"\n{len(statements)} statement(s) applied.")
except Exception as e:
conn.rollback()
print(f"\n✗ Error: {e}", file=sys.stderr)
sys.exit(1)
finally:
cur.close()
conn.close()
def build_view_sql(name: str, query_body: str) -> str:
body = query_body.strip().rstrip(";")
# security_invoker = false → view runs as its owner (postgres), not the
# caller, so analytics_readonly only needs analytics schema access.
return f"CREATE OR REPLACE VIEW {SCHEMA}.{name} WITH (security_invoker = false) AS\n{body};\n"
def load_views(only: list[str] | None = None) -> list[tuple[str, str]]:
"""Return [(label, sql)] for all views, in alphabetical order."""
files = sorted(QUERIES_DIR.glob("*.sql"))
if not files:
print(f"No .sql files found in {QUERIES_DIR}", file=sys.stderr)
sys.exit(1)
known = {f.stem for f in files}
if only:
unknown = [n for n in only if n not in known]
if unknown:
print(
f"Unknown view name(s): {', '.join(unknown)}\n"
f"Available: {', '.join(sorted(known))}",
file=sys.stderr,
)
sys.exit(1)
result = []
for f in files:
name = f.stem
if only and name not in only:
continue
result.append((f"view analytics.{name}", build_view_sql(name, f.read_text())))
return result
def no_db_url_error() -> None:
print(
"No database URL found.\n"
"Tried: --db-url, DATABASE_URL env var, and .env (DB_* vars).\n"
"Use --dry-run to just print the SQL.",
file=sys.stderr,
)
sys.exit(1)
def cmd_setup(args: argparse.Namespace) -> None:
if args.dry_run:
print(SETUP_SQL)
return
db_url = get_db_url(args)
if not db_url:
no_db_url_error()
assert db_url
print("Applying analytics setup...")
run_sql(db_url, [("schema / role / grants", SETUP_SQL)])
def cmd_views(args: argparse.Namespace) -> None:
only = [v.strip() for v in args.only.split(",")] if args.only else None
views = load_views(only=only)
if not views:
print("No matching views found.")
sys.exit(0)
if args.dry_run:
print(f"-- {len(views)} views\n")
for label, sql in views:
print(f"-- {label}")
print(sql)
return
db_url = get_db_url(args)
if not db_url:
no_db_url_error()
assert db_url
print(f"Applying {len(views)} view(s)...")
# Append grant refresh so the readonly role sees any new views
grant = f"GRANT SELECT ON ALL TABLES IN SCHEMA {SCHEMA} TO analytics_readonly;"
run_sql(db_url, views + [("grant analytics_readonly", grant)])
def main_setup() -> None:
parser = argparse.ArgumentParser(description="Apply analytics schema setup to DB")
parser.add_argument(
"--dry-run", action="store_true", help="Print SQL, don't execute"
)
parser.add_argument("--db-url", help="Postgres connection string")
cmd_setup(parser.parse_args())
def main_views() -> None:
parser = argparse.ArgumentParser(description="Apply analytics views to DB")
parser.add_argument(
"--dry-run", action="store_true", help="Print SQL, don't execute"
)
parser.add_argument("--db-url", help="Postgres connection string")
parser.add_argument("--only", help="Comma-separated view names to update")
cmd_views(parser.parse_args())
if __name__ == "__main__":
# Default: apply views (backwards-compatible with direct python invocation)
main_views()

View File

@@ -0,0 +1,7 @@
-- Remove GraphExecution foreign key from PendingHumanReview
-- The graphExecId column remains for querying, but we remove the FK constraint
-- to AgentGraphExecution since PendingHumanReview records can now be created
-- with synthetic graph_exec_ids (e.g., CoPilot direct block execution uses
-- "copilot-session-{session_id}" as graph_exec_id).
ALTER TABLE "PendingHumanReview" DROP CONSTRAINT IF EXISTS "PendingHumanReview_graphExecId_fkey";

View File

@@ -120,6 +120,8 @@ ws = "backend.ws:main"
scheduler = "backend.scheduler:main"
notification = "backend.notification:main"
executor = "backend.exec:main"
analytics-setup = "generate_views:main_setup"
analytics-views = "generate_views:main_views"
copilot-executor = "backend.copilot.executor.__main__:main"
cli = "backend.cli:main"
format = "linter:format"

View File

@@ -566,8 +566,6 @@ model AgentGraphExecution {
shareToken String? @unique
sharedAt DateTime?
PendingHumanReviews PendingHumanReview[]
@@index([agentGraphId, agentGraphVersion])
@@index([userId, isDeleted, createdAt])
@@index([createdAt])
@@ -664,8 +662,7 @@ model PendingHumanReview {
updatedAt DateTime? @updatedAt
reviewedAt DateTime?
User User @relation(fields: [userId], references: [id], onDelete: Cascade)
GraphExecution AgentGraphExecution @relation(fields: [graphExecId], references: [id], onDelete: Cascade)
User User @relation(fields: [userId], references: [id], onDelete: Cascade)
@@unique([nodeExecId]) // One pending review per node execution
@@index([userId, status])

View File

@@ -1,36 +0,0 @@
"use server";
import BackendAPI from "@/lib/autogpt-server-api/client";
import { OttoQuery, OttoResponse } from "@/lib/autogpt-server-api/types";
const api = new BackendAPI();
export async function askOtto(
query: string,
conversationHistory: { query: string; response: string }[],
includeGraphData: boolean,
graphId?: string,
): Promise<OttoResponse> {
const messageId = `${Date.now()}-web`;
const ottoQuery: OttoQuery = {
query,
conversation_history: conversationHistory,
message_id: messageId,
include_graph_data: includeGraphData,
graph_id: graphId,
};
try {
const response = await api.askOtto(ottoQuery);
return response;
} catch (error) {
console.error("Error in askOtto server action:", error);
return {
answer: error instanceof Error ? error.message : "Unknown error occurred",
documents: [],
success: false,
error: true,
};
}
}

View File

@@ -23,6 +23,12 @@ import { WebhookDisclaimer } from "./components/WebhookDisclaimer";
import { SubAgentUpdateFeature } from "./components/SubAgentUpdate/SubAgentUpdateFeature";
import { useCustomNode } from "./useCustomNode";
function hasAdvancedFields(schema: RJSFSchema): boolean {
const properties = schema?.properties;
if (!properties) return false;
return Object.values(properties).some((prop: any) => prop.advanced === true);
}
export type CustomNodeData = {
hardcodedValues: {
[key: string]: any;
@@ -108,7 +114,11 @@ export const CustomNode: React.FC<NodeProps<CustomNode>> = React.memo(
)}
showHandles={showHandles}
/>
<NodeAdvancedToggle nodeId={nodeId} />
<NodeAdvancedToggle
nodeId={nodeId}
isLastSection={data.uiType === BlockUIType.OUTPUT}
hasAdvancedFields={hasAdvancedFields(inputSchema)}
/>
{data.uiType != BlockUIType.OUTPUT && (
<OutputHandler
uiType={data.uiType}

View File

@@ -2,18 +2,33 @@ import { useNodeStore } from "@/app/(platform)/build/stores/nodeStore";
import { Button } from "@/components/atoms/Button/Button";
import { Text } from "@/components/atoms/Text/Text";
import { CaretDownIcon } from "@phosphor-icons/react";
import { cn } from "@/lib/utils";
type Props = {
nodeId: string;
isLastSection?: boolean;
hasAdvancedFields?: boolean;
};
export function NodeAdvancedToggle({ nodeId }: Props) {
export function NodeAdvancedToggle({
nodeId,
isLastSection,
hasAdvancedFields = true,
}: Props) {
const showAdvanced = useNodeStore(
(state) => state.nodeAdvancedStates[nodeId] || false,
);
const setShowAdvanced = useNodeStore((state) => state.setShowAdvanced);
if (!hasAdvancedFields) return null;
return (
<div className="flex items-center justify-start gap-2 bg-white px-5 pb-3.5">
<div
className={cn(
"flex items-center justify-start gap-2 bg-white px-5 pb-3.5",
isLastSection && "rounded-b-xlarge",
)}
>
<Button
variant="ghost"
className="h-fit min-w-0 p-0 hover:border-transparent hover:bg-transparent"

View File

@@ -1,6 +1,6 @@
import { Button } from "@/components/atoms/Button/Button";
import { Text } from "@/components/atoms/Text/Text";
import { CaretDownIcon, InfoIcon } from "@phosphor-icons/react";
import { CaretDownIcon, CaretRightIcon, InfoIcon } from "@phosphor-icons/react";
import { RJSFSchema } from "@rjsf/utils";
import { useState } from "react";
@@ -30,13 +30,41 @@ export const OutputHandler = ({
const properties = outputSchema?.properties || {};
const [isOutputVisible, setIsOutputVisible] = useState(true);
const brokenOutputs = useBrokenOutputs(nodeId);
const [expandedObjects, setExpandedObjects] = useState<
Record<string, boolean>
>({});
const showHandles = uiType !== BlockUIType.OUTPUT;
function toggleObjectExpanded(key: string) {
setExpandedObjects((prev) => ({ ...prev, [key]: !prev[key] }));
}
function hasConnectedOrBrokenDescendant(
schema: RJSFSchema,
keyPrefix: string,
): boolean {
if (!schema) return false;
return Object.entries(schema).some(
([key, fieldSchema]: [string, RJSFSchema]) => {
const fullKey = keyPrefix ? `${keyPrefix}_#_${key}` : key;
if (isOutputConnected(nodeId, fullKey) || brokenOutputs.has(fullKey))
return true;
if (fieldSchema?.properties)
return hasConnectedOrBrokenDescendant(
fieldSchema.properties,
fullKey,
);
return false;
},
);
}
const renderOutputHandles = (
schema: RJSFSchema,
keyPrefix: string = "",
titlePrefix: string = "",
connectedOnly: boolean = false,
): React.ReactNode[] => {
return Object.entries(schema).map(
([key, fieldSchema]: [string, RJSFSchema]) => {
@@ -44,10 +72,23 @@ export const OutputHandler = ({
const fieldTitle = titlePrefix + (fieldSchema?.title || key);
const isConnected = isOutputConnected(nodeId, fullKey);
const shouldShow = isConnected || isOutputVisible;
const isBroken = brokenOutputs.has(fullKey);
const hasNestedProperties = !!fieldSchema?.properties;
const selfIsRelevant = isConnected || isBroken;
const descendantIsRelevant =
hasNestedProperties &&
hasConnectedOrBrokenDescendant(fieldSchema.properties!, fullKey);
const shouldShow = connectedOnly
? selfIsRelevant || descendantIsRelevant
: isOutputVisible || selfIsRelevant || descendantIsRelevant;
const { displayType, colorClass, hexColor } =
getTypeDisplayInfo(fieldSchema);
const isBroken = brokenOutputs.has(fullKey);
const isExpanded = expandedObjects[fullKey] ?? false;
// User expanded → show all children; auto-expanded → filter to connected only
const shouldRenderChildren = isExpanded || descendantIsRelevant;
return shouldShow ? (
<div
@@ -56,6 +97,19 @@ export const OutputHandler = ({
data-tutorial-id={`output-handler-${nodeId}-${fieldTitle}`}
>
<div className="relative flex items-center gap-2">
{hasNestedProperties && (
<button
onClick={() => toggleObjectExpanded(fullKey)}
className="flex items-center text-slate-500 hover:text-slate-700"
aria-label={isExpanded ? "Collapse" : "Expand"}
>
{isExpanded ? (
<CaretDownIcon size={12} weight="bold" />
) : (
<CaretRightIcon size={12} weight="bold" />
)}
</button>
)}
{fieldSchema?.description && (
<TooltipProvider>
<Tooltip>
@@ -102,12 +156,14 @@ export const OutputHandler = ({
)}
</div>
{/* Recursively render nested properties */}
{fieldSchema?.properties &&
{/* Nested properties */}
{hasNestedProperties &&
shouldRenderChildren &&
renderOutputHandles(
fieldSchema.properties,
fieldSchema.properties!,
fullKey,
`${fieldTitle}.`,
"",
!isExpanded,
)}
</div>
) : null;
@@ -136,7 +192,7 @@ export const OutputHandler = ({
</Button>
<div className="flex flex-col items-end gap-2">
{renderOutputHandles(properties)}
{renderOutputHandles(properties, "", "", !isOutputVisible)}
</div>
</div>
);

View File

@@ -25,34 +25,60 @@ type HistoryStore = {
const MAX_HISTORY = 50;
// Microtask batching state — kept outside the store to avoid triggering
// re-renders. When multiple pushState calls happen in the same synchronous
// execution (e.g. node deletion cascading to edge cleanup), only the first
// (pre-change) state is kept and committed as a single history entry.
let pendingState: HistoryState | null = null;
let batchScheduled = false;
export const useHistoryStore = create<HistoryStore>((set, get) => ({
past: [{ nodes: [], edges: [] }],
future: [],
pushState: (state: HistoryState) => {
const { past } = get();
const lastState = past[past.length - 1];
if (lastState && isEqual(lastState, state)) {
return;
// Keep only the first state within a microtask batch — it represents
// the true pre-change snapshot before any cascading mutations.
if (!pendingState) {
pendingState = state;
}
const actualCurrentState = {
nodes: useNodeStore.getState().nodes,
edges: useEdgeStore.getState().edges,
};
if (!batchScheduled) {
batchScheduled = true;
queueMicrotask(() => {
const stateToCommit = pendingState;
pendingState = null;
batchScheduled = false;
if (isEqual(state, actualCurrentState)) {
return;
if (!stateToCommit) return;
const { past } = get();
const lastState = past[past.length - 1];
if (lastState && isEqual(lastState, stateToCommit)) {
return;
}
const actualCurrentState = {
nodes: useNodeStore.getState().nodes,
edges: useEdgeStore.getState().edges,
};
if (isEqual(stateToCommit, actualCurrentState)) {
return;
}
set((prev) => ({
past: [...prev.past.slice(-MAX_HISTORY + 1), stateToCommit],
future: [],
}));
});
}
set((prev) => ({
past: [...prev.past.slice(-MAX_HISTORY + 1), state],
future: [],
}));
},
initializeHistory: () => {
pendingState = null;
const currentNodes = useNodeStore.getState().nodes;
const currentEdges = useEdgeStore.getState().edges;
@@ -122,5 +148,8 @@ export const useHistoryStore = create<HistoryStore>((set, get) => ({
},
canRedo: () => get().future.length > 0,
clear: () => set({ past: [{ nodes: [], edges: [] }], future: [] }),
clear: () => {
pendingState = null;
set({ past: [{ nodes: [], edges: [] }], future: [] });
},
}));

View File

@@ -1,3 +1,4 @@
import { useMemo } from "react";
import {
Conversation,
ConversationContent,
@@ -8,6 +9,7 @@ import { LoadingSpinner } from "@/components/atoms/LoadingSpinner/LoadingSpinner
import { FileUIPart, UIDataTypes, UIMessage, UITools } from "ai";
import { TOOL_PART_PREFIX } from "../JobStatsBar/constants";
import { TurnStatsBar } from "../JobStatsBar/TurnStatsBar";
import { CopilotPendingReviews } from "../CopilotPendingReviews/CopilotPendingReviews";
import {
buildRenderSegments,
getTurnMessages,
@@ -51,6 +53,50 @@ function renderSegments(
});
}
/**
* Extract graph_exec_id from tool outputs that need review.
* Handles both:
* - run_block ReviewRequiredResponse (has graph_exec_id directly)
* - run_agent ExecutionStartedResponse with status "REVIEW" (has execution_id)
*/
function extractGraphExecId(
messages: UIMessage<unknown, UIDataTypes, UITools>[],
): string | null {
// Scan backwards — the most recent review output has the ID
for (let i = messages.length - 1; i >= 0; i--) {
const msg = messages[i];
for (const part of msg.parts) {
if ("output" in part && part.output) {
const out =
typeof part.output === "string"
? (() => {
try {
return JSON.parse(part.output);
} catch {
return null;
}
})()
: part.output;
if (out && typeof out === "object") {
// run_block: ReviewRequiredResponse has graph_exec_id
if ("graph_exec_id" in out) {
return (out as { graph_exec_id: string }).graph_exec_id;
}
// run_agent: ExecutionStartedResponse with status "REVIEW"
if (
"execution_id" in out &&
"status" in out &&
(out as { status: string }).status === "REVIEW"
) {
return (out as { execution_id: string }).execution_id;
}
}
}
}
}
return null;
}
export function ChatMessagesContainer({
messages,
status,
@@ -60,6 +106,7 @@ export function ChatMessagesContainer({
sessionID,
}: Props) {
const lastMessage = messages[messages.length - 1];
const graphExecId = useMemo(() => extractGraphExecId(messages), [messages]);
const hasInflight = (() => {
if (lastMessage?.role !== "assistant") return false;
@@ -205,6 +252,7 @@ export function ChatMessagesContainer({
</MessageContent>
</Message>
)}
{graphExecId && <CopilotPendingReviews graphExecId={graphExecId} />}
{error && (
<details className="rounded-lg bg-red-50 p-4 text-sm text-red-700">
<summary className="cursor-pointer font-medium">

View File

@@ -130,6 +130,7 @@ export function MessagePartRenderer({ part, messageID, partIndex }: Props) {
case "tool-get_doc_page":
return <SearchDocsTool key={key} part={part as ToolUIPart} />;
case "tool-run_block":
case "tool-continue_run_block":
return <RunBlockTool key={key} part={part as ToolUIPart} />;
case "tool-run_mcp_tool":
return <RunMCPToolComponent key={key} part={part as ToolUIPart} />;

View File

@@ -19,6 +19,7 @@ const CUSTOM_TOOL_TYPES = new Set([
"tool-search_docs",
"tool-get_doc_page",
"tool-run_block",
"tool-continue_run_block",
"tool-run_mcp_tool",
"tool-run_agent",
"tool-schedule_agent",
@@ -33,6 +34,7 @@ const INTERACTIVE_RESPONSE_TYPES: ReadonlySet<string> = new Set([
ResponseType.setup_requirements,
ResponseType.agent_details,
ResponseType.block_details,
ResponseType.review_required,
ResponseType.need_login,
ResponseType.input_validation_error,
ResponseType.agent_builder_clarification_needed,

View File

@@ -0,0 +1,61 @@
"use client";
import { useCallback } from "react";
import { PendingReviewsList } from "@/components/organisms/PendingReviewsList/PendingReviewsList";
import { useCopilotChatActions } from "../CopilotChatActionsProvider/useCopilotChatActions";
import { usePendingReviewsForExecution } from "@/hooks/usePendingReviews";
import { okData } from "@/app/api/helpers";
interface Props {
graphExecId: string;
}
/**
* Renders a single consolidated PendingReviewsList for all pending copilot
* reviews in a session — mirrors the non-copilot review page behavior.
* Works for both run_block (synthetic copilot-session-*) and run_agent (real graph exec) reviews.
*/
export function CopilotPendingReviews({ graphExecId }: Props) {
const { onSend } = useCopilotChatActions();
const { pendingReviews, refetch } = usePendingReviewsForExecution(
graphExecId,
{ enabled: !!graphExecId, refetchInterval: 2000 },
);
// Graph executions auto-resume after approval; block reviews need continue_run_block.
const isGraphExecution = !graphExecId.startsWith("copilot-session-");
const handleReviewComplete = useCallback(async () => {
// Brief delay for the server to propagate the approval
await new Promise((resolve) => setTimeout(resolve, 500));
const result = await refetch();
const remaining = okData(result.data) || [];
if (remaining.length > 0) return;
if (isGraphExecution) {
onSend(
`All pending reviews have been processed. ` +
`The agent execution will resume automatically for approved reviews. ` +
`Use view_agent_output with execution_id="${graphExecId}" to check the result.`,
);
} else {
onSend(
`All pending reviews have been processed. ` +
`For any approved reviews, call continue_run_block with the corresponding review_id to execute them. ` +
`For rejected reviews, no further action is needed.`,
);
}
}, [refetch, onSend, isGraphExecution, graphExecId]);
if (pendingReviews.length === 0) return null;
return (
<div className="py-2">
<PendingReviewsList
reviews={pendingReviews}
onReviewComplete={handleReviewComplete}
/>
</div>
);
}

View File

@@ -15,6 +15,7 @@ import {
isRunBlockBlockOutput,
isRunBlockDetailsOutput,
isRunBlockErrorOutput,
isRunBlockReviewRequiredOutput,
isRunBlockSetupRequirementsOutput,
ToolIcon,
} from "./helpers";
@@ -54,10 +55,15 @@ export function RunBlockTool({ part }: Props) {
part.state === "output-available" &&
!!output &&
!setupRequirementsOutput &&
!isRunBlockReviewRequiredOutput(output) &&
(isRunBlockBlockOutput(output) ||
isRunBlockDetailsOutput(output) ||
isRunBlockErrorOutput(output));
// Review UI is rendered at the chat level by CopilotPendingReviews,
// not inside each tool card. This matches the non-copilot flow where
// a single PendingReviewsList shows all reviews grouped together.
return (
<div className="py-2">
<div className="flex items-center gap-2 text-sm text-muted-foreground">

View File

@@ -26,6 +26,18 @@ export interface BlockDetailsResponse {
user_authenticated: boolean;
}
/** Response when a block requires human review before execution. */
export interface ReviewRequiredResponse {
type: typeof ResponseType.review_required;
message: string;
session_id?: string | null;
block_id: string;
block_name: string;
review_id: string;
graph_exec_id: string;
input_data: Record<string, unknown>;
}
export interface RunBlockInput {
block_id?: string;
block_name?: string;
@@ -36,12 +48,14 @@ export type RunBlockToolOutput =
| SetupRequirementsResponse
| BlockDetailsResponse
| BlockOutputResponse
| ReviewRequiredResponse
| ErrorResponse;
const RUN_BLOCK_OUTPUT_TYPES = new Set<string>([
ResponseType.setup_requirements,
ResponseType.block_details,
ResponseType.block_output,
ResponseType.review_required,
ResponseType.error,
]);
@@ -66,7 +80,19 @@ export function isRunBlockDetailsOutput(
export function isRunBlockBlockOutput(
output: RunBlockToolOutput,
): output is BlockOutputResponse {
return output.type === ResponseType.block_output || "block_id" in output;
return (
output.type === ResponseType.block_output ||
("block_id" in output && !("review_id" in output))
);
}
export function isRunBlockReviewRequiredOutput(
output: RunBlockToolOutput,
): output is ReviewRequiredResponse {
return (
output.type === ResponseType.review_required ||
("review_id" in output && "block_name" in output && "input_data" in output)
);
}
export function isRunBlockErrorOutput(
@@ -91,6 +117,7 @@ function parseOutput(output: unknown): RunBlockToolOutput | null {
if (typeof type === "string" && RUN_BLOCK_OUTPUT_TYPES.has(type)) {
return output as RunBlockToolOutput;
}
if ("review_id" in output) return output as ReviewRequiredResponse;
if ("block_id" in output) return output as BlockOutputResponse;
if ("block" in output) return output as BlockDetailsResponse;
if ("setup_info" in output) return output as SetupRequirementsResponse;
@@ -135,6 +162,9 @@ export function getAnimationText(part: {
if (isRunBlockSetupRequirementsOutput(output)) {
return `Setup needed for "${output.setup_info.agent_name}"`;
}
if (isRunBlockReviewRequiredOutput(output)) {
return `Review needed for "${output.block_name}"`;
}
return "Error running block";
}
case "output-error":
@@ -227,6 +257,14 @@ export function getAccordionMeta(output: RunBlockToolOutput): {
};
}
if (isRunBlockReviewRequiredOutput(output)) {
return {
icon,
title: output.block_name,
description: "Sensitive action — awaiting review",
};
}
return {
icon: (
<WarningDiamondIcon size={32} weight="light" className="text-red-500" />

View File

@@ -9,52 +9,52 @@
/**
* Types of tool responses.
*/
export type ResponseType = typeof ResponseType[keyof typeof ResponseType];
export type ResponseType = (typeof ResponseType)[keyof typeof ResponseType];
// eslint-disable-next-line @typescript-eslint/no-redeclare
export const ResponseType = {
error: 'error',
no_results: 'no_results',
need_login: 'need_login',
agents_found: 'agents_found',
agent_details: 'agent_details',
setup_requirements: 'setup_requirements',
input_validation_error: 'input_validation_error',
execution_started: 'execution_started',
agent_output: 'agent_output',
understanding_updated: 'understanding_updated',
suggested_goal: 'suggested_goal',
agent_builder_guide: 'agent_builder_guide',
agent_builder_preview: 'agent_builder_preview',
agent_builder_saved: 'agent_builder_saved',
agent_builder_clarification_needed: 'agent_builder_clarification_needed',
agent_builder_validation_result: 'agent_builder_validation_result',
agent_builder_fix_result: 'agent_builder_fix_result',
block_list: 'block_list',
block_details: 'block_details',
block_output: 'block_output',
mcp_guide: 'mcp_guide',
mcp_tools_discovered: 'mcp_tools_discovered',
mcp_tool_output: 'mcp_tool_output',
doc_search_results: 'doc_search_results',
doc_page: 'doc_page',
workspace_file_list: 'workspace_file_list',
workspace_file_content: 'workspace_file_content',
workspace_file_metadata: 'workspace_file_metadata',
workspace_file_written: 'workspace_file_written',
workspace_file_deleted: 'workspace_file_deleted',
folder_created: 'folder_created',
folder_list: 'folder_list',
folder_updated: 'folder_updated',
folder_moved: 'folder_moved',
folder_deleted: 'folder_deleted',
agents_moved_to_folder: 'agents_moved_to_folder',
browser_navigate: 'browser_navigate',
browser_act: 'browser_act',
browser_screenshot: 'browser_screenshot',
bash_exec: 'bash_exec',
web_fetch: 'web_fetch',
feature_request_search: 'feature_request_search',
feature_request_created: 'feature_request_created',
error: "error",
no_results: "no_results",
need_login: "need_login",
agents_found: "agents_found",
agent_details: "agent_details",
setup_requirements: "setup_requirements",
input_validation_error: "input_validation_error",
execution_started: "execution_started",
agent_output: "agent_output",
understanding_updated: "understanding_updated",
suggested_goal: "suggested_goal",
agent_builder_guide: "agent_builder_guide",
agent_builder_preview: "agent_builder_preview",
agent_builder_saved: "agent_builder_saved",
agent_builder_clarification_needed: "agent_builder_clarification_needed",
agent_builder_validation_result: "agent_builder_validation_result",
agent_builder_fix_result: "agent_builder_fix_result",
block_list: "block_list",
block_details: "block_details",
block_output: "block_output",
review_required: "review_required",
mcp_guide: "mcp_guide",
mcp_tools_discovered: "mcp_tools_discovered",
mcp_tool_output: "mcp_tool_output",
doc_search_results: "doc_search_results",
doc_page: "doc_page",
workspace_file_list: "workspace_file_list",
workspace_file_content: "workspace_file_content",
workspace_file_metadata: "workspace_file_metadata",
workspace_file_written: "workspace_file_written",
workspace_file_deleted: "workspace_file_deleted",
folder_created: "folder_created",
folder_list: "folder_list",
folder_updated: "folder_updated",
folder_moved: "folder_moved",
folder_deleted: "folder_deleted",
agents_moved_to_folder: "agents_moved_to_folder",
browser_navigate: "browser_navigate",
browser_act: "browser_act",
browser_screenshot: "browser_screenshot",
bash_exec: "bash_exec",
web_fetch: "web_fetch",
feature_request_search: "feature_request_search",
feature_request_created: "feature_request_created",
} as const;

View File

@@ -11557,6 +11557,7 @@
"block_list",
"block_details",
"block_output",
"review_required",
"mcp_guide",
"mcp_tools_discovered",
"mcp_tool_output",

View File

@@ -57,27 +57,25 @@ async function handleWorkspaceDownload(
);
}
// Get the content type from the backend response
// Fully buffer the response before forwarding. Passing response.body as a
// ReadableStream causes silent truncation in Next.js / Vercel — the last
// ~10 KB of larger files are dropped, corrupting PNGs and truncating CSVs.
const buffer = await response.arrayBuffer();
const contentType =
response.headers.get("Content-Type") || "application/octet-stream";
const contentDisposition = response.headers.get("Content-Disposition");
// Stream the response body
const responseHeaders: Record<string, string> = {
"Content-Type": contentType,
"Content-Length": String(buffer.byteLength),
};
if (contentDisposition) {
responseHeaders["Content-Disposition"] = contentDisposition;
}
const contentLength = response.headers.get("Content-Length");
if (contentLength) {
responseHeaders["Content-Length"] = contentLength;
}
// Stream the response body directly instead of buffering in memory
return new NextResponse(response.body, {
return new NextResponse(buffer, {
status: 200,
headers: responseHeaders,
});

View File

@@ -34,10 +34,7 @@ export function FormRenderer({
}, [preprocessedSchema, uiSchema]);
return (
<div
className={cn("mb-6 mt-4", className)}
data-tutorial-id="input-handles"
>
<div className={cn("mt-4", className)} data-tutorial-id="input-handles">
<Form
formContext={formContext}
idPrefix="agpt"

View File

@@ -63,7 +63,6 @@ export const useAnyOfField = (props: FieldProps) => {
);
const handlePrefix = cleanUpHandleId(field_id);
console.log("handlePrefix", handlePrefix);
useEdgeStore
.getState()
.removeEdgesByHandlePrefix(registry.formContext.nodeId, handlePrefix);

View File

@@ -4,6 +4,7 @@ import {
TemplatesType,
} from "@rjsf/utils";
import { AnyOfField } from "./anyof/AnyOfField";
import { OneOfField } from "./oneof/OneOfField";
import {
ArrayFieldItemTemplate,
ArrayFieldTemplate,
@@ -32,6 +33,7 @@ const NoButton = () => null;
export function generateBaseFields(): RegistryFieldsType {
return {
AnyOfField,
OneOfField,
ArraySchemaField,
};
}

View File

@@ -0,0 +1,243 @@
import {
descriptionId,
FieldProps,
getTemplate,
getUiOptions,
getWidget,
} from "@rjsf/utils";
import { useEffect, useRef, useState } from "react";
import { AnyOfField } from "../anyof/AnyOfField";
import { cleanUpHandleId, getHandleId, updateUiOption } from "../../helpers";
import { useEdgeStore } from "@/app/(platform)/build/stores/edgeStore";
import { ANY_OF_FLAG } from "../../constants";
import { Text } from "@/components/atoms/Text/Text";
import { cn } from "@/lib/utils";
function getDiscriminatorPropName(schema: any): string | undefined {
if (!schema?.discriminator) return undefined;
if (typeof schema.discriminator === "string") return schema.discriminator;
return schema.discriminator.propertyName;
}
export function OneOfField(props: FieldProps) {
const { schema } = props;
const discriminatorProp = getDiscriminatorPropName(schema);
if (!discriminatorProp) {
return <AnyOfField {...props} />;
}
return (
<DiscriminatedUnionField {...props} discriminatorProp={discriminatorProp} />
);
}
interface DiscriminatedUnionFieldProps extends FieldProps {
discriminatorProp: string;
}
function DiscriminatedUnionField({
discriminatorProp,
...props
}: DiscriminatedUnionFieldProps) {
const { schema, registry, formData, onChange, name } = props;
const { fields, schemaUtils, formContext } = registry;
const { SchemaField } = fields;
const { nodeId } = formContext;
const field_id = props.fieldPathId.$id;
// Resolve variant schemas from $refs
const variants = useRef(
(schema.oneOf || []).map((opt: any) =>
schemaUtils.retrieveSchema(opt, formData),
),
);
// Build dropdown options from variant titles and discriminator const values
const enumOptions = variants.current.map((variant: any, index: number) => {
const discValue = (variant.properties?.[discriminatorProp] as any)?.const;
return {
value: index,
label: variant.title || discValue || `Option ${index + 1}`,
discriminatorValue: discValue,
};
});
// Determine initial selected index from formData
function getInitialIndex() {
const currentDisc = formData?.[discriminatorProp];
if (currentDisc) {
const idx = enumOptions.findIndex(
(o) => o.discriminatorValue === currentDisc,
);
if (idx >= 0) return idx;
}
return 0;
}
const [selectedIndex, setSelectedIndex] = useState(getInitialIndex);
// Generate handleId for sub-fields (same convention as AnyOfField)
const uiOptions = getUiOptions(props.uiSchema, props.globalUiOptions);
const handleId = getHandleId({
uiOptions,
id: field_id + ANY_OF_FLAG,
schema,
});
const childUiSchema = updateUiOption(props.uiSchema, {
handleId,
label: false,
fromAnyOf: true,
});
// Get selected variant schema with discriminator property filtered out
// and sub-fields inheriting the parent's advanced value
const selectedVariant = variants.current[selectedIndex];
const parentAdvanced = (schema as any).advanced;
function getFilteredSchema() {
if (!selectedVariant?.properties) return selectedVariant;
const filteredProperties: Record<string, any> = {};
for (const [key, value] of Object.entries(selectedVariant.properties)) {
if (key === discriminatorProp) continue;
filteredProperties[key] =
parentAdvanced !== undefined
? { ...(value as any), advanced: parentAdvanced }
: value;
}
return {
...selectedVariant,
properties: filteredProperties,
required: (selectedVariant.required || []).filter(
(r: string) => r !== discriminatorProp,
),
};
}
const filteredSchema = getFilteredSchema();
// Handle variant change
function handleVariantChange(option?: string) {
const newIndex = option !== undefined ? parseInt(option, 10) : -1;
if (newIndex === selectedIndex || newIndex < 0) return;
const newVariant = variants.current[newIndex];
const oldVariant = variants.current[selectedIndex];
const discValue = (newVariant.properties?.[discriminatorProp] as any)
?.const;
// Clean edges for this field
const handlePrefix = cleanUpHandleId(field_id);
useEdgeStore.getState().removeEdgesByHandlePrefix(nodeId, handlePrefix);
// Sanitize current data against old→new schema to preserve shared fields
let newFormData = schemaUtils.sanitizeDataForNewSchema(
newVariant,
oldVariant,
formData,
);
// Fill in defaults for the new variant
newFormData = schemaUtils.getDefaultFormState(
newVariant,
newFormData,
"excludeObjectChildren",
) as any;
newFormData = { ...newFormData, [discriminatorProp]: discValue };
setSelectedIndex(newIndex);
onChange(newFormData, props.fieldPathId.path, undefined, field_id);
}
// Sync selectedIndex when formData discriminator changes externally
// (e.g. undo/redo, loading saved state)
const currentDiscValue = formData?.[discriminatorProp];
useEffect(() => {
const idx = currentDiscValue
? enumOptions.findIndex((o) => o.discriminatorValue === currentDiscValue)
: -1;
if (idx >= 0) {
if (idx !== selectedIndex) setSelectedIndex(idx);
} else if (enumOptions.length > 0 && selectedIndex !== 0) {
// Unknown or cleared discriminator — full reset via same cleanup path
handleVariantChange("0");
}
// eslint-disable-next-line react-hooks/exhaustive-deps
}, [currentDiscValue]);
// Auto-set discriminator on initial render if missing
useEffect(() => {
const discValue = enumOptions[selectedIndex]?.discriminatorValue;
if (discValue && formData?.[discriminatorProp] !== discValue) {
onChange(
{ ...formData, [discriminatorProp]: discValue },
props.fieldPathId.path,
undefined,
field_id,
);
}
// eslint-disable-next-line react-hooks/exhaustive-deps
}, []);
const Widget = getWidget({ type: "string" }, "select", registry.widgets);
const selector = (
<Widget
id={field_id}
name={`${name}__oneof_select`}
schema={{ type: "number", default: 0 }}
onChange={handleVariantChange}
onBlur={props.onBlur}
onFocus={props.onFocus}
disabled={props.disabled || enumOptions.length === 0}
multiple={false}
value={selectedIndex}
options={{ enumOptions }}
registry={registry}
placeholder={props.placeholder}
autocomplete={props.autocomplete}
className={cn("-ml-1 h-[22px] w-fit gap-1 px-1 pl-2 text-xs font-medium")}
autofocus={props.autofocus}
label=""
hideLabel={true}
readonly={props.readonly}
/>
);
const DescriptionFieldTemplate = getTemplate(
"DescriptionFieldTemplate",
registry,
uiOptions,
);
const description_id = descriptionId(props.fieldPathId ?? "");
return (
<div>
<div className="flex items-center gap-2">
<Text variant="body" className="line-clamp-1">
{schema.title || name}
</Text>
<Text variant="small" className="mr-1 text-red-500">
{props.required ? "*" : null}
</Text>
{selector}
<DescriptionFieldTemplate
id={description_id}
description={schema.description || ""}
schema={schema}
registry={registry}
/>
</div>
{filteredSchema && filteredSchema.type !== "null" && (
<SchemaField
{...props}
schema={filteredSchema}
uiSchema={childUiSchema}
/>
)}
</div>
);
}

View File

@@ -6,7 +6,11 @@ import {
titleId,
} from "@rjsf/utils";
import { isAnyOfChild, isAnyOfSchema } from "../../utils/schema-utils";
import {
isAnyOfChild,
isAnyOfSchema,
isOneOfSchema,
} from "../../utils/schema-utils";
import {
cleanUpHandleId,
getHandleId,
@@ -82,12 +86,13 @@ export default function FieldTemplate(props: FieldTemplateProps) {
const shouldDisplayLabel =
displayLabel ||
(schema.type === "boolean" && !isAnyOfChild(uiSchema as any));
const shouldShowTitleSection = !isAnyOfSchema(schema) && !additional;
const isUnionSchema = isAnyOfSchema(schema) || isOneOfSchema(schema);
const shouldShowTitleSection = !isUnionSchema && !additional;
const shouldShowChildren =
schema.type === "object" ||
schema.type === "array" ||
isAnyOfSchema(schema) ||
isUnionSchema ||
!isHandleConnected;
const isAdvancedField = (schema as any).advanced === true;
@@ -95,8 +100,7 @@ export default function FieldTemplate(props: FieldTemplateProps) {
return null;
}
const marginBottom =
isPartOfAnyOf({ uiOptions }) || isAnyOfSchema(schema) ? 0 : 16;
const marginBottom = isPartOfAnyOf({ uiOptions }) || isUnionSchema ? 0 : 16;
return (
<WrapIfAdditionalTemplate

View File

@@ -7,7 +7,7 @@ import {
import { Text } from "@/components/atoms/Text/Text";
import { getTypeDisplayInfo } from "@/app/(platform)/build/components/FlowEditor/nodes/helpers";
import { isAnyOfSchema } from "../../utils/schema-utils";
import { isAnyOfSchema, isOneOfSchema } from "../../utils/schema-utils";
import { cn } from "@/lib/utils";
import { cleanUpHandleId, isArrayItem } from "../../helpers";
import { InputNodeHandle } from "@/app/(platform)/build/components/FlowEditor/handlers/NodeHandle";
@@ -18,7 +18,7 @@ export default function TitleField(props: TitleFieldProps) {
const { nodeId, showHandles } = registry.formContext;
const uiOptions = getUiOptions(uiSchema);
const isAnyOf = isAnyOfSchema(schema);
const isAnyOf = isAnyOfSchema(schema) || isOneOfSchema(schema);
const { displayType, colorClass } = getTypeDisplayInfo(schema);
const description_id = descriptionId(id);

View File

@@ -8,6 +8,14 @@ export function isAnyOfSchema(schema: RJSFSchema | undefined): boolean {
);
}
export function isOneOfSchema(schema: RJSFSchema | undefined): boolean {
return (
Array.isArray(schema?.oneOf) &&
schema!.oneOf.length > 0 &&
schema?.enum === undefined
);
}
export const isAnyOfChild = (
uiSchema: UiSchema<any, RJSFSchema, any> | undefined,
): boolean => {