Compare commits

..

14 Commits

Author SHA1 Message Date
Otto (AGPT)
e9afd9fa01 fix: cast OnboardingStep enum to text in funnel view
The completedSteps column is a platform."OnboardingStep" enum array.
UNNEST produces enum values that can't be compared directly to text
from the VALUES clause. Adding ::text cast fixes the type mismatch.
2026-03-13 11:55:37 +00:00
Zamil Majdy
ddb4f6e9de fix(analytics): address second batch of PR review comments
- user_onboarding_funnel: build complete 22-step grid with VALUES CTE
  so zero-completion steps are always present, fixing LAG comparisons
  against wrong predecessors; update docs to reflect all 22 steps
- users_activities: use COUNT(DISTINCT "id") for agent_count to avoid
  counting multiple version rows per graph; add COALESCE(..., 0) for
  agent_count, unique_agent_runs, agent_runs; update docs column list
  to include node_execution_incomplete and node_execution_review
- generate_views: update Step 3 comment to clarify NOLOGIN role needs
  WITH LOGIN PASSWORD not just WITH PASSWORD; add fail-fast validation
  for unknown --only view names with helpful error message
2026-03-12 00:47:55 +07:00
Zamil Majdy
f585d97928 fix(analytics): move new status columns to end of users_activities SELECT
CREATE OR REPLACE VIEW requires existing columns to stay in position.
Moving node_execution_incomplete and node_execution_review after
is_active_after_7d so the replacement doesn't shift existing columns.
2026-03-12 00:01:40 +07:00
Zamil Majdy
7d39234fdd fix(analytics): address PR review comments
- user_block_spending: use ->> instead of -> for JSONB field extraction
  before casting to int (avoids runtime cast errors)
- generate_views: create analytics_readonly as NOLOGIN to avoid a
  usable role with a known default password
- generate_views: percent-encode DB credentials in the URI builder so
  passwords with reserved chars (@, :, /) connect correctly
- graph_execution: remove WHERE filter on sensitive_action_safe_mode
  before DISTINCT ON so the latest LibraryAgent version always wins
  (fixes possibly_ai being sticky once any version had the flag set)
- retention_agent: use DISTINCT ON ordered by version DESC instead of
  MAX(name) so renamed agents resolve to their latest name
- retention_login_daily: add 90-day cohort_start filter to first_login
  CTE so the view matches its documented window
- user_onboarding_funnel: map the 8 missing OnboardingStep enum values
  (VISIT_COPILOT, RE_RUN_AGENT, SCHEDULE_AGENT, RUN_AGENTS, RUN_3_DAYS,
  TRIGGER_WEBHOOK, RUN_14_DAYS, RUN_AGENTS_100) to step_order 15-22
- users_activities: use updatedAt instead of createdAt for
  last_agent_save_time; add node_execution_incomplete and
  node_execution_review status columns
2026-03-11 23:48:42 +07:00
Zamil Majdy
6e9d4c4333 perf(analytics): fix fan-out in users_activities view
The original CTEs drove all joins from user_logins, causing a
O(users × executions × node_executions) fan-out that made the view
too heavy for Supabase to serve. Rewrote each CTE to aggregate its
own source table directly by userId, then LEFT JOIN the aggregates
in the final SELECT.
2026-03-11 23:39:14 +07:00
Zamil Majdy
8aad333a45 refactor(analytics): move generate_views.py to backend, add poetry run analytics-setup/analytics-views scripts 2026-03-11 16:23:29 +07:00
Zamil Majdy
856f0d980d fix(analytics): restrict analytics_readonly to analytics schema only via security_invoker=false views 2026-03-11 16:16:03 +07:00
Zamil Majdy
3c3aadd361 docs(analytics): add step-by-step quick start to generate_views.py docstring 2026-03-11 16:12:22 +07:00
Zamil Majdy
e87a693fdd feat(analytics): auto-load DB creds from backend/.env as fallback 2026-03-11 16:10:31 +07:00
Zamil Majdy
fe265c10d4 refactor(analytics): generate setup.sql via --setup flag, gitignore it 2026-03-11 16:01:52 +07:00
Zamil Majdy
5d00a94693 chore(analytics): remove auto-generated files, gitignore views.sql 2026-03-11 16:00:48 +07:00
Zamil Majdy
6e1605994d feat(analytics): add documented SQL views with generation script
Introduces an analytics/ layer that wraps production Postgres data in
safe, read-only views exposed under the analytics schema.

- 14 documented query files in queries/ (one per Looker data source)
  covering auth activities, user activity, execution metrics, onboarding
  funnel, and cohort retention (login + execution, weekly + daily)
- setup.sql — one-time schema creation and role/grant setup for the
  analytics_readonly role (auth, platform, analytics schemas)
- generate_views.py — reads queries/*.sql and applies
  CREATE OR REPLACE VIEW analytics.<name> to the database;
  supports --dry-run, --only, and --db-url flags
- views.sql — pre-generated combined reference output
- README.md — full setup, deployment, and integration guide

Looker, PostHog Data Warehouse, and Supabase MCP (for Otto) all
connect to the same analytics.* views instead of raw tables.
2026-03-11 15:36:27 +07:00
Zamil Majdy
15e3980d65 fix(frontend): buffer workspace file downloads to prevent truncation (#12349)
## Summary
- Workspace file downloads (images, CSVs, etc.) were silently truncated
(~10 KB lost from the end) when served through the Next.js proxy
- Root cause: `new NextResponse(response.body)` passes a
`ReadableStream` directly, which Next.js / Vercel silently truncates for
larger files
- Fix: fully buffer with `response.arrayBuffer()` before forwarding, and
set `Content-Length` from the actual buffer size
- Keeps the auth proxy intact — no signed URLs (which would be public
and expire, breaking chat history)

## Root cause verification
Confirmed locally on session `080f27f9-0379-4085-a67a-ee34cc40cd62`:
- Backend `write_workspace_file` logs **978,831 bytes** written
- Direct backend download (`curl
localhost:8006/api/workspace/files/.../download`): **978,831 bytes** 
- Browser download through Next.js proxy: **truncated** 

## Why not signed URLs?
- Signed URLs are effectively public — anyone with the link can download
the file (privacy concern)
- Signed URLs expire, but chat history persists — reopening a
conversation later would show broken downloads
- Buffering is fine: workspace files are capped at 100 MB, Vercel
function memory is 1 GB

## Related
- Discord thread: `#Truncated File Bug` channel
- Related PR #12319 (signed URL approach) — this fix is simpler and
preserves auth

## Test plan
- [ ] Download a workspace file (CSV, PNG, any type) through the copilot
UI
- [ ] Verify downloaded file size matches the original
- [ ] Verify PNGs open correctly and CSVs have all rows

cc @Swiftyos @uberdot @AdarshRawat1
2026-03-10 18:23:51 +00:00
Zamil Majdy
fe9eb2564b feat(copilot): HITL review for sensitive block execution (#12356)
## Summary
- Integrates existing Human-In-The-Loop (HITL) review infrastructure
into CoPilot's direct block execution (`run_block`) for blocks marked
with `is_sensitive_action=True`
- Removes the `PendingHumanReview → AgentGraphExecution` FK constraint
to support synthetic CoPilot session IDs (migration included)
- Adds `ReviewRequiredResponse` model + frontend `ReviewRequiredCard`
component to surface review status in the chat UI
- Auto-approval works within a CoPilot session: once a block is
approved, subsequent executions of the same block in the same session
are auto-approved (using `copilot-session-{session_id}` as
`graph_exec_id` and `copilot-node-{block_id}` as `node_id`)

## Test plan
- [x] All 11 `run_block_test.py` tests pass (3 new sensitive action
tests)
- [ ] Manual: Execute a block with `is_sensitive_action=True` in CoPilot
→ verify ReviewRequiredResponse is returned and rendered
- [ ] Manual: Approve in review panel → re-execute the same block →
verify auto-approval kicks in
- [ ] Manual: Verify non-sensitive blocks still execute without review
2026-03-10 18:20:11 +00:00
50 changed files with 2749 additions and 973 deletions

View File

@@ -0,0 +1,40 @@
-- =============================================================
-- View: analytics.auth_activities
-- Looker source alias: ds49 | Charts: 1
-- =============================================================
-- DESCRIPTION
-- Tracks authentication events (login, logout, SSO, password
-- reset, etc.) from Supabase's internal audit log.
-- Useful for monitoring sign-in patterns and detecting anomalies.
--
-- SOURCE TABLES
-- auth.audit_log_entries — Supabase internal auth event log
--
-- OUTPUT COLUMNS
-- created_at TIMESTAMPTZ When the auth event occurred
-- actor_id TEXT User ID who triggered the event
-- actor_via_sso TEXT Whether the action was via SSO ('true'/'false')
-- action TEXT Event type (e.g. 'login', 'logout', 'token_refreshed')
--
-- WINDOW
-- Rolling 90 days from current date
--
-- EXAMPLE QUERIES
-- -- Daily login counts
-- SELECT DATE_TRUNC('day', created_at) AS day, COUNT(*) AS logins
-- FROM analytics.auth_activities
-- WHERE action = 'login'
-- GROUP BY 1 ORDER BY 1;
--
-- -- SSO vs password login breakdown
-- SELECT actor_via_sso, COUNT(*) FROM analytics.auth_activities
-- WHERE action = 'login' GROUP BY 1;
-- =============================================================
SELECT
created_at,
payload->>'actor_id' AS actor_id,
payload->>'actor_via_sso' AS actor_via_sso,
payload->>'action' AS action
FROM auth.audit_log_entries
WHERE created_at >= NOW() - INTERVAL '90 days'

View File

@@ -0,0 +1,105 @@
-- =============================================================
-- View: analytics.graph_execution
-- Looker source alias: ds16 | Charts: 21
-- =============================================================
-- DESCRIPTION
-- One row per agent graph execution (last 90 days).
-- Unpacks the JSONB stats column into individual numeric columns
-- and normalises the executionStatus — runs that failed due to
-- insufficient credits are reclassified as 'NO_CREDITS' for
-- easier filtering. Error messages are scrubbed of IDs and URLs
-- to allow safe grouping.
--
-- SOURCE TABLES
-- platform.AgentGraphExecution — Execution records
-- platform.AgentGraph — Agent graph metadata (for name)
-- platform.LibraryAgent — To flag possibly-AI (safe-mode) agents
--
-- OUTPUT COLUMNS
-- id TEXT Execution UUID
-- agentGraphId TEXT Agent graph UUID
-- agentGraphVersion INT Graph version number
-- executionStatus TEXT COMPLETED | FAILED | NO_CREDITS | RUNNING | QUEUED | TERMINATED
-- createdAt TIMESTAMPTZ When the execution was queued
-- updatedAt TIMESTAMPTZ Last status update time
-- userId TEXT Owner user UUID
-- agentGraphName TEXT Human-readable agent name
-- cputime DECIMAL Total CPU seconds consumed
-- walltime DECIMAL Total wall-clock seconds
-- node_count DECIMAL Number of nodes in the graph
-- nodes_cputime DECIMAL CPU time across all nodes
-- nodes_walltime DECIMAL Wall time across all nodes
-- execution_cost DECIMAL Credit cost of this execution
-- correctness_score FLOAT AI correctness score (if available)
-- possibly_ai BOOLEAN True if agent has sensitive_action_safe_mode enabled
-- groupedErrorMessage TEXT Scrubbed error string (IDs/URLs replaced with wildcards)
--
-- WINDOW
-- Rolling 90 days (createdAt > CURRENT_DATE - 90 days)
--
-- EXAMPLE QUERIES
-- -- Daily execution counts by status
-- SELECT DATE_TRUNC('day', "createdAt") AS day, "executionStatus", COUNT(*)
-- FROM analytics.graph_execution
-- GROUP BY 1, 2 ORDER BY 1;
--
-- -- Average cost per execution by agent
-- SELECT "agentGraphName", AVG("execution_cost") AS avg_cost, COUNT(*) AS runs
-- FROM analytics.graph_execution
-- WHERE "executionStatus" = 'COMPLETED'
-- GROUP BY 1 ORDER BY avg_cost DESC;
--
-- -- Top error messages
-- SELECT "groupedErrorMessage", COUNT(*) AS occurrences
-- FROM analytics.graph_execution
-- WHERE "executionStatus" = 'FAILED'
-- GROUP BY 1 ORDER BY 2 DESC LIMIT 20;
-- =============================================================
SELECT
ge."id" AS id,
ge."agentGraphId" AS agentGraphId,
ge."agentGraphVersion" AS agentGraphVersion,
CASE
WHEN jsonb_exists(ge."stats"::jsonb, 'error')
AND (
(ge."stats"::jsonb->>'error') ILIKE '%insufficient balance%'
OR (ge."stats"::jsonb->>'error') ILIKE '%you have no credits left%'
)
THEN 'NO_CREDITS'
ELSE CAST(ge."executionStatus" AS TEXT)
END AS executionStatus,
ge."createdAt" AS createdAt,
ge."updatedAt" AS updatedAt,
ge."userId" AS userId,
g."name" AS agentGraphName,
(ge."stats"::jsonb->>'cputime')::decimal AS cputime,
(ge."stats"::jsonb->>'walltime')::decimal AS walltime,
(ge."stats"::jsonb->>'node_count')::decimal AS node_count,
(ge."stats"::jsonb->>'nodes_cputime')::decimal AS nodes_cputime,
(ge."stats"::jsonb->>'nodes_walltime')::decimal AS nodes_walltime,
(ge."stats"::jsonb->>'cost')::decimal AS execution_cost,
(ge."stats"::jsonb->>'correctness_score')::float AS correctness_score,
COALESCE(la.possibly_ai, FALSE) AS possibly_ai,
REGEXP_REPLACE(
REGEXP_REPLACE(
TRIM(BOTH '"' FROM ge."stats"::jsonb->>'error'),
'(https?://)([A-Za-z0-9.-]+)(:[0-9]+)?(/[^\s]*)?',
'\1\2/...', 'gi'
),
'[a-zA-Z0-9_:-]*\d[a-zA-Z0-9_:-]*', '*', 'g'
) AS groupedErrorMessage
FROM platform."AgentGraphExecution" ge
LEFT JOIN platform."AgentGraph" g
ON ge."agentGraphId" = g."id"
AND ge."agentGraphVersion" = g."version"
LEFT JOIN (
SELECT DISTINCT ON ("userId", "agentGraphId")
"userId", "agentGraphId",
("settings"::jsonb->>'sensitive_action_safe_mode')::boolean AS possibly_ai
FROM platform."LibraryAgent"
WHERE "isDeleted" = FALSE
AND "isArchived" = FALSE
ORDER BY "userId", "agentGraphId", "agentGraphVersion" DESC
) la ON la."userId" = ge."userId" AND la."agentGraphId" = ge."agentGraphId"
WHERE ge."createdAt" > CURRENT_DATE - INTERVAL '90 days'

View File

@@ -0,0 +1,101 @@
-- =============================================================
-- View: analytics.node_block_execution
-- Looker source alias: ds14 | Charts: 11
-- =============================================================
-- DESCRIPTION
-- One row per node (block) execution (last 90 days).
-- Unpacks stats JSONB and joins to identify which block type
-- was run. For failed nodes, joins the error output and
-- scrubs it for safe grouping.
--
-- SOURCE TABLES
-- platform.AgentNodeExecution — Node execution records
-- platform.AgentNode — Node → block mapping
-- platform.AgentBlock — Block name/ID
-- platform.AgentNodeExecutionInputOutput — Error output values
--
-- OUTPUT COLUMNS
-- id TEXT Node execution UUID
-- agentGraphExecutionId TEXT Parent graph execution UUID
-- agentNodeId TEXT Node UUID within the graph
-- executionStatus TEXT COMPLETED | FAILED | QUEUED | RUNNING | TERMINATED
-- addedTime TIMESTAMPTZ When the node was queued
-- queuedTime TIMESTAMPTZ When it entered the queue
-- startedTime TIMESTAMPTZ When execution started
-- endedTime TIMESTAMPTZ When execution finished
-- inputSize BIGINT Input payload size in bytes
-- outputSize BIGINT Output payload size in bytes
-- walltime NUMERIC Wall-clock seconds for this node
-- cputime NUMERIC CPU seconds for this node
-- llmRetryCount INT Number of LLM retries
-- llmCallCount INT Number of LLM API calls made
-- inputTokenCount BIGINT LLM input tokens consumed
-- outputTokenCount BIGINT LLM output tokens produced
-- blockName TEXT Human-readable block name (e.g. 'OpenAIBlock')
-- blockId TEXT Block UUID
-- groupedErrorMessage TEXT Scrubbed error (IDs/URLs wildcarded)
-- errorMessage TEXT Raw error output (only set when FAILED)
--
-- WINDOW
-- Rolling 90 days (addedTime > CURRENT_DATE - 90 days)
--
-- EXAMPLE QUERIES
-- -- Most-used blocks by execution count
-- SELECT "blockName", COUNT(*) AS executions,
-- COUNT(*) FILTER (WHERE "executionStatus"='FAILED') AS failures
-- FROM analytics.node_block_execution
-- GROUP BY 1 ORDER BY executions DESC LIMIT 20;
--
-- -- Average LLM token usage per block
-- SELECT "blockName",
-- AVG("inputTokenCount") AS avg_input_tokens,
-- AVG("outputTokenCount") AS avg_output_tokens
-- FROM analytics.node_block_execution
-- WHERE "llmCallCount" > 0
-- GROUP BY 1 ORDER BY avg_input_tokens DESC;
--
-- -- Top failure reasons
-- SELECT "blockName", "groupedErrorMessage", COUNT(*) AS count
-- FROM analytics.node_block_execution
-- WHERE "executionStatus" = 'FAILED'
-- GROUP BY 1, 2 ORDER BY count DESC LIMIT 20;
-- =============================================================
SELECT
ne."id" AS id,
ne."agentGraphExecutionId" AS agentGraphExecutionId,
ne."agentNodeId" AS agentNodeId,
CAST(ne."executionStatus" AS TEXT) AS executionStatus,
ne."addedTime" AS addedTime,
ne."queuedTime" AS queuedTime,
ne."startedTime" AS startedTime,
ne."endedTime" AS endedTime,
(ne."stats"::jsonb->>'input_size')::bigint AS inputSize,
(ne."stats"::jsonb->>'output_size')::bigint AS outputSize,
(ne."stats"::jsonb->>'walltime')::numeric AS walltime,
(ne."stats"::jsonb->>'cputime')::numeric AS cputime,
(ne."stats"::jsonb->>'llm_retry_count')::int AS llmRetryCount,
(ne."stats"::jsonb->>'llm_call_count')::int AS llmCallCount,
(ne."stats"::jsonb->>'input_token_count')::bigint AS inputTokenCount,
(ne."stats"::jsonb->>'output_token_count')::bigint AS outputTokenCount,
b."name" AS blockName,
b."id" AS blockId,
REGEXP_REPLACE(
REGEXP_REPLACE(
TRIM(BOTH '"' FROM eio."data"::text),
'(https?://)([A-Za-z0-9.-]+)(:[0-9]+)?(/[^\s]*)?',
'\1\2/...', 'gi'
),
'[a-zA-Z0-9_:-]*\d[a-zA-Z0-9_:-]*', '*', 'g'
) AS groupedErrorMessage,
eio."data" AS errorMessage
FROM platform."AgentNodeExecution" ne
LEFT JOIN platform."AgentNode" nd
ON ne."agentNodeId" = nd."id"
LEFT JOIN platform."AgentBlock" b
ON nd."agentBlockId" = b."id"
LEFT JOIN platform."AgentNodeExecutionInputOutput" eio
ON eio."referencedByOutputExecId" = ne."id"
AND eio."name" = 'error'
AND ne."executionStatus" = 'FAILED'
WHERE ne."addedTime" > CURRENT_DATE - INTERVAL '90 days'

View File

@@ -0,0 +1,97 @@
-- =============================================================
-- View: analytics.retention_agent
-- Looker source alias: ds35 | Charts: 2
-- =============================================================
-- DESCRIPTION
-- Weekly cohort retention broken down per individual agent.
-- Cohort = week of a user's first use of THAT specific agent.
-- Tells you which agents keep users coming back vs. one-shot
-- use. Only includes cohorts from the last 180 days.
--
-- SOURCE TABLES
-- platform.AgentGraphExecution — Execution records (user × agent × time)
-- platform.AgentGraph — Agent names
--
-- OUTPUT COLUMNS
-- agent_id TEXT Agent graph UUID
-- agent_label TEXT 'AgentName [first8chars]'
-- agent_label_n TEXT 'AgentName [first8chars] (n=total_users)'
-- cohort_week_start DATE Week users first ran this agent
-- cohort_label TEXT ISO week label
-- cohort_label_n TEXT ISO week label with cohort size
-- user_lifetime_week INT Weeks since first use of this agent
-- cohort_users BIGINT Users in this cohort for this agent
-- active_users BIGINT Users who ran the agent again in week k
-- retention_rate FLOAT active_users / cohort_users
-- cohort_users_w0 BIGINT cohort_users only at week 0 (safe to SUM)
-- agent_total_users BIGINT Total users across all cohorts for this agent
--
-- EXAMPLE QUERIES
-- -- Best-retained agents at week 2
-- SELECT agent_label, AVG(retention_rate) AS w2_retention
-- FROM analytics.retention_agent
-- WHERE user_lifetime_week = 2 AND cohort_users >= 10
-- GROUP BY 1 ORDER BY w2_retention DESC LIMIT 10;
--
-- -- Agents with most unique users
-- SELECT DISTINCT agent_label, agent_total_users
-- FROM analytics.retention_agent
-- ORDER BY agent_total_users DESC LIMIT 20;
-- =============================================================
WITH params AS (SELECT 12::int AS max_weeks, (CURRENT_DATE - INTERVAL '180 days') AS cohort_start),
events AS (
SELECT e."userId"::text AS user_id, e."agentGraphId" AS agent_id,
e."createdAt"::timestamptz AS created_at,
DATE_TRUNC('week', e."createdAt")::date AS week_start
FROM platform."AgentGraphExecution" e
),
first_use AS (
SELECT user_id, agent_id, MIN(created_at) AS first_use_at,
DATE_TRUNC('week', MIN(created_at))::date AS cohort_week_start
FROM events GROUP BY 1,2
HAVING MIN(created_at) >= (SELECT cohort_start FROM params)
),
activity_weeks AS (SELECT DISTINCT user_id, agent_id, week_start FROM events),
user_week_age AS (
SELECT aw.user_id, aw.agent_id, fu.cohort_week_start,
((aw.week_start - DATE_TRUNC('week',fu.first_use_at)::date)/7)::int AS user_lifetime_week
FROM activity_weeks aw JOIN first_use fu USING (user_id, agent_id)
WHERE aw.week_start >= DATE_TRUNC('week',fu.first_use_at)::date
),
active_counts AS (
SELECT agent_id, cohort_week_start, user_lifetime_week, COUNT(DISTINCT user_id) AS active_users
FROM user_week_age WHERE user_lifetime_week >= 0 GROUP BY 1,2,3
),
cohort_sizes AS (
SELECT agent_id, cohort_week_start, COUNT(DISTINCT user_id) AS cohort_users FROM first_use GROUP BY 1,2
),
cohort_caps AS (
SELECT cs.agent_id, cs.cohort_week_start, cs.cohort_users,
LEAST((SELECT max_weeks FROM params),
GREATEST(0,((DATE_TRUNC('week',CURRENT_DATE)::date-cs.cohort_week_start)/7)::int)) AS cap_weeks
FROM cohort_sizes cs
),
grid AS (
SELECT cc.agent_id, cc.cohort_week_start, gs AS user_lifetime_week, cc.cohort_users
FROM cohort_caps cc CROSS JOIN LATERAL generate_series(0, cc.cap_weeks) gs
),
agent_names AS (SELECT DISTINCT ON (g."id") g."id" AS agent_id, g."name" AS agent_name FROM platform."AgentGraph" g ORDER BY g."id", g."version" DESC),
agent_total_users AS (SELECT agent_id, SUM(cohort_users) AS agent_total_users FROM cohort_sizes GROUP BY 1)
SELECT
g.agent_id,
COALESCE(an.agent_name,'(unnamed)')||' ['||LEFT(g.agent_id::text,8)||']' AS agent_label,
COALESCE(an.agent_name,'(unnamed)')||' ['||LEFT(g.agent_id::text,8)||'] (n='||COALESCE(atu.agent_total_users,0)||')' AS agent_label_n,
g.cohort_week_start,
TO_CHAR(g.cohort_week_start,'IYYY-"W"IW') AS cohort_label,
TO_CHAR(g.cohort_week_start,'IYYY-"W"IW')||' (n='||g.cohort_users||')' AS cohort_label_n,
g.user_lifetime_week, g.cohort_users,
COALESCE(ac.active_users,0) AS active_users,
COALESCE(ac.active_users,0)::float / NULLIF(g.cohort_users,0) AS retention_rate,
CASE WHEN g.user_lifetime_week=0 THEN g.cohort_users ELSE 0 END AS cohort_users_w0,
COALESCE(atu.agent_total_users,0) AS agent_total_users
FROM grid g
LEFT JOIN active_counts ac ON ac.agent_id=g.agent_id AND ac.cohort_week_start=g.cohort_week_start AND ac.user_lifetime_week=g.user_lifetime_week
LEFT JOIN agent_names an ON an.agent_id=g.agent_id
LEFT JOIN agent_total_users atu ON atu.agent_id=g.agent_id
ORDER BY agent_label, g.cohort_week_start, g.user_lifetime_week;

View File

@@ -0,0 +1,81 @@
-- =============================================================
-- View: analytics.retention_execution_daily
-- Looker source alias: ds111 | Charts: 1
-- =============================================================
-- DESCRIPTION
-- Daily cohort retention based on agent executions.
-- Cohort anchor = day of user's FIRST ever execution.
-- Only includes cohorts from the last 90 days, up to day 30.
-- Great for early engagement analysis (did users run another
-- agent the next day?).
--
-- SOURCE TABLES
-- platform.AgentGraphExecution — Execution records
--
-- OUTPUT COLUMNS
-- Same pattern as retention_login_daily.
-- cohort_day_start = day of first execution (not first login)
--
-- EXAMPLE QUERIES
-- -- Day-3 execution retention
-- SELECT cohort_label, retention_rate_bounded AS d3_retention
-- FROM analytics.retention_execution_daily
-- WHERE user_lifetime_day = 3 ORDER BY cohort_day_start;
-- =============================================================
WITH params AS (SELECT 30::int AS max_days, (CURRENT_DATE - INTERVAL '90 days') AS cohort_start),
events AS (
SELECT e."userId"::text AS user_id, e."createdAt"::timestamptz AS created_at,
DATE_TRUNC('day', e."createdAt")::date AS day_start
FROM platform."AgentGraphExecution" e WHERE e."userId" IS NOT NULL
),
first_exec AS (
SELECT user_id, MIN(created_at) AS first_exec_at,
DATE_TRUNC('day', MIN(created_at))::date AS cohort_day_start
FROM events GROUP BY 1
HAVING MIN(created_at) >= (SELECT cohort_start FROM params)
),
activity_days AS (SELECT DISTINCT user_id, day_start FROM events),
user_day_age AS (
SELECT ad.user_id, fe.cohort_day_start,
(ad.day_start - DATE_TRUNC('day',fe.first_exec_at)::date)::int AS user_lifetime_day
FROM activity_days ad JOIN first_exec fe USING (user_id)
WHERE ad.day_start >= DATE_TRUNC('day',fe.first_exec_at)::date
),
bounded_counts AS (
SELECT cohort_day_start, user_lifetime_day, COUNT(DISTINCT user_id) AS active_users_bounded
FROM user_day_age WHERE user_lifetime_day >= 0 GROUP BY 1,2
),
last_active AS (
SELECT cohort_day_start, user_id, MAX(user_lifetime_day) AS last_active_day FROM user_day_age GROUP BY 1,2
),
unbounded_counts AS (
SELECT la.cohort_day_start, gs AS user_lifetime_day, COUNT(*) AS retained_users_unbounded
FROM last_active la
CROSS JOIN LATERAL generate_series(0, LEAST(la.last_active_day,(SELECT max_days FROM params))) gs
GROUP BY 1,2
),
cohort_sizes AS (SELECT cohort_day_start, COUNT(DISTINCT user_id) AS cohort_users FROM first_exec GROUP BY 1),
cohort_caps AS (
SELECT cs.cohort_day_start, cs.cohort_users,
LEAST((SELECT max_days FROM params), GREATEST(0,(CURRENT_DATE-cs.cohort_day_start)::int)) AS cap_days
FROM cohort_sizes cs
),
grid AS (
SELECT cc.cohort_day_start, gs AS user_lifetime_day, cc.cohort_users
FROM cohort_caps cc CROSS JOIN LATERAL generate_series(0, cc.cap_days) gs
)
SELECT
g.cohort_day_start,
TO_CHAR(g.cohort_day_start,'YYYY-MM-DD') AS cohort_label,
TO_CHAR(g.cohort_day_start,'YYYY-MM-DD')||' (n='||g.cohort_users||')' AS cohort_label_n,
g.user_lifetime_day, g.cohort_users,
COALESCE(b.active_users_bounded,0) AS active_users_bounded,
COALESCE(u.retained_users_unbounded,0) AS retained_users_unbounded,
CASE WHEN g.cohort_users>0 THEN COALESCE(b.active_users_bounded,0)::float/g.cohort_users END AS retention_rate_bounded,
CASE WHEN g.cohort_users>0 THEN COALESCE(u.retained_users_unbounded,0)::float/g.cohort_users END AS retention_rate_unbounded,
CASE WHEN g.user_lifetime_day=0 THEN g.cohort_users ELSE 0 END AS cohort_users_d0
FROM grid g
LEFT JOIN bounded_counts b ON b.cohort_day_start=g.cohort_day_start AND b.user_lifetime_day=g.user_lifetime_day
LEFT JOIN unbounded_counts u ON u.cohort_day_start=g.cohort_day_start AND u.user_lifetime_day=g.user_lifetime_day
ORDER BY g.cohort_day_start, g.user_lifetime_day;

View File

@@ -0,0 +1,81 @@
-- =============================================================
-- View: analytics.retention_execution_weekly
-- Looker source alias: ds92 | Charts: 2
-- =============================================================
-- DESCRIPTION
-- Weekly cohort retention based on agent executions.
-- Cohort anchor = week of user's FIRST ever agent execution
-- (not first login). Only includes cohorts from the last 180 days.
-- Useful when you care about product engagement, not just visits.
--
-- SOURCE TABLES
-- platform.AgentGraphExecution — Execution records
--
-- OUTPUT COLUMNS
-- Same pattern as retention_login_weekly.
-- cohort_week_start = week of first execution (not first login)
--
-- EXAMPLE QUERIES
-- -- Week-2 execution retention
-- SELECT cohort_label, retention_rate_bounded
-- FROM analytics.retention_execution_weekly
-- WHERE user_lifetime_week = 2 ORDER BY cohort_week_start;
-- =============================================================
WITH params AS (SELECT 12::int AS max_weeks, (CURRENT_DATE - INTERVAL '180 days') AS cohort_start),
events AS (
SELECT e."userId"::text AS user_id, e."createdAt"::timestamptz AS created_at,
DATE_TRUNC('week', e."createdAt")::date AS week_start
FROM platform."AgentGraphExecution" e WHERE e."userId" IS NOT NULL
),
first_exec AS (
SELECT user_id, MIN(created_at) AS first_exec_at,
DATE_TRUNC('week', MIN(created_at))::date AS cohort_week_start
FROM events GROUP BY 1
HAVING MIN(created_at) >= (SELECT cohort_start FROM params)
),
activity_weeks AS (SELECT DISTINCT user_id, week_start FROM events),
user_week_age AS (
SELECT aw.user_id, fe.cohort_week_start,
((aw.week_start - DATE_TRUNC('week',fe.first_exec_at)::date)/7)::int AS user_lifetime_week
FROM activity_weeks aw JOIN first_exec fe USING (user_id)
WHERE aw.week_start >= DATE_TRUNC('week',fe.first_exec_at)::date
),
bounded_counts AS (
SELECT cohort_week_start, user_lifetime_week, COUNT(DISTINCT user_id) AS active_users_bounded
FROM user_week_age WHERE user_lifetime_week >= 0 GROUP BY 1,2
),
last_active AS (
SELECT cohort_week_start, user_id, MAX(user_lifetime_week) AS last_active_week FROM user_week_age GROUP BY 1,2
),
unbounded_counts AS (
SELECT la.cohort_week_start, gs AS user_lifetime_week, COUNT(*) AS retained_users_unbounded
FROM last_active la
CROSS JOIN LATERAL generate_series(0, LEAST(la.last_active_week,(SELECT max_weeks FROM params))) gs
GROUP BY 1,2
),
cohort_sizes AS (SELECT cohort_week_start, COUNT(DISTINCT user_id) AS cohort_users FROM first_exec GROUP BY 1),
cohort_caps AS (
SELECT cs.cohort_week_start, cs.cohort_users,
LEAST((SELECT max_weeks FROM params),
GREATEST(0,((DATE_TRUNC('week',CURRENT_DATE)::date-cs.cohort_week_start)/7)::int)) AS cap_weeks
FROM cohort_sizes cs
),
grid AS (
SELECT cc.cohort_week_start, gs AS user_lifetime_week, cc.cohort_users
FROM cohort_caps cc CROSS JOIN LATERAL generate_series(0, cc.cap_weeks) gs
)
SELECT
g.cohort_week_start,
TO_CHAR(g.cohort_week_start,'IYYY-"W"IW') AS cohort_label,
TO_CHAR(g.cohort_week_start,'IYYY-"W"IW')||' (n='||g.cohort_users||')' AS cohort_label_n,
g.user_lifetime_week, g.cohort_users,
COALESCE(b.active_users_bounded,0) AS active_users_bounded,
COALESCE(u.retained_users_unbounded,0) AS retained_users_unbounded,
CASE WHEN g.cohort_users>0 THEN COALESCE(b.active_users_bounded,0)::float/g.cohort_users END AS retention_rate_bounded,
CASE WHEN g.cohort_users>0 THEN COALESCE(u.retained_users_unbounded,0)::float/g.cohort_users END AS retention_rate_unbounded,
CASE WHEN g.user_lifetime_week=0 THEN g.cohort_users ELSE 0 END AS cohort_users_w0
FROM grid g
LEFT JOIN bounded_counts b ON b.cohort_week_start=g.cohort_week_start AND b.user_lifetime_week=g.user_lifetime_week
LEFT JOIN unbounded_counts u ON u.cohort_week_start=g.cohort_week_start AND u.user_lifetime_week=g.user_lifetime_week
ORDER BY g.cohort_week_start, g.user_lifetime_week;

View File

@@ -0,0 +1,94 @@
-- =============================================================
-- View: analytics.retention_login_daily
-- Looker source alias: ds112 | Charts: 1
-- =============================================================
-- DESCRIPTION
-- Daily cohort retention based on login sessions.
-- Same logic as retention_login_weekly but at day granularity,
-- showing up to day 30 for cohorts from the last 90 days.
-- Useful for analysing early activation (days 1-7) in detail.
--
-- SOURCE TABLES
-- auth.sessions — Login session records
--
-- OUTPUT COLUMNS (same pattern as retention_login_weekly)
-- cohort_day_start DATE First day the cohort logged in
-- cohort_label TEXT Date string (e.g. '2025-03-01')
-- cohort_label_n TEXT Date + cohort size (e.g. '2025-03-01 (n=12)')
-- user_lifetime_day INT Days since first login (0 = signup day)
-- cohort_users BIGINT Total users in cohort
-- active_users_bounded BIGINT Users active on exactly day k
-- retained_users_unbounded BIGINT Users active any time on/after day k
-- retention_rate_bounded FLOAT bounded / cohort_users
-- retention_rate_unbounded FLOAT unbounded / cohort_users
-- cohort_users_d0 BIGINT cohort_users only at day 0, else 0 (safe to SUM)
--
-- EXAMPLE QUERIES
-- -- Day-1 retention rate (came back next day)
-- SELECT cohort_label, retention_rate_bounded AS d1_retention
-- FROM analytics.retention_login_daily
-- WHERE user_lifetime_day = 1 ORDER BY cohort_day_start;
--
-- -- Average retention curve across all cohorts
-- SELECT user_lifetime_day,
-- SUM(active_users_bounded)::float / NULLIF(SUM(cohort_users_d0), 0) AS avg_retention
-- FROM analytics.retention_login_daily
-- GROUP BY 1 ORDER BY 1;
-- =============================================================
WITH params AS (SELECT 30::int AS max_days, (CURRENT_DATE - INTERVAL '90 days')::date AS cohort_start),
events AS (
SELECT s.user_id::text AS user_id, s.created_at::timestamptz AS created_at,
DATE_TRUNC('day', s.created_at)::date AS day_start
FROM auth.sessions s WHERE s.user_id IS NOT NULL
),
first_login AS (
SELECT user_id, MIN(created_at) AS first_login_time,
DATE_TRUNC('day', MIN(created_at))::date AS cohort_day_start
FROM events GROUP BY 1
HAVING MIN(created_at) >= (SELECT cohort_start FROM params)
),
activity_days AS (SELECT DISTINCT user_id, day_start FROM events),
user_day_age AS (
SELECT ad.user_id, fl.cohort_day_start,
(ad.day_start - DATE_TRUNC('day', fl.first_login_time)::date)::int AS user_lifetime_day
FROM activity_days ad JOIN first_login fl USING (user_id)
WHERE ad.day_start >= DATE_TRUNC('day', fl.first_login_time)::date
),
bounded_counts AS (
SELECT cohort_day_start, user_lifetime_day, COUNT(DISTINCT user_id) AS active_users_bounded
FROM user_day_age WHERE user_lifetime_day >= 0 GROUP BY 1,2
),
last_active AS (
SELECT cohort_day_start, user_id, MAX(user_lifetime_day) AS last_active_day FROM user_day_age GROUP BY 1,2
),
unbounded_counts AS (
SELECT la.cohort_day_start, gs AS user_lifetime_day, COUNT(*) AS retained_users_unbounded
FROM last_active la
CROSS JOIN LATERAL generate_series(0, LEAST(la.last_active_day,(SELECT max_days FROM params))) gs
GROUP BY 1,2
),
cohort_sizes AS (SELECT cohort_day_start, COUNT(DISTINCT user_id) AS cohort_users FROM first_login GROUP BY 1),
cohort_caps AS (
SELECT cs.cohort_day_start, cs.cohort_users,
LEAST((SELECT max_days FROM params), GREATEST(0,(CURRENT_DATE-cs.cohort_day_start)::int)) AS cap_days
FROM cohort_sizes cs
),
grid AS (
SELECT cc.cohort_day_start, gs AS user_lifetime_day, cc.cohort_users
FROM cohort_caps cc CROSS JOIN LATERAL generate_series(0, cc.cap_days) gs
)
SELECT
g.cohort_day_start,
TO_CHAR(g.cohort_day_start,'YYYY-MM-DD') AS cohort_label,
TO_CHAR(g.cohort_day_start,'YYYY-MM-DD')||' (n='||g.cohort_users||')' AS cohort_label_n,
g.user_lifetime_day, g.cohort_users,
COALESCE(b.active_users_bounded,0) AS active_users_bounded,
COALESCE(u.retained_users_unbounded,0) AS retained_users_unbounded,
CASE WHEN g.cohort_users>0 THEN COALESCE(b.active_users_bounded,0)::float/g.cohort_users END AS retention_rate_bounded,
CASE WHEN g.cohort_users>0 THEN COALESCE(u.retained_users_unbounded,0)::float/g.cohort_users END AS retention_rate_unbounded,
CASE WHEN g.user_lifetime_day=0 THEN g.cohort_users ELSE 0 END AS cohort_users_d0
FROM grid g
LEFT JOIN bounded_counts b ON b.cohort_day_start=g.cohort_day_start AND b.user_lifetime_day=g.user_lifetime_day
LEFT JOIN unbounded_counts u ON u.cohort_day_start=g.cohort_day_start AND u.user_lifetime_day=g.user_lifetime_day
ORDER BY g.cohort_day_start, g.user_lifetime_day;

View File

@@ -0,0 +1,96 @@
-- =============================================================
-- View: analytics.retention_login_onboarded_weekly
-- Looker source alias: ds101 | Charts: 2
-- =============================================================
-- DESCRIPTION
-- Weekly cohort retention from login sessions, restricted to
-- users who "onboarded" — defined as running at least one
-- agent within 365 days of their first login.
-- Filters out users who signed up but never activated,
-- giving a cleaner view of engaged-user retention.
--
-- SOURCE TABLES
-- auth.sessions — Login session records
-- platform.AgentGraphExecution — Used to identify onboarders
--
-- OUTPUT COLUMNS
-- Same as retention_login_weekly (cohort_week_start, user_lifetime_week,
-- retention_rate_bounded, retention_rate_unbounded, etc.)
-- Only difference: cohort is filtered to onboarded users only.
--
-- EXAMPLE QUERIES
-- -- Compare week-4 retention: all users vs onboarded only
-- SELECT 'all_users' AS segment, AVG(retention_rate_bounded) AS w4_retention
-- FROM analytics.retention_login_weekly WHERE user_lifetime_week = 4
-- UNION ALL
-- SELECT 'onboarded', AVG(retention_rate_bounded)
-- FROM analytics.retention_login_onboarded_weekly WHERE user_lifetime_week = 4;
-- =============================================================
WITH params AS (SELECT 12::int AS max_weeks, 365::int AS onboarding_window_days),
events AS (
SELECT s.user_id::text AS user_id, s.created_at::timestamptz AS created_at,
DATE_TRUNC('week', s.created_at)::date AS week_start
FROM auth.sessions s WHERE s.user_id IS NOT NULL
),
first_login_all AS (
SELECT user_id, MIN(created_at) AS first_login_time,
DATE_TRUNC('week', MIN(created_at))::date AS cohort_week_start
FROM events GROUP BY 1
),
onboarders AS (
SELECT fl.user_id FROM first_login_all fl
WHERE EXISTS (
SELECT 1 FROM platform."AgentGraphExecution" e
WHERE e."userId"::text = fl.user_id
AND e."createdAt" >= fl.first_login_time
AND e."createdAt" < fl.first_login_time
+ make_interval(days => (SELECT onboarding_window_days FROM params))
)
),
first_login AS (SELECT * FROM first_login_all WHERE user_id IN (SELECT user_id FROM onboarders)),
activity_weeks AS (SELECT DISTINCT user_id, week_start FROM events),
user_week_age AS (
SELECT aw.user_id, fl.cohort_week_start,
((aw.week_start - DATE_TRUNC('week',fl.first_login_time)::date)/7)::int AS user_lifetime_week
FROM activity_weeks aw JOIN first_login fl USING (user_id)
WHERE aw.week_start >= DATE_TRUNC('week',fl.first_login_time)::date
),
bounded_counts AS (
SELECT cohort_week_start, user_lifetime_week, COUNT(DISTINCT user_id) AS active_users_bounded
FROM user_week_age WHERE user_lifetime_week >= 0 GROUP BY 1,2
),
last_active AS (
SELECT cohort_week_start, user_id, MAX(user_lifetime_week) AS last_active_week FROM user_week_age GROUP BY 1,2
),
unbounded_counts AS (
SELECT la.cohort_week_start, gs AS user_lifetime_week, COUNT(*) AS retained_users_unbounded
FROM last_active la
CROSS JOIN LATERAL generate_series(0, LEAST(la.last_active_week,(SELECT max_weeks FROM params))) gs
GROUP BY 1,2
),
cohort_sizes AS (SELECT cohort_week_start, COUNT(DISTINCT user_id) AS cohort_users FROM first_login GROUP BY 1),
cohort_caps AS (
SELECT cs.cohort_week_start, cs.cohort_users,
LEAST((SELECT max_weeks FROM params),
GREATEST(0,((DATE_TRUNC('week',CURRENT_DATE)::date-cs.cohort_week_start)/7)::int)) AS cap_weeks
FROM cohort_sizes cs
),
grid AS (
SELECT cc.cohort_week_start, gs AS user_lifetime_week, cc.cohort_users
FROM cohort_caps cc CROSS JOIN LATERAL generate_series(0, cc.cap_weeks) gs
)
SELECT
g.cohort_week_start,
TO_CHAR(g.cohort_week_start,'IYYY-"W"IW') AS cohort_label,
TO_CHAR(g.cohort_week_start,'IYYY-"W"IW')||' (n='||g.cohort_users||')' AS cohort_label_n,
g.user_lifetime_week, g.cohort_users,
COALESCE(b.active_users_bounded,0) AS active_users_bounded,
COALESCE(u.retained_users_unbounded,0) AS retained_users_unbounded,
CASE WHEN g.cohort_users>0 THEN COALESCE(b.active_users_bounded,0)::float/g.cohort_users END AS retention_rate_bounded,
CASE WHEN g.cohort_users>0 THEN COALESCE(u.retained_users_unbounded,0)::float/g.cohort_users END AS retention_rate_unbounded,
CASE WHEN g.user_lifetime_week=0 THEN g.cohort_users ELSE 0 END AS cohort_users_w0
FROM grid g
LEFT JOIN bounded_counts b ON b.cohort_week_start=g.cohort_week_start AND b.user_lifetime_week=g.user_lifetime_week
LEFT JOIN unbounded_counts u ON u.cohort_week_start=g.cohort_week_start AND u.user_lifetime_week=g.user_lifetime_week
ORDER BY g.cohort_week_start, g.user_lifetime_week;

View File

@@ -0,0 +1,103 @@
-- =============================================================
-- View: analytics.retention_login_weekly
-- Looker source alias: ds83 | Charts: 2
-- =============================================================
-- DESCRIPTION
-- Weekly cohort retention based on login sessions.
-- Users are grouped by the ISO week of their first ever login.
-- For each cohort × lifetime-week combination, outputs both:
-- - bounded rate: % active in exactly that week
-- - unbounded rate: % who were ever active on or after that week
-- Weeks are capped to the cohort's actual age (no future data points).
--
-- SOURCE TABLES
-- auth.sessions — Login session records
--
-- HOW TO READ THE OUTPUT
-- cohort_week_start The Monday of the week users first logged in
-- user_lifetime_week 0 = signup week, 1 = one week later, etc.
-- retention_rate_bounded = active_users_bounded / cohort_users
-- retention_rate_unbounded = retained_users_unbounded / cohort_users
--
-- OUTPUT COLUMNS
-- cohort_week_start DATE First day of the cohort's signup week
-- cohort_label TEXT ISO week label (e.g. '2025-W01')
-- cohort_label_n TEXT ISO week label with cohort size (e.g. '2025-W01 (n=42)')
-- user_lifetime_week INT Weeks since first login (0 = signup week)
-- cohort_users BIGINT Total users in this cohort (denominator)
-- active_users_bounded BIGINT Users active in exactly week k
-- retained_users_unbounded BIGINT Users active any time on/after week k
-- retention_rate_bounded FLOAT bounded active / cohort_users
-- retention_rate_unbounded FLOAT unbounded retained / cohort_users
-- cohort_users_w0 BIGINT cohort_users only at week 0, else 0 (safe to SUM in pivot tables)
--
-- EXAMPLE QUERIES
-- -- Week-1 retention rate per cohort
-- SELECT cohort_label, retention_rate_bounded AS w1_retention
-- FROM analytics.retention_login_weekly
-- WHERE user_lifetime_week = 1
-- ORDER BY cohort_week_start;
--
-- -- Overall average retention curve (all cohorts combined)
-- SELECT user_lifetime_week,
-- SUM(active_users_bounded)::float / NULLIF(SUM(cohort_users_w0), 0) AS avg_retention
-- FROM analytics.retention_login_weekly
-- GROUP BY 1 ORDER BY 1;
-- =============================================================
WITH params AS (SELECT 12::int AS max_weeks),
events AS (
SELECT s.user_id::text AS user_id, s.created_at::timestamptz AS created_at,
DATE_TRUNC('week', s.created_at)::date AS week_start
FROM auth.sessions s WHERE s.user_id IS NOT NULL
),
first_login AS (
SELECT user_id, MIN(created_at) AS first_login_time,
DATE_TRUNC('week', MIN(created_at))::date AS cohort_week_start
FROM events GROUP BY 1
),
activity_weeks AS (SELECT DISTINCT user_id, week_start FROM events),
user_week_age AS (
SELECT aw.user_id, fl.cohort_week_start,
((aw.week_start - DATE_TRUNC('week', fl.first_login_time)::date) / 7)::int AS user_lifetime_week
FROM activity_weeks aw JOIN first_login fl USING (user_id)
WHERE aw.week_start >= DATE_TRUNC('week', fl.first_login_time)::date
),
bounded_counts AS (
SELECT cohort_week_start, user_lifetime_week, COUNT(DISTINCT user_id) AS active_users_bounded
FROM user_week_age WHERE user_lifetime_week >= 0 GROUP BY 1,2
),
last_active AS (
SELECT cohort_week_start, user_id, MAX(user_lifetime_week) AS last_active_week FROM user_week_age GROUP BY 1,2
),
unbounded_counts AS (
SELECT la.cohort_week_start, gs AS user_lifetime_week, COUNT(*) AS retained_users_unbounded
FROM last_active la
CROSS JOIN LATERAL generate_series(0, LEAST(la.last_active_week,(SELECT max_weeks FROM params))) gs
GROUP BY 1,2
),
cohort_sizes AS (SELECT cohort_week_start, COUNT(DISTINCT user_id) AS cohort_users FROM first_login GROUP BY 1),
cohort_caps AS (
SELECT cs.cohort_week_start, cs.cohort_users,
LEAST((SELECT max_weeks FROM params),
GREATEST(0,((DATE_TRUNC('week',CURRENT_DATE)::date - cs.cohort_week_start)/7)::int)) AS cap_weeks
FROM cohort_sizes cs
),
grid AS (
SELECT cc.cohort_week_start, gs AS user_lifetime_week, cc.cohort_users
FROM cohort_caps cc CROSS JOIN LATERAL generate_series(0, cc.cap_weeks) gs
)
SELECT
g.cohort_week_start,
TO_CHAR(g.cohort_week_start,'IYYY-"W"IW') AS cohort_label,
TO_CHAR(g.cohort_week_start,'IYYY-"W"IW')||' (n='||g.cohort_users||')' AS cohort_label_n,
g.user_lifetime_week, g.cohort_users,
COALESCE(b.active_users_bounded,0) AS active_users_bounded,
COALESCE(u.retained_users_unbounded,0) AS retained_users_unbounded,
CASE WHEN g.cohort_users>0 THEN COALESCE(b.active_users_bounded,0)::float/g.cohort_users END AS retention_rate_bounded,
CASE WHEN g.cohort_users>0 THEN COALESCE(u.retained_users_unbounded,0)::float/g.cohort_users END AS retention_rate_unbounded,
CASE WHEN g.user_lifetime_week=0 THEN g.cohort_users ELSE 0 END AS cohort_users_w0
FROM grid g
LEFT JOIN bounded_counts b ON b.cohort_week_start=g.cohort_week_start AND b.user_lifetime_week=g.user_lifetime_week
LEFT JOIN unbounded_counts u ON u.cohort_week_start=g.cohort_week_start AND u.user_lifetime_week=g.user_lifetime_week
ORDER BY g.cohort_week_start, g.user_lifetime_week

View File

@@ -0,0 +1,71 @@
-- =============================================================
-- View: analytics.user_block_spending
-- Looker source alias: ds6 | Charts: 5
-- =============================================================
-- DESCRIPTION
-- One row per credit transaction (last 90 days).
-- Shows how users spend credits broken down by block type,
-- LLM provider and model. Joins node execution stats for
-- token-level detail.
--
-- SOURCE TABLES
-- platform.CreditTransaction — Credit debit/credit records
-- platform.AgentNodeExecution — Node execution stats (for token counts)
--
-- OUTPUT COLUMNS
-- transactionKey TEXT Unique transaction identifier
-- userId TEXT User who was charged
-- amount DECIMAL Credit amount (positive = credit, negative = debit)
-- negativeAmount DECIMAL amount * -1 (convenience for spend charts)
-- transactionType TEXT Transaction type (e.g. 'USAGE', 'REFUND', 'TOP_UP')
-- transactionTime TIMESTAMPTZ When the transaction was recorded
-- blockId TEXT Block UUID that triggered the spend
-- blockName TEXT Human-readable block name
-- llm_provider TEXT LLM provider (e.g. 'openai', 'anthropic')
-- llm_model TEXT Model name (e.g. 'gpt-4o', 'claude-3-5-sonnet')
-- node_exec_id TEXT Linked node execution UUID
-- llm_call_count INT LLM API calls made in that execution
-- llm_retry_count INT LLM retries in that execution
-- llm_input_token_count INT Input tokens consumed
-- llm_output_token_count INT Output tokens produced
--
-- WINDOW
-- Rolling 90 days (createdAt > CURRENT_DATE - 90 days)
--
-- EXAMPLE QUERIES
-- -- Total spend per user (last 90 days)
-- SELECT "userId", SUM("negativeAmount") AS total_spent
-- FROM analytics.user_block_spending
-- WHERE "transactionType" = 'USAGE'
-- GROUP BY 1 ORDER BY total_spent DESC;
--
-- -- Spend by LLM provider + model
-- SELECT "llm_provider", "llm_model",
-- SUM("negativeAmount") AS total_cost,
-- SUM("llm_input_token_count") AS input_tokens,
-- SUM("llm_output_token_count") AS output_tokens
-- FROM analytics.user_block_spending
-- WHERE "llm_provider" IS NOT NULL
-- GROUP BY 1, 2 ORDER BY total_cost DESC;
-- =============================================================
SELECT
c."transactionKey" AS transactionKey,
c."userId" AS userId,
c."amount" AS amount,
c."amount" * -1 AS negativeAmount,
c."type" AS transactionType,
c."createdAt" AS transactionTime,
c.metadata->>'block_id' AS blockId,
c.metadata->>'block' AS blockName,
c.metadata->'input'->'credentials'->>'provider' AS llm_provider,
c.metadata->'input'->>'model' AS llm_model,
c.metadata->>'node_exec_id' AS node_exec_id,
(ne."stats"->>'llm_call_count')::int AS llm_call_count,
(ne."stats"->>'llm_retry_count')::int AS llm_retry_count,
(ne."stats"->>'input_token_count')::int AS llm_input_token_count,
(ne."stats"->>'output_token_count')::int AS llm_output_token_count
FROM platform."CreditTransaction" c
LEFT JOIN platform."AgentNodeExecution" ne
ON (c.metadata->>'node_exec_id') = ne."id"::text
WHERE c."createdAt" > CURRENT_DATE - INTERVAL '90 days'

View File

@@ -0,0 +1,45 @@
-- =============================================================
-- View: analytics.user_onboarding
-- Looker source alias: ds68 | Charts: 3
-- =============================================================
-- DESCRIPTION
-- One row per user onboarding record. Contains the user's
-- stated usage reason, selected integrations, completed
-- onboarding steps and optional first agent selection.
-- Full history (no date filter) since onboarding happens
-- once per user.
--
-- SOURCE TABLES
-- platform.UserOnboarding — Onboarding state per user
--
-- OUTPUT COLUMNS
-- id TEXT Onboarding record UUID
-- createdAt TIMESTAMPTZ When onboarding started
-- updatedAt TIMESTAMPTZ Last update to onboarding state
-- usageReason TEXT Why user signed up (e.g. 'work', 'personal')
-- integrations TEXT[] Array of integration names the user selected
-- userId TEXT User UUID
-- completedSteps TEXT[] Array of onboarding step enums completed
-- selectedStoreListingVersionId TEXT First marketplace agent the user chose (if any)
--
-- EXAMPLE QUERIES
-- -- Usage reason breakdown
-- SELECT "usageReason", COUNT(*) FROM analytics.user_onboarding GROUP BY 1;
--
-- -- Completion rate per step
-- SELECT step, COUNT(*) AS users_completed
-- FROM analytics.user_onboarding
-- CROSS JOIN LATERAL UNNEST("completedSteps") AS step
-- GROUP BY 1 ORDER BY users_completed DESC;
-- =============================================================
SELECT
id,
"createdAt",
"updatedAt",
"usageReason",
integrations,
"userId",
"completedSteps",
"selectedStoreListingVersionId"
FROM platform."UserOnboarding"

View File

@@ -0,0 +1,100 @@
-- =============================================================
-- View: analytics.user_onboarding_funnel
-- Looker source alias: ds74 | Charts: 1
-- =============================================================
-- DESCRIPTION
-- Pre-aggregated onboarding funnel showing how many users
-- completed each step and the drop-off percentage from the
-- previous step. One row per onboarding step (all 22 steps
-- always present, even with 0 completions — prevents sparse
-- gaps from making LAG compare the wrong predecessors).
--
-- SOURCE TABLES
-- platform.UserOnboarding — Onboarding records with completedSteps array
--
-- OUTPUT COLUMNS
-- step TEXT Onboarding step enum name (e.g. 'WELCOME', 'CONGRATS')
-- step_order INT Numeric position in the funnel (1=first, 22=last)
-- users_completed BIGINT Distinct users who completed this step
-- pct_from_prev NUMERIC % of users from the previous step who reached this one
--
-- STEP ORDER
-- 1 WELCOME 9 MARKETPLACE_VISIT 17 SCHEDULE_AGENT
-- 2 USAGE_REASON 10 MARKETPLACE_ADD_AGENT 18 RUN_AGENTS
-- 3 INTEGRATIONS 11 MARKETPLACE_RUN_AGENT 19 RUN_3_DAYS
-- 4 AGENT_CHOICE 12 BUILDER_OPEN 20 TRIGGER_WEBHOOK
-- 5 AGENT_NEW_RUN 13 BUILDER_SAVE_AGENT 21 RUN_14_DAYS
-- 6 AGENT_INPUT 14 BUILDER_RUN_AGENT 22 RUN_AGENTS_100
-- 7 CONGRATS 15 VISIT_COPILOT
-- 8 GET_RESULTS 16 RE_RUN_AGENT
--
-- WINDOW
-- Users who started onboarding in the last 90 days
--
-- EXAMPLE QUERIES
-- -- Full funnel
-- SELECT * FROM analytics.user_onboarding_funnel ORDER BY step_order;
--
-- -- Biggest drop-off point
-- SELECT step, pct_from_prev FROM analytics.user_onboarding_funnel
-- ORDER BY pct_from_prev ASC LIMIT 3;
-- =============================================================
WITH all_steps AS (
-- Complete ordered grid of all 22 steps so zero-completion steps
-- are always present, keeping LAG comparisons correct.
SELECT step_name, step_order
FROM (VALUES
('WELCOME', 1),
('USAGE_REASON', 2),
('INTEGRATIONS', 3),
('AGENT_CHOICE', 4),
('AGENT_NEW_RUN', 5),
('AGENT_INPUT', 6),
('CONGRATS', 7),
('GET_RESULTS', 8),
('MARKETPLACE_VISIT', 9),
('MARKETPLACE_ADD_AGENT', 10),
('MARKETPLACE_RUN_AGENT', 11),
('BUILDER_OPEN', 12),
('BUILDER_SAVE_AGENT', 13),
('BUILDER_RUN_AGENT', 14),
('VISIT_COPILOT', 15),
('RE_RUN_AGENT', 16),
('SCHEDULE_AGENT', 17),
('RUN_AGENTS', 18),
('RUN_3_DAYS', 19),
('TRIGGER_WEBHOOK', 20),
('RUN_14_DAYS', 21),
('RUN_AGENTS_100', 22)
) AS t(step_name, step_order)
),
raw AS (
SELECT
u."userId",
step_txt::text AS step
FROM platform."UserOnboarding" u
CROSS JOIN LATERAL UNNEST(u."completedSteps") AS step_txt
WHERE u."createdAt" >= CURRENT_DATE - INTERVAL '90 days'
),
step_counts AS (
SELECT step, COUNT(DISTINCT "userId") AS users_completed
FROM raw GROUP BY step
),
funnel AS (
SELECT
a.step_name AS step,
a.step_order,
COALESCE(sc.users_completed, 0) AS users_completed,
ROUND(
100.0 * COALESCE(sc.users_completed, 0)
/ NULLIF(
LAG(COALESCE(sc.users_completed, 0)) OVER (ORDER BY a.step_order),
0
),
2
) AS pct_from_prev
FROM all_steps a
LEFT JOIN step_counts sc ON sc.step = a.step_name
)
SELECT * FROM funnel ORDER BY step_order

View File

@@ -0,0 +1,41 @@
-- =============================================================
-- View: analytics.user_onboarding_integration
-- Looker source alias: ds75 | Charts: 1
-- =============================================================
-- DESCRIPTION
-- Pre-aggregated count of users who selected each integration
-- during onboarding. One row per integration type, sorted
-- by popularity.
--
-- SOURCE TABLES
-- platform.UserOnboarding — integrations array column
--
-- OUTPUT COLUMNS
-- integration TEXT Integration name (e.g. 'github', 'slack', 'notion')
-- users_with_integration BIGINT Distinct users who selected this integration
--
-- WINDOW
-- Users who started onboarding in the last 90 days
--
-- EXAMPLE QUERIES
-- -- Full integration popularity ranking
-- SELECT * FROM analytics.user_onboarding_integration;
--
-- -- Top 5 integrations
-- SELECT * FROM analytics.user_onboarding_integration LIMIT 5;
-- =============================================================
WITH exploded AS (
SELECT
u."userId" AS user_id,
UNNEST(u."integrations") AS integration
FROM platform."UserOnboarding" u
WHERE u."createdAt" >= CURRENT_DATE - INTERVAL '90 days'
)
SELECT
integration,
COUNT(DISTINCT user_id) AS users_with_integration
FROM exploded
WHERE integration IS NOT NULL AND integration <> ''
GROUP BY integration
ORDER BY users_with_integration DESC

View File

@@ -0,0 +1,145 @@
-- =============================================================
-- View: analytics.users_activities
-- Looker source alias: ds56 | Charts: 5
-- =============================================================
-- DESCRIPTION
-- One row per user with lifetime activity summary.
-- Joins login sessions with agent graphs, executions and
-- node-level runs to give a full picture of how engaged
-- each user is. Includes a convenience flag for 7-day
-- activation (did the user return at least 7 days after
-- their first login?).
--
-- SOURCE TABLES
-- auth.sessions — Login/session records
-- platform.AgentGraph — Graphs (agents) built by the user
-- platform.AgentGraphExecution — Agent run history
-- platform.AgentNodeExecution — Individual block execution history
--
-- PERFORMANCE NOTE
-- Each CTE aggregates its own table independently by userId.
-- This avoids the fan-out that occurs when driving every join
-- from user_logins across the two largest tables
-- (AgentGraphExecution and AgentNodeExecution).
--
-- OUTPUT COLUMNS
-- user_id TEXT Supabase user UUID
-- first_login_time TIMESTAMPTZ First ever session created_at
-- last_login_time TIMESTAMPTZ Most recent session created_at
-- last_visit_time TIMESTAMPTZ Max of last refresh or login
-- last_agent_save_time TIMESTAMPTZ Last time user saved an agent graph
-- agent_count BIGINT Number of distinct active graphs built (0 if none)
-- first_agent_run_time TIMESTAMPTZ First ever graph execution
-- last_agent_run_time TIMESTAMPTZ Most recent graph execution
-- unique_agent_runs BIGINT Distinct agent graphs ever run (0 if none)
-- agent_runs BIGINT Total graph execution count (0 if none)
-- node_execution_count BIGINT Total node executions across all runs
-- node_execution_failed BIGINT Node executions with FAILED status
-- node_execution_completed BIGINT Node executions with COMPLETED status
-- node_execution_terminated BIGINT Node executions with TERMINATED status
-- node_execution_queued BIGINT Node executions with QUEUED status
-- node_execution_running BIGINT Node executions with RUNNING status
-- is_active_after_7d INT 1=returned after day 7, 0=did not, NULL=too early to tell
-- node_execution_incomplete BIGINT Node executions with INCOMPLETE status
-- node_execution_review BIGINT Node executions with REVIEW status
--
-- EXAMPLE QUERIES
-- -- Users who ran at least one agent and returned after 7 days
-- SELECT COUNT(*) FROM analytics.users_activities
-- WHERE agent_runs > 0 AND is_active_after_7d = 1;
--
-- -- Top 10 most active users by agent runs
-- SELECT user_id, agent_runs, node_execution_count
-- FROM analytics.users_activities
-- ORDER BY agent_runs DESC LIMIT 10;
--
-- -- 7-day activation rate
-- SELECT
-- SUM(CASE WHEN is_active_after_7d = 1 THEN 1 ELSE 0 END)::float
-- / NULLIF(COUNT(CASE WHEN is_active_after_7d IS NOT NULL THEN 1 END), 0)
-- AS activation_rate
-- FROM analytics.users_activities;
-- =============================================================
WITH user_logins AS (
SELECT
user_id::text AS user_id,
MIN(created_at) AS first_login_time,
MAX(created_at) AS last_login_time,
GREATEST(
MAX(refreshed_at)::timestamptz,
MAX(created_at)::timestamptz
) AS last_visit_time
FROM auth.sessions
GROUP BY user_id
),
user_agents AS (
-- Aggregate AgentGraph directly by userId (no fan-out from user_logins)
SELECT
"userId"::text AS user_id,
MAX("updatedAt") AS last_agent_save_time,
COUNT(DISTINCT "id") AS agent_count
FROM platform."AgentGraph"
WHERE "isActive"
GROUP BY "userId"
),
user_graph_runs AS (
-- Aggregate AgentGraphExecution directly by userId
SELECT
"userId"::text AS user_id,
MIN("createdAt") AS first_agent_run_time,
MAX("createdAt") AS last_agent_run_time,
COUNT(DISTINCT "agentGraphId") AS unique_agent_runs,
COUNT("id") AS agent_runs
FROM platform."AgentGraphExecution"
GROUP BY "userId"
),
user_node_runs AS (
-- Aggregate AgentNodeExecution directly; resolve userId via a
-- single join to AgentGraphExecution instead of fanning out from
-- user_logins through both large tables.
SELECT
g."userId"::text AS user_id,
COUNT(*) AS node_execution_count,
COUNT(*) FILTER (WHERE n."executionStatus" = 'FAILED') AS node_execution_failed,
COUNT(*) FILTER (WHERE n."executionStatus" = 'COMPLETED') AS node_execution_completed,
COUNT(*) FILTER (WHERE n."executionStatus" = 'TERMINATED') AS node_execution_terminated,
COUNT(*) FILTER (WHERE n."executionStatus" = 'QUEUED') AS node_execution_queued,
COUNT(*) FILTER (WHERE n."executionStatus" = 'RUNNING') AS node_execution_running,
COUNT(*) FILTER (WHERE n."executionStatus" = 'INCOMPLETE') AS node_execution_incomplete,
COUNT(*) FILTER (WHERE n."executionStatus" = 'REVIEW') AS node_execution_review
FROM platform."AgentNodeExecution" n
JOIN platform."AgentGraphExecution" g
ON g."id" = n."agentGraphExecutionId"
GROUP BY g."userId"
)
SELECT
ul.user_id,
ul.first_login_time,
ul.last_login_time,
ul.last_visit_time,
ua.last_agent_save_time,
COALESCE(ua.agent_count, 0) AS agent_count,
gr.first_agent_run_time,
gr.last_agent_run_time,
COALESCE(gr.unique_agent_runs, 0) AS unique_agent_runs,
COALESCE(gr.agent_runs, 0) AS agent_runs,
COALESCE(nr.node_execution_count, 0) AS node_execution_count,
COALESCE(nr.node_execution_failed, 0) AS node_execution_failed,
COALESCE(nr.node_execution_completed, 0) AS node_execution_completed,
COALESCE(nr.node_execution_terminated, 0) AS node_execution_terminated,
COALESCE(nr.node_execution_queued, 0) AS node_execution_queued,
COALESCE(nr.node_execution_running, 0) AS node_execution_running,
CASE
WHEN ul.first_login_time < NOW() - INTERVAL '7 days'
AND ul.last_visit_time >= ul.first_login_time + INTERVAL '7 days' THEN 1
WHEN ul.first_login_time < NOW() - INTERVAL '7 days'
AND ul.last_visit_time < ul.first_login_time + INTERVAL '7 days' THEN 0
ELSE NULL
END AS is_active_after_7d,
COALESCE(nr.node_execution_incomplete, 0) AS node_execution_incomplete,
COALESCE(nr.node_execution_review, 0) AS node_execution_review
FROM user_logins ul
LEFT JOIN user_agents ua ON ul.user_id = ua.user_id
LEFT JOIN user_graph_runs gr ON ul.user_id = gr.user_id
LEFT JOIN user_node_runs nr ON ul.user_id = nr.user_id

View File

@@ -638,7 +638,7 @@ async def test_process_review_action_auto_approve_creates_auto_approval_records(
# Mock get_node_executions to return node_id mapping
mock_get_node_executions = mocker.patch(
"backend.data.execution.get_node_executions"
"backend.api.features.executions.review.routes.get_node_executions"
)
mock_node_exec = mocker.Mock(spec=NodeExecutionResult)
mock_node_exec.node_exec_id = "test_node_123"
@@ -936,7 +936,7 @@ async def test_process_review_action_auto_approve_only_applies_to_approved_revie
# Mock get_node_executions to return node_id mapping
mock_get_node_executions = mocker.patch(
"backend.data.execution.get_node_executions"
"backend.api.features.executions.review.routes.get_node_executions"
)
mock_node_exec = mocker.Mock(spec=NodeExecutionResult)
mock_node_exec.node_exec_id = "node_exec_approved"
@@ -1148,7 +1148,7 @@ async def test_process_review_action_per_review_auto_approve_granularity(
# Mock get_node_executions to return batch node data
mock_get_node_executions = mocker.patch(
"backend.data.execution.get_node_executions"
"backend.api.features.executions.review.routes.get_node_executions"
)
# Create mock node executions for each review
mock_node_execs = []

View File

@@ -6,10 +6,15 @@ import autogpt_libs.auth as autogpt_auth_lib
from fastapi import APIRouter, HTTPException, Query, Security, status
from prisma.enums import ReviewStatus
from backend.copilot.constants import (
is_copilot_synthetic_id,
parse_node_id_from_exec_id,
)
from backend.data.execution import (
ExecutionContext,
ExecutionStatus,
get_graph_execution_meta,
get_node_executions,
)
from backend.data.graph import get_graph_settings
from backend.data.human_review import (
@@ -36,6 +41,38 @@ router = APIRouter(
)
async def _resolve_node_ids(
node_exec_ids: list[str],
graph_exec_id: str,
is_copilot: bool,
) -> dict[str, str]:
"""Resolve node_exec_id -> node_id for auto-approval records.
CoPilot synthetic IDs encode node_id in the format "{node_id}:{random}".
Graph executions look up node_id from NodeExecution records.
"""
if not node_exec_ids:
return {}
if is_copilot:
return {neid: parse_node_id_from_exec_id(neid) for neid in node_exec_ids}
node_execs = await get_node_executions(
graph_exec_id=graph_exec_id, include_exec_data=False
)
node_exec_map = {ne.node_exec_id: ne.node_id for ne in node_execs}
result = {}
for neid in node_exec_ids:
if neid in node_exec_map:
result[neid] = node_exec_map[neid]
else:
logger.error(
f"Failed to resolve node_id for {neid}: Node execution not found."
)
return result
@router.get(
"/pending",
summary="Get Pending Reviews",
@@ -110,14 +147,16 @@ async def list_pending_reviews_for_execution(
"""
# Verify user owns the graph execution before returning reviews
graph_exec = await get_graph_execution_meta(
user_id=user_id, execution_id=graph_exec_id
)
if not graph_exec:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Graph execution #{graph_exec_id} not found",
# (CoPilot synthetic IDs don't have graph execution records)
if not is_copilot_synthetic_id(graph_exec_id):
graph_exec = await get_graph_execution_meta(
user_id=user_id, execution_id=graph_exec_id
)
if not graph_exec:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Graph execution #{graph_exec_id} not found",
)
return await get_pending_reviews_for_execution(graph_exec_id, user_id)
@@ -160,30 +199,26 @@ async def process_review_action(
)
graph_exec_id = next(iter(graph_exec_ids))
is_copilot = is_copilot_synthetic_id(graph_exec_id)
# Validate execution status before processing reviews
graph_exec_meta = await get_graph_execution_meta(
user_id=user_id, execution_id=graph_exec_id
)
if not graph_exec_meta:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Graph execution #{graph_exec_id} not found",
)
# Only allow processing reviews if execution is paused for review
# or incomplete (partial execution with some reviews already processed)
if graph_exec_meta.status not in (
ExecutionStatus.REVIEW,
ExecutionStatus.INCOMPLETE,
):
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail=f"Cannot process reviews while execution status is {graph_exec_meta.status}. "
f"Reviews can only be processed when execution is paused (REVIEW status). "
f"Current status: {graph_exec_meta.status}",
# Validate execution status for graph executions (skip for CoPilot synthetic IDs)
if not is_copilot:
graph_exec_meta = await get_graph_execution_meta(
user_id=user_id, execution_id=graph_exec_id
)
if not graph_exec_meta:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Graph execution #{graph_exec_id} not found",
)
if graph_exec_meta.status not in (
ExecutionStatus.REVIEW,
ExecutionStatus.INCOMPLETE,
):
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail=f"Cannot process reviews while execution status is {graph_exec_meta.status}",
)
# Build review decisions map and track which reviews requested auto-approval
# Auto-approved reviews use original data (no modifications allowed)
@@ -236,7 +271,7 @@ async def process_review_action(
)
return (node_id, False)
# Collect node_exec_ids that need auto-approval
# Collect node_exec_ids that need auto-approval and resolve their node_ids
node_exec_ids_needing_auto_approval = [
node_exec_id
for node_exec_id, review_result in updated_reviews.items()
@@ -244,29 +279,16 @@ async def process_review_action(
and auto_approve_requests.get(node_exec_id, False)
]
# Batch-fetch node executions to get node_ids
node_id_map = await _resolve_node_ids(
node_exec_ids_needing_auto_approval, graph_exec_id, is_copilot
)
# Deduplicate by node_id — one auto-approval per node
nodes_needing_auto_approval: dict[str, Any] = {}
if node_exec_ids_needing_auto_approval:
from backend.data.execution import get_node_executions
node_execs = await get_node_executions(
graph_exec_id=graph_exec_id, include_exec_data=False
)
node_exec_map = {node_exec.node_exec_id: node_exec for node_exec in node_execs}
for node_exec_id in node_exec_ids_needing_auto_approval:
node_exec = node_exec_map.get(node_exec_id)
if node_exec:
review_result = updated_reviews[node_exec_id]
# Use the first approved review for this node (deduplicate by node_id)
if node_exec.node_id not in nodes_needing_auto_approval:
nodes_needing_auto_approval[node_exec.node_id] = review_result
else:
logger.error(
f"Failed to create auto-approval record for {node_exec_id}: "
f"Node execution not found. This may indicate a race condition "
f"or data inconsistency."
)
for node_exec_id in node_exec_ids_needing_auto_approval:
node_id = node_id_map.get(node_exec_id)
if node_id and node_id not in nodes_needing_auto_approval:
nodes_needing_auto_approval[node_id] = updated_reviews[node_exec_id]
# Execute all auto-approval creations in parallel (deduplicated by node_id)
auto_approval_results = await asyncio.gather(
@@ -281,13 +303,11 @@ async def process_review_action(
auto_approval_failed_count = 0
for result in auto_approval_results:
if isinstance(result, Exception):
# Unexpected exception during auto-approval creation
auto_approval_failed_count += 1
logger.error(
f"Unexpected exception during auto-approval creation: {result}"
)
elif isinstance(result, tuple) and len(result) == 2 and not result[1]:
# Auto-approval creation failed (returned False)
auto_approval_failed_count += 1
# Count results
@@ -302,22 +322,20 @@ async def process_review_action(
if review.status == ReviewStatus.REJECTED
)
# Resume execution only if ALL pending reviews for this execution have been processed
if updated_reviews:
# Resume graph execution only for real graph executions (not CoPilot)
# CoPilot sessions are resumed by the LLM retrying run_block with review_id
if not is_copilot and updated_reviews:
still_has_pending = await has_pending_reviews_for_graph_exec(graph_exec_id)
if not still_has_pending:
# Get the graph_id from any processed review
first_review = next(iter(updated_reviews.values()))
try:
# Fetch user and settings to build complete execution context
user = await get_user_by_id(user_id)
settings = await get_graph_settings(
user_id=user_id, graph_id=first_review.graph_id
)
# Preserve user's timezone preference when resuming execution
user_timezone = (
user.timezone if user.timezone != USER_TIMEZONE_NOT_SET else "UTC"
)

View File

@@ -37,7 +37,6 @@ import backend.api.features.workspace.routes as workspace_routes
import backend.data.block
import backend.data.db
import backend.data.graph
import backend.data.llm_registry
import backend.data.user
import backend.integrations.webhooks.utils
import backend.util.service
@@ -118,30 +117,11 @@ async def lifespan_context(app: fastapi.FastAPI):
AutoRegistry.patch_integrations()
# Refresh LLM registry before initializing blocks so blocks can use registry data
# Note: Graceful fallback for now since no blocks consume registry yet (comes in PR #5)
# When block integration lands, this should fail hard or skip block initialization
try:
await backend.data.llm_registry.refresh_llm_registry()
logger.info("LLM registry refreshed successfully at startup")
except Exception as e:
logger.warning(
f"Failed to refresh LLM registry at startup: {e}. "
"Blocks will initialize with empty registry."
)
await backend.data.block.initialize_blocks()
await backend.data.user.migrate_and_encrypt_user_integrations()
await backend.data.graph.fix_llm_provider_credentials()
try:
await backend.data.graph.migrate_llm_models(DEFAULT_LLM_MODEL)
except Exception as e:
logger.warning(
f"Failed to migrate LLM models at startup: {e}. "
"This is expected in test environments without AgentNode table."
)
await backend.data.graph.migrate_llm_models(DEFAULT_LLM_MODEL)
await backend.integrations.webhooks.utils.migrate_legacy_triggered_graphs()
with launch_darkly_context():

View File

@@ -624,6 +624,7 @@ class Block(ABC, Generic[BlockSchemaInputType, BlockSchemaOutputType]):
graph_id: str,
graph_version: int,
execution_context: "ExecutionContext",
is_graph_execution: bool = True,
**kwargs,
) -> tuple[bool, BlockInput]:
"""
@@ -652,6 +653,7 @@ class Block(ABC, Generic[BlockSchemaInputType, BlockSchemaOutputType]):
graph_version=graph_version,
block_name=self.name,
editable=True,
is_graph_execution=is_graph_execution,
)
if decision is None:

View File

@@ -126,7 +126,7 @@ class PrintToConsoleBlock(Block):
output_schema=PrintToConsoleBlock.Output,
test_input={"text": "Hello, World!"},
is_sensitive_action=True,
disabled=True, # Disabled per Nick Tindle's request (OPEN-3000)
disabled=True,
test_output=[
("output", "Hello, World!"),
("status", "printed"),

View File

@@ -67,6 +67,7 @@ class HITLReviewHelper:
graph_version: int,
block_name: str = "Block",
editable: bool = False,
is_graph_execution: bool = True,
) -> Optional[ReviewResult]:
"""
Handle a review request for a block that requires human review.
@@ -143,10 +144,11 @@ class HITLReviewHelper:
logger.info(
f"Block {block_name} pausing execution for node {node_exec_id} - awaiting human review"
)
await HITLReviewHelper.update_node_execution_status(
exec_id=node_exec_id,
status=ExecutionStatus.REVIEW,
)
if is_graph_execution:
await HITLReviewHelper.update_node_execution_status(
exec_id=node_exec_id,
status=ExecutionStatus.REVIEW,
)
return None # Signal that execution should pause
# Mark review as processed if not already done
@@ -168,6 +170,7 @@ class HITLReviewHelper:
graph_version: int,
block_name: str = "Block",
editable: bool = False,
is_graph_execution: bool = True,
) -> Optional[ReviewDecision]:
"""
Handle a review request and return the decision in a single call.
@@ -197,6 +200,7 @@ class HITLReviewHelper:
graph_version=graph_version,
block_name=block_name,
editable=editable,
is_graph_execution=is_graph_execution,
)
if review_result is None:

View File

@@ -6,6 +6,32 @@
COPILOT_ERROR_PREFIX = "[__COPILOT_ERROR_f7a1__]" # Renders as ErrorCard
COPILOT_SYSTEM_PREFIX = "[__COPILOT_SYSTEM_e3b0__]" # Renders as system info message
# Prefix for all synthetic IDs generated by CoPilot block execution.
# Used to distinguish CoPilot-generated records from real graph execution records
# in PendingHumanReview and other tables.
COPILOT_SYNTHETIC_ID_PREFIX = "copilot-"
# Sub-prefixes for session-scoped and node-scoped synthetic IDs.
COPILOT_SESSION_PREFIX = f"{COPILOT_SYNTHETIC_ID_PREFIX}session-"
COPILOT_NODE_PREFIX = f"{COPILOT_SYNTHETIC_ID_PREFIX}node-"
# Separator used in synthetic node_exec_id to encode node_id.
# Format: "{node_id}:{random_hex}" — extract node_id via rsplit(":", 1)[0]
COPILOT_NODE_EXEC_ID_SEPARATOR = ":"
# Compaction notice messages shown to users.
COMPACTION_DONE_MSG = "Earlier messages were summarized to fit within context limits."
COMPACTION_TOOL_NAME = "context_compaction"
def is_copilot_synthetic_id(id_value: str) -> bool:
"""Check if an ID is a CoPilot synthetic ID (not from a real graph execution)."""
return id_value.startswith(COPILOT_SYNTHETIC_ID_PREFIX)
def parse_node_id_from_exec_id(node_exec_id: str) -> str:
"""Extract node_id from a synthetic node_exec_id.
Format: "{node_id}:{random_hex}" → returns "{node_id}".
"""
return node_exec_id.rsplit(COPILOT_NODE_EXEC_ID_SEPARATOR, 1)[0]

View File

@@ -12,6 +12,7 @@ from .agent_browser import BrowserActTool, BrowserNavigateTool, BrowserScreensho
from .agent_output import AgentOutputTool
from .base import BaseTool
from .bash_exec import BashExecTool
from .continue_run_block import ContinueRunBlockTool
from .create_agent import CreateAgentTool
from .customize_agent import CustomizeAgentTool
from .edit_agent import EditAgentTool
@@ -68,6 +69,7 @@ TOOL_REGISTRY: dict[str, BaseTool] = {
"move_agents_to_folder": MoveAgentsToFolderTool(),
"run_agent": RunAgentTool(),
"run_block": RunBlockTool(),
"continue_run_block": ContinueRunBlockTool(),
"run_mcp_tool": RunMCPToolTool(),
"get_mcp_guide": GetMCPGuideTool(),
"view_agent_output": AgentOutputTool(),

View File

@@ -0,0 +1,157 @@
"""Tool for continuing block execution after human review approval."""
import logging
from typing import Any
from prisma.enums import ReviewStatus
from backend.blocks import get_block
from backend.copilot.constants import (
COPILOT_NODE_PREFIX,
COPILOT_SESSION_PREFIX,
parse_node_id_from_exec_id,
)
from backend.copilot.model import ChatSession
from backend.data.db_accessors import review_db
from .base import BaseTool
from .helpers import execute_block, resolve_block_credentials
from .models import ErrorResponse, ToolResponseBase
logger = logging.getLogger(__name__)
class ContinueRunBlockTool(BaseTool):
"""Tool for continuing a block execution after human review approval."""
@property
def name(self) -> str:
return "continue_run_block"
@property
def description(self) -> str:
return (
"Continue executing a block after human review approval. "
"Use this after a run_block call returned review_required. "
"Pass the review_id from the review_required response. "
"The block will execute with the original pre-approved input data."
)
@property
def parameters(self) -> dict[str, Any]:
return {
"type": "object",
"properties": {
"review_id": {
"type": "string",
"description": (
"The review_id from a previous review_required response. "
"This resumes execution with the pre-approved input data."
),
},
},
"required": ["review_id"],
}
@property
def requires_auth(self) -> bool:
return True
async def _execute(
self,
user_id: str | None,
session: ChatSession,
**kwargs,
) -> ToolResponseBase:
review_id = (
kwargs.get("review_id", "").strip() if kwargs.get("review_id") else ""
)
session_id = session.session_id
if not review_id:
return ErrorResponse(
message="Please provide a review_id", session_id=session_id
)
if not user_id:
return ErrorResponse(
message="Authentication required", session_id=session_id
)
# Look up and validate the review record via adapter
reviews = await review_db().get_reviews_by_node_exec_ids([review_id], user_id)
review = reviews.get(review_id)
if not review:
return ErrorResponse(
message=(
f"Review '{review_id}' not found or already executed. "
"It may have been consumed by a previous continue_run_block call."
),
session_id=session_id,
)
# Validate the review belongs to this session
expected_graph_exec_id = f"{COPILOT_SESSION_PREFIX}{session_id}"
if review.graph_exec_id != expected_graph_exec_id:
return ErrorResponse(
message="Review does not belong to this session.",
session_id=session_id,
)
if review.status == ReviewStatus.WAITING:
return ErrorResponse(
message="Review has not been approved yet. "
"Please wait for the user to approve the review first.",
session_id=session_id,
)
if review.status == ReviewStatus.REJECTED:
return ErrorResponse(
message="Review was rejected. The block will not execute.",
session_id=session_id,
)
# Extract block_id from review_id: copilot-node-{block_id}:{random_hex}
block_id = parse_node_id_from_exec_id(review_id).removeprefix(
COPILOT_NODE_PREFIX
)
block = get_block(block_id)
if not block:
return ErrorResponse(
message=f"Block '{block_id}' not found", session_id=session_id
)
input_data: dict[str, Any] = (
review.payload if isinstance(review.payload, dict) else {}
)
logger.info(
f"Continuing block {block.name} ({block_id}) for user {user_id} "
f"with review_id={review_id}"
)
matched_creds, missing_creds = await resolve_block_credentials(
user_id, block, input_data
)
if missing_creds:
return ErrorResponse(
message=f"Block '{block.name}' requires credentials that are not configured.",
session_id=session_id,
)
result = await execute_block(
block=block,
block_id=block_id,
input_data=input_data,
user_id=user_id,
session_id=session_id,
node_exec_id=review_id,
matched_credentials=matched_creds,
)
# Delete review record after successful execution (one-time use)
if result.type != "error":
await review_db().delete_review_by_node_exec_id(review_id, user_id)
return result

View File

@@ -0,0 +1,186 @@
"""Tests for ContinueRunBlockTool."""
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from prisma.enums import ReviewStatus
from ._test_data import make_session
from .continue_run_block import ContinueRunBlockTool
from .models import BlockOutputResponse, ErrorResponse
_TEST_USER_ID = "test-user-continue"
def _make_review_model(
node_exec_id: str,
status: ReviewStatus = ReviewStatus.APPROVED,
payload: dict | None = None,
graph_exec_id: str = "",
):
"""Create a mock PendingHumanReviewModel."""
mock = MagicMock()
mock.node_exec_id = node_exec_id
mock.status = status
mock.payload = payload or {"text": "hello"}
mock.graph_exec_id = graph_exec_id
return mock
class TestContinueRunBlock:
@pytest.mark.asyncio(loop_scope="session")
async def test_missing_review_id_returns_error(self):
tool = ContinueRunBlockTool()
session = make_session(user_id=_TEST_USER_ID)
response = await tool._execute(
user_id=_TEST_USER_ID,
session=session,
review_id="",
)
assert isinstance(response, ErrorResponse)
assert "review_id" in response.message
@pytest.mark.asyncio(loop_scope="session")
async def test_review_not_found_returns_error(self):
tool = ContinueRunBlockTool()
session = make_session(user_id=_TEST_USER_ID)
mock_db = MagicMock()
mock_db.get_reviews_by_node_exec_ids = AsyncMock(return_value={})
with patch(
"backend.copilot.tools.continue_run_block.review_db",
return_value=mock_db,
):
response = await tool._execute(
user_id=_TEST_USER_ID,
session=session,
review_id="copilot-node-some-block:abc12345",
)
assert isinstance(response, ErrorResponse)
assert "not found" in response.message
@pytest.mark.asyncio(loop_scope="session")
async def test_waiting_review_returns_error(self):
tool = ContinueRunBlockTool()
session = make_session(user_id=_TEST_USER_ID)
review_id = "copilot-node-some-block:abc12345"
graph_exec_id = f"copilot-session-{session.session_id}"
review = _make_review_model(
review_id, status=ReviewStatus.WAITING, graph_exec_id=graph_exec_id
)
mock_db = MagicMock()
mock_db.get_reviews_by_node_exec_ids = AsyncMock(
return_value={review_id: review}
)
with patch(
"backend.copilot.tools.continue_run_block.review_db",
return_value=mock_db,
):
response = await tool._execute(
user_id=_TEST_USER_ID,
session=session,
review_id=review_id,
)
assert isinstance(response, ErrorResponse)
assert "not been approved" in response.message
@pytest.mark.asyncio(loop_scope="session")
async def test_rejected_review_returns_error(self):
tool = ContinueRunBlockTool()
session = make_session(user_id=_TEST_USER_ID)
review_id = "copilot-node-some-block:abc12345"
graph_exec_id = f"copilot-session-{session.session_id}"
review = _make_review_model(
review_id, status=ReviewStatus.REJECTED, graph_exec_id=graph_exec_id
)
mock_db = MagicMock()
mock_db.get_reviews_by_node_exec_ids = AsyncMock(
return_value={review_id: review}
)
with patch(
"backend.copilot.tools.continue_run_block.review_db",
return_value=mock_db,
):
response = await tool._execute(
user_id=_TEST_USER_ID,
session=session,
review_id=review_id,
)
assert isinstance(response, ErrorResponse)
assert "rejected" in response.message.lower()
@pytest.mark.asyncio(loop_scope="session")
async def test_approved_review_executes_block(self):
tool = ContinueRunBlockTool()
session = make_session(user_id=_TEST_USER_ID)
review_id = "copilot-node-delete-branch-id:abc12345"
graph_exec_id = f"copilot-session-{session.session_id}"
input_data = {"repo_url": "https://github.com/test/repo", "branch": "main"}
review = _make_review_model(
review_id,
status=ReviewStatus.APPROVED,
payload=input_data,
graph_exec_id=graph_exec_id,
)
mock_block = MagicMock()
mock_block.name = "Delete Branch"
async def mock_execute(data, **kwargs):
yield "result", "Branch deleted"
mock_block.execute = mock_execute
mock_block.input_schema.get_credentials_fields_info.return_value = []
mock_workspace_db = MagicMock()
mock_workspace_db.get_or_create_workspace = AsyncMock(
return_value=MagicMock(id="test-workspace-id")
)
mock_db = MagicMock()
mock_db.get_reviews_by_node_exec_ids = AsyncMock(
return_value={review_id: review}
)
mock_db.delete_review_by_node_exec_id = AsyncMock(return_value=1)
with (
patch(
"backend.copilot.tools.continue_run_block.review_db",
return_value=mock_db,
),
patch(
"backend.copilot.tools.continue_run_block.get_block",
return_value=mock_block,
),
patch(
"backend.copilot.tools.helpers.workspace_db",
return_value=mock_workspace_db,
),
patch(
"backend.copilot.tools.helpers.match_credentials_to_requirements",
return_value=({}, []),
),
):
response = await tool._execute(
user_id=_TEST_USER_ID,
session=session,
review_id=review_id,
)
assert isinstance(response, BlockOutputResponse)
assert response.success is True
assert response.block_name == "Delete Branch"
# Verify review was deleted (one-time use)
mock_db.delete_review_by_node_exec_id.assert_called_once_with(
review_id, _TEST_USER_ID
)

View File

@@ -1,7 +1,24 @@
"""Shared helpers for chat tools."""
import logging
from collections import defaultdict
from typing import Any
from pydantic_core import PydanticUndefined
from backend.blocks._base import AnyBlockSchema
from backend.copilot.constants import COPILOT_NODE_PREFIX, COPILOT_SESSION_PREFIX
from backend.data.db_accessors import workspace_db
from backend.data.execution import ExecutionContext
from backend.data.model import CredentialsFieldInfo, CredentialsMetaInput
from backend.integrations.creds_manager import IntegrationCredentialsManager
from backend.util.exceptions import BlockError
from .models import BlockOutputResponse, ErrorResponse, ToolResponseBase
from .utils import match_credentials_to_requirements
logger = logging.getLogger(__name__)
def get_inputs_from_schema(
input_schema: dict[str, Any],
@@ -27,3 +44,159 @@ def get_inputs_from_schema(
for name, schema in properties.items()
if name not in exclude
]
async def execute_block(
*,
block: AnyBlockSchema,
block_id: str,
input_data: dict[str, Any],
user_id: str,
session_id: str,
node_exec_id: str,
matched_credentials: dict[str, CredentialsMetaInput],
sensitive_action_safe_mode: bool = False,
) -> ToolResponseBase:
"""Execute a block with full context setup, credential injection, and error handling.
This is the shared execution path used by both ``run_block`` (after review
check) and ``continue_run_block`` (after approval).
Returns:
BlockOutputResponse on success, ErrorResponse on failure.
"""
try:
workspace = await workspace_db().get_or_create_workspace(user_id)
synthetic_graph_id = f"{COPILOT_SESSION_PREFIX}{session_id}"
synthetic_node_id = f"{COPILOT_NODE_PREFIX}{block_id}"
execution_context = ExecutionContext(
user_id=user_id,
graph_id=synthetic_graph_id,
graph_exec_id=synthetic_graph_id,
graph_version=1,
node_id=synthetic_node_id,
node_exec_id=node_exec_id,
workspace_id=workspace.id,
session_id=session_id,
sensitive_action_safe_mode=sensitive_action_safe_mode,
)
exec_kwargs: dict[str, Any] = {
"user_id": user_id,
"execution_context": execution_context,
"workspace_id": workspace.id,
"graph_exec_id": synthetic_graph_id,
"node_exec_id": node_exec_id,
"node_id": synthetic_node_id,
"graph_version": 1,
"graph_id": synthetic_graph_id,
}
# Inject credentials
creds_manager = IntegrationCredentialsManager()
for field_name, cred_meta in matched_credentials.items():
if field_name not in input_data:
input_data[field_name] = cred_meta.model_dump()
actual_credentials = await creds_manager.get(
user_id, cred_meta.id, lock=False
)
if actual_credentials:
exec_kwargs[field_name] = actual_credentials
else:
return ErrorResponse(
message=f"Failed to retrieve credentials for {field_name}",
session_id=session_id,
)
# Execute the block and collect outputs
outputs: dict[str, list[Any]] = defaultdict(list)
async for output_name, output_data in block.execute(
input_data,
**exec_kwargs,
):
outputs[output_name].append(output_data)
return BlockOutputResponse(
message=f"Block '{block.name}' executed successfully",
block_id=block_id,
block_name=block.name,
outputs=dict(outputs),
success=True,
session_id=session_id,
)
except BlockError as e:
logger.warning(f"Block execution failed: {e}")
return ErrorResponse(
message=f"Block execution failed: {e}",
error=str(e),
session_id=session_id,
)
except Exception as e:
logger.error(f"Unexpected error executing block: {e}", exc_info=True)
return ErrorResponse(
message=f"Failed to execute block: {str(e)}",
error=str(e),
session_id=session_id,
)
async def resolve_block_credentials(
user_id: str,
block: AnyBlockSchema,
input_data: dict[str, Any] | None = None,
) -> tuple[dict[str, CredentialsMetaInput], list[CredentialsMetaInput]]:
"""Resolve credentials for a block by matching user's available credentials.
Handles discriminated credentials (e.g. provider selection based on model).
Returns:
(matched_credentials, missing_credentials)
"""
input_data = input_data or {}
requirements = _resolve_discriminated_credentials(block, input_data)
if not requirements:
return {}, []
return await match_credentials_to_requirements(user_id, requirements)
def _resolve_discriminated_credentials(
block: AnyBlockSchema,
input_data: dict[str, Any],
) -> dict[str, CredentialsFieldInfo]:
"""Resolve credential requirements, applying discriminator logic where needed."""
credentials_fields_info = block.input_schema.get_credentials_fields_info()
if not credentials_fields_info:
return {}
resolved: dict[str, CredentialsFieldInfo] = {}
for field_name, field_info in credentials_fields_info.items():
effective_field_info = field_info
if field_info.discriminator and field_info.discriminator_mapping:
discriminator_value = input_data.get(field_info.discriminator)
if discriminator_value is None:
field = block.input_schema.model_fields.get(field_info.discriminator)
if field and field.default is not PydanticUndefined:
discriminator_value = field.default
if (
discriminator_value
and discriminator_value in field_info.discriminator_mapping
):
effective_field_info = field_info.discriminate(discriminator_value)
effective_field_info.discriminator_values.add(discriminator_value)
logger.debug(
f"Discriminated provider for {field_name}: "
f"{discriminator_value} -> {effective_field_info.provider}"
)
resolved[field_name] = effective_field_info
return resolved

View File

@@ -39,6 +39,7 @@ class ResponseType(str, Enum):
BLOCK_LIST = "block_list"
BLOCK_DETAILS = "block_details"
BLOCK_OUTPUT = "block_output"
REVIEW_REQUIRED = "review_required"
# MCP
MCP_GUIDE = "mcp_guide"
@@ -458,6 +459,21 @@ class BlockOutputResponse(ToolResponseBase):
success: bool = True
class ReviewRequiredResponse(ToolResponseBase):
"""Response when a block requires human review before execution."""
type: ResponseType = ResponseType.REVIEW_REQUIRED
block_id: str
block_name: str
review_id: str = Field(description="The review ID for tracking approval status")
graph_exec_id: str = Field(
description="The graph execution ID for fetching review status"
)
input_data: dict[str, Any] = Field(
description="The input data that requires review"
)
class WebFetchResponse(ToolResponseBase):
"""Response for web_fetch tool."""

View File

@@ -534,7 +534,9 @@ class RunAgentTool(BaseTool):
return ExecutionStartedResponse(
message=(
f"Agent '{library_agent.name}' is awaiting human review. "
f"Check at {library_agent_link}."
f"The user can approve or reject inline. After approval, "
f"the execution resumes automatically. Use view_agent_output "
f"with execution_id='{execution.id}' to check the result."
),
session_id=session_id,
execution_id=execution.id,

View File

@@ -2,38 +2,34 @@
import logging
import uuid
from collections import defaultdict
from typing import Any
from pydantic_core import PydanticUndefined
from backend.blocks import BlockType, get_block
from backend.blocks._base import AnyBlockSchema
from backend.copilot.constants import (
COPILOT_NODE_EXEC_ID_SEPARATOR,
COPILOT_NODE_PREFIX,
COPILOT_SESSION_PREFIX,
)
from backend.copilot.model import ChatSession
from backend.data.db_accessors import workspace_db
from backend.data.db_accessors import review_db
from backend.data.execution import ExecutionContext
from backend.data.model import CredentialsFieldInfo, CredentialsMetaInput
from backend.integrations.creds_manager import IntegrationCredentialsManager
from backend.util.exceptions import BlockError
from .base import BaseTool
from .find_block import COPILOT_EXCLUDED_BLOCK_IDS, COPILOT_EXCLUDED_BLOCK_TYPES
from .helpers import get_inputs_from_schema
from .helpers import execute_block, get_inputs_from_schema, resolve_block_credentials
from .models import (
BlockDetails,
BlockDetailsResponse,
BlockOutputResponse,
ErrorResponse,
InputValidationErrorResponse,
ReviewRequiredResponse,
SetupInfo,
SetupRequirementsResponse,
ToolResponseBase,
UserReadiness,
)
from .utils import (
build_missing_credentials_from_field_info,
match_credentials_to_requirements,
)
from .utils import build_missing_credentials_from_field_info
logger = logging.getLogger(__name__)
@@ -52,7 +48,9 @@ class RunBlockTool(BaseTool):
"IMPORTANT: You MUST call find_block first to get the block's 'id' - "
"do NOT guess or make up block IDs. "
"On first attempt (without input_data), returns detailed schema showing "
"required inputs and outputs. Then call again with proper input_data to execute."
"required inputs and outputs. Then call again with proper input_data to execute. "
"If a block requires human review, use continue_run_block with the "
"review_id after the user approves."
)
@property
@@ -166,11 +164,10 @@ class RunBlockTool(BaseTool):
logger.info(f"Executing block {block.name} ({block_id}) for user {user_id}")
creds_manager = IntegrationCredentialsManager()
(
matched_credentials,
missing_credentials,
) = await self._resolve_block_credentials(user_id, block, input_data)
) = await resolve_block_credentials(user_id, block, input_data)
# Get block schemas for details/validation
try:
@@ -279,169 +276,97 @@ class RunBlockTool(BaseTool):
user_authenticated=True,
)
try:
# Get or create user's workspace for CoPilot file operations
workspace = await workspace_db().get_or_create_workspace(user_id)
# Generate synthetic IDs for CoPilot context.
# Encode node_id in node_exec_id so it can be extracted later
# (e.g. for auto-approve, where we need node_id but have no NodeExecution row).
synthetic_graph_id = f"{COPILOT_SESSION_PREFIX}{session.session_id}"
synthetic_node_id = f"{COPILOT_NODE_PREFIX}{block_id}"
# Generate synthetic IDs for CoPilot context
# Each chat session is treated as its own agent with one continuous run
# This means:
# - graph_id (agent) = session (memories scoped to session when limit_to_agent=True)
# - graph_exec_id (run) = session (memories scoped to session when limit_to_run=True)
# - node_exec_id = unique per block execution
synthetic_graph_id = f"copilot-session-{session.session_id}"
synthetic_graph_exec_id = f"copilot-session-{session.session_id}"
synthetic_node_id = f"copilot-node-{block_id}"
synthetic_node_exec_id = (
f"copilot-{session.session_id}-{uuid.uuid4().hex[:8]}"
)
# Create unified execution context with all required fields
execution_context = ExecutionContext(
# Execution identity
user_id=user_id,
graph_id=synthetic_graph_id,
graph_exec_id=synthetic_graph_exec_id,
graph_version=1, # Versions are 1-indexed
node_id=synthetic_node_id,
node_exec_id=synthetic_node_exec_id,
# Workspace with session scoping
workspace_id=workspace.id,
session_id=session.session_id,
)
# Prepare kwargs for block execution
# Keep individual kwargs for backwards compatibility with existing blocks
exec_kwargs: dict[str, Any] = {
"user_id": user_id,
"execution_context": execution_context,
# Legacy: individual kwargs for blocks not yet using execution_context
"workspace_id": workspace.id,
"graph_exec_id": synthetic_graph_exec_id,
"node_exec_id": synthetic_node_exec_id,
"node_id": synthetic_node_id,
"graph_version": 1, # Versions are 1-indexed
"graph_id": synthetic_graph_id,
}
for field_name, cred_meta in matched_credentials.items():
# Inject metadata into input_data (for validation)
if field_name not in input_data:
input_data[field_name] = cred_meta.model_dump()
# Fetch actual credentials and pass as kwargs (for execution)
actual_credentials = await creds_manager.get(
user_id, cred_meta.id, lock=False
)
if actual_credentials:
exec_kwargs[field_name] = actual_credentials
else:
return ErrorResponse(
message=f"Failed to retrieve credentials for {field_name}",
session_id=session_id,
)
# Execute the block and collect outputs
outputs: dict[str, list[Any]] = defaultdict(list)
async for output_name, output_data in block.execute(
input_data,
**exec_kwargs,
):
outputs[output_name].append(output_data)
return BlockOutputResponse(
message=f"Block '{block.name}' executed successfully",
# Check for an existing WAITING review for this block with the same input.
# If the LLM retries run_block with identical input, we reuse the existing
# review instead of creating duplicates. Different inputs = new execution.
existing_reviews = await review_db().get_pending_reviews_for_execution(
synthetic_graph_id, user_id
)
existing_review = next(
(
r
for r in existing_reviews
if r.node_id == synthetic_node_id
and r.status.value == "WAITING"
and r.payload == input_data
),
None,
)
if existing_review:
return ReviewRequiredResponse(
message=(
f"Block '{block.name}' requires human review. "
f"After the user approves, call continue_run_block with "
f"review_id='{existing_review.node_exec_id}' to execute."
),
session_id=session_id,
block_id=block_id,
block_name=block.name,
outputs=dict(outputs),
success=True,
session_id=session_id,
review_id=existing_review.node_exec_id,
graph_exec_id=synthetic_graph_id,
input_data=input_data,
)
except BlockError as e:
logger.warning(f"Block execution failed: {e}")
return ErrorResponse(
message=f"Block execution failed: {e}",
error=str(e),
session_id=session_id,
)
except Exception as e:
logger.error(f"Unexpected error executing block: {e}", exc_info=True)
return ErrorResponse(
message=f"Failed to execute block: {str(e)}",
error=str(e),
synthetic_node_exec_id = (
f"{synthetic_node_id}{COPILOT_NODE_EXEC_ID_SEPARATOR}"
f"{uuid.uuid4().hex[:8]}"
)
# Check for HITL review before execution.
# This creates the review record in the DB for CoPilot flows.
review_context = ExecutionContext(
user_id=user_id,
graph_id=synthetic_graph_id,
graph_exec_id=synthetic_graph_id,
graph_version=1,
node_id=synthetic_node_id,
node_exec_id=synthetic_node_exec_id,
sensitive_action_safe_mode=True,
)
should_pause, input_data = await block.is_block_exec_need_review(
input_data,
user_id=user_id,
node_id=synthetic_node_id,
node_exec_id=synthetic_node_exec_id,
graph_exec_id=synthetic_graph_id,
graph_id=synthetic_graph_id,
graph_version=1,
execution_context=review_context,
is_graph_execution=False,
)
if should_pause:
return ReviewRequiredResponse(
message=(
f"Block '{block.name}' requires human review. "
f"After the user approves, call continue_run_block with "
f"review_id='{synthetic_node_exec_id}' to execute."
),
session_id=session_id,
block_id=block_id,
block_name=block.name,
review_id=synthetic_node_exec_id,
graph_exec_id=synthetic_graph_id,
input_data=input_data,
)
async def _resolve_block_credentials(
self,
user_id: str,
block: AnyBlockSchema,
input_data: dict[str, Any] | None = None,
) -> tuple[dict[str, CredentialsMetaInput], list[CredentialsMetaInput]]:
"""
Resolve credentials for a block by matching user's available credentials.
Args:
user_id: User ID
block: Block to resolve credentials for
input_data: Input data for the block (used to determine provider via discriminator)
Returns:
tuple of (matched_credentials, missing_credentials) - matched credentials
are used for block execution, missing ones indicate setup requirements.
"""
input_data = input_data or {}
requirements = self._resolve_discriminated_credentials(block, input_data)
if not requirements:
return {}, []
return await match_credentials_to_requirements(user_id, requirements)
return await execute_block(
block=block,
block_id=block_id,
input_data=input_data,
user_id=user_id,
session_id=session_id,
node_exec_id=synthetic_node_exec_id,
matched_credentials=matched_credentials,
)
def _get_inputs_list(self, block: AnyBlockSchema) -> list[dict[str, Any]]:
"""Extract non-credential inputs from block schema."""
schema = block.input_schema.jsonschema()
credentials_fields = set(block.input_schema.get_credentials_fields().keys())
return get_inputs_from_schema(schema, exclude_fields=credentials_fields)
def _resolve_discriminated_credentials(
self,
block: AnyBlockSchema,
input_data: dict[str, Any],
) -> dict[str, CredentialsFieldInfo]:
"""Resolve credential requirements, applying discriminator logic where needed."""
credentials_fields_info = block.input_schema.get_credentials_fields_info()
if not credentials_fields_info:
return {}
resolved: dict[str, CredentialsFieldInfo] = {}
for field_name, field_info in credentials_fields_info.items():
effective_field_info = field_info
if field_info.discriminator and field_info.discriminator_mapping:
discriminator_value = input_data.get(field_info.discriminator)
if discriminator_value is None:
field = block.input_schema.model_fields.get(
field_info.discriminator
)
if field and field.default is not PydanticUndefined:
discriminator_value = field.default
if (
discriminator_value
and discriminator_value in field_info.discriminator_mapping
):
effective_field_info = field_info.discriminate(discriminator_value)
# For host-scoped credentials, add the discriminator value
# (e.g., URL) so _credential_is_for_host can match it
effective_field_info.discriminator_values.add(discriminator_value)
logger.debug(
f"Discriminated provider for {field_name}: "
f"{discriminator_value} -> {effective_field_info.provider}"
)
resolved[field_name] = effective_field_info
return resolved

View File

@@ -12,6 +12,7 @@ from .models import (
BlockOutputResponse,
ErrorResponse,
InputValidationErrorResponse,
ReviewRequiredResponse,
)
from .run_block import RunBlockTool
@@ -27,9 +28,16 @@ def make_mock_block(
mock.name = name
mock.block_type = block_type
mock.disabled = disabled
mock.is_sensitive_action = False
mock.input_schema = MagicMock()
mock.input_schema.jsonschema.return_value = {"properties": {}, "required": []}
mock.input_schema.get_credentials_fields_info.return_value = []
mock.input_schema.get_credentials_fields_info.return_value = {}
mock.input_schema.get_credentials_fields.return_value = {}
async def _no_review(input_data, **kwargs):
return False, input_data
mock.is_block_exec_need_review = _no_review
return mock
@@ -46,6 +54,7 @@ def make_mock_block_with_schema(
mock.name = name
mock.block_type = BlockType.STANDARD
mock.disabled = False
mock.is_sensitive_action = False
mock.description = f"Test block: {name}"
input_schema = {
@@ -63,6 +72,12 @@ def make_mock_block_with_schema(
mock.output_schema = MagicMock()
mock.output_schema.jsonschema.return_value = output_schema
# Default: no review needed, pass through input_data unchanged
async def _no_review(input_data, **kwargs):
return False, input_data
mock.is_block_exec_need_review = _no_review
return mock
@@ -126,9 +141,15 @@ class TestRunBlockFiltering:
"standard-id", "HTTP Request", BlockType.STANDARD
)
with patch(
"backend.copilot.tools.run_block.get_block",
return_value=standard_block,
with (
patch(
"backend.copilot.tools.run_block.get_block",
return_value=standard_block,
),
patch(
"backend.copilot.tools.helpers.match_credentials_to_requirements",
return_value=({}, []),
),
):
tool = RunBlockTool()
response = await tool._execute(
@@ -154,12 +175,7 @@ class TestRunBlockInputValidation:
@pytest.mark.asyncio(loop_scope="session")
async def test_unknown_input_fields_are_rejected(self):
"""run_block rejects unknown input fields instead of silently ignoring them.
Scenario: The AI Text Generator block has a field called 'model' (for LLM model
selection), but the LLM calling the tool guesses wrong and sends 'LLM_Model'
instead. The block should reject the request and return the valid schema.
"""
"""run_block rejects unknown input fields instead of silently ignoring them."""
session = make_session(user_id=_TEST_USER_ID)
mock_block = make_mock_block_with_schema(
@@ -182,27 +198,31 @@ class TestRunBlockInputValidation:
output_properties={"response": {"type": "string"}},
)
with patch(
"backend.copilot.tools.run_block.get_block",
return_value=mock_block,
with (
patch(
"backend.copilot.tools.run_block.get_block",
return_value=mock_block,
),
patch(
"backend.copilot.tools.helpers.match_credentials_to_requirements",
return_value=({}, []),
),
):
tool = RunBlockTool()
# Provide 'prompt' (correct) but 'LLM_Model' instead of 'model' (wrong key)
response = await tool._execute(
user_id=_TEST_USER_ID,
session=session,
block_id="ai-text-gen-id",
input_data={
"prompt": "Write a haiku about coding",
"LLM_Model": "claude-opus-4-6", # WRONG KEY - should be 'model'
"LLM_Model": "claude-opus-4-6",
},
)
assert isinstance(response, InputValidationErrorResponse)
assert "LLM_Model" in response.unrecognized_fields
assert "Block was not executed" in response.message
assert "inputs" in response.model_dump() # valid schema included
assert "inputs" in response.model_dump()
@pytest.mark.asyncio(loop_scope="session")
async def test_multiple_wrong_keys_are_all_reported(self):
@@ -221,21 +241,26 @@ class TestRunBlockInputValidation:
required_fields=["prompt"],
)
with patch(
"backend.copilot.tools.run_block.get_block",
return_value=mock_block,
with (
patch(
"backend.copilot.tools.run_block.get_block",
return_value=mock_block,
),
patch(
"backend.copilot.tools.helpers.match_credentials_to_requirements",
return_value=({}, []),
),
):
tool = RunBlockTool()
response = await tool._execute(
user_id=_TEST_USER_ID,
session=session,
block_id="ai-text-gen-id",
input_data={
"prompt": "Hello", # correct
"llm_model": "claude-opus-4-6", # WRONG - should be 'model'
"system_prompt": "Be helpful", # WRONG - should be 'sys_prompt'
"retries": 5, # WRONG - should be 'retry'
"prompt": "Hello",
"llm_model": "claude-opus-4-6",
"system_prompt": "Be helpful",
"retries": 5,
},
)
@@ -262,23 +287,26 @@ class TestRunBlockInputValidation:
required_fields=["prompt"],
)
with patch(
"backend.copilot.tools.run_block.get_block",
return_value=mock_block,
with (
patch(
"backend.copilot.tools.run_block.get_block",
return_value=mock_block,
),
patch(
"backend.copilot.tools.helpers.match_credentials_to_requirements",
return_value=({}, []),
),
):
tool = RunBlockTool()
# 'prompt' is missing AND 'LLM_Model' is an unknown field
response = await tool._execute(
user_id=_TEST_USER_ID,
session=session,
block_id="ai-text-gen-id",
input_data={
"LLM_Model": "claude-opus-4-6", # wrong key, and 'prompt' is missing
"LLM_Model": "claude-opus-4-6",
},
)
# Unknown fields are caught first
assert isinstance(response, InputValidationErrorResponse)
assert "LLM_Model" in response.unrecognized_fields
@@ -313,7 +341,11 @@ class TestRunBlockInputValidation:
return_value=mock_block,
),
patch(
"backend.copilot.tools.run_block.workspace_db",
"backend.copilot.tools.helpers.match_credentials_to_requirements",
return_value=({}, []),
),
patch(
"backend.copilot.tools.helpers.workspace_db",
return_value=mock_workspace_db,
),
):
@@ -325,7 +357,7 @@ class TestRunBlockInputValidation:
block_id="ai-text-gen-id",
input_data={
"prompt": "Write a haiku",
"model": "gpt-4o-mini", # correct field name
"model": "gpt-4o-mini",
},
)
@@ -347,20 +379,191 @@ class TestRunBlockInputValidation:
required_fields=["prompt"],
)
with patch(
"backend.copilot.tools.run_block.get_block",
return_value=mock_block,
with (
patch(
"backend.copilot.tools.run_block.get_block",
return_value=mock_block,
),
patch(
"backend.copilot.tools.helpers.match_credentials_to_requirements",
return_value=({}, []),
),
):
tool = RunBlockTool()
# Only provide valid optional field, missing required 'prompt'
response = await tool._execute(
user_id=_TEST_USER_ID,
session=session,
block_id="ai-text-gen-id",
input_data={
"model": "gpt-4o-mini", # valid but optional
"model": "gpt-4o-mini",
},
)
assert isinstance(response, BlockDetailsResponse)
class TestRunBlockSensitiveAction:
"""Tests for sensitive action HITL review in RunBlockTool.
run_block calls is_block_exec_need_review() explicitly before execution.
When review is needed (should_pause=True), ReviewRequiredResponse is returned.
"""
@pytest.mark.asyncio(loop_scope="session")
async def test_sensitive_block_paused_returns_review_required(self):
"""When is_block_exec_need_review returns should_pause=True, ReviewRequiredResponse is returned."""
session = make_session(user_id=_TEST_USER_ID)
input_data = {
"repo_url": "https://github.com/test/repo",
"branch": "feature-branch",
}
mock_block = make_mock_block_with_schema(
block_id="delete-branch-id",
name="Delete Branch",
input_properties={
"repo_url": {"type": "string"},
"branch": {"type": "string"},
},
required_fields=["repo_url", "branch"],
)
mock_block.is_sensitive_action = True
mock_block.is_block_exec_need_review = AsyncMock(
return_value=(True, input_data)
)
with (
patch(
"backend.copilot.tools.run_block.get_block",
return_value=mock_block,
),
patch(
"backend.copilot.tools.helpers.match_credentials_to_requirements",
return_value=({}, []),
),
):
tool = RunBlockTool()
response = await tool._execute(
user_id=_TEST_USER_ID,
session=session,
block_id="delete-branch-id",
input_data=input_data,
)
assert isinstance(response, ReviewRequiredResponse)
assert "requires human review" in response.message
assert "continue_run_block" in response.message
assert response.block_name == "Delete Branch"
@pytest.mark.asyncio(loop_scope="session")
async def test_sensitive_block_executes_after_approval(self):
"""After approval (should_pause=False), sensitive blocks execute and return outputs."""
session = make_session(user_id=_TEST_USER_ID)
input_data = {
"repo_url": "https://github.com/test/repo",
"branch": "feature-branch",
}
mock_block = make_mock_block_with_schema(
block_id="delete-branch-id",
name="Delete Branch",
input_properties={
"repo_url": {"type": "string"},
"branch": {"type": "string"},
},
required_fields=["repo_url", "branch"],
)
mock_block.is_sensitive_action = True
mock_block.is_block_exec_need_review = AsyncMock(
return_value=(False, input_data)
)
async def mock_execute(input_data, **kwargs):
yield "result", "Branch deleted successfully"
mock_block.execute = mock_execute
mock_workspace_db = MagicMock()
mock_workspace_db.get_or_create_workspace = AsyncMock(
return_value=MagicMock(id="test-workspace-id")
)
with (
patch(
"backend.copilot.tools.run_block.get_block",
return_value=mock_block,
),
patch(
"backend.copilot.tools.helpers.match_credentials_to_requirements",
return_value=({}, []),
),
patch(
"backend.copilot.tools.helpers.workspace_db",
return_value=mock_workspace_db,
),
):
tool = RunBlockTool()
response = await tool._execute(
user_id=_TEST_USER_ID,
session=session,
block_id="delete-branch-id",
input_data=input_data,
)
assert isinstance(response, BlockOutputResponse)
assert response.success is True
@pytest.mark.asyncio(loop_scope="session")
async def test_non_sensitive_block_executes_normally(self):
"""Non-sensitive blocks skip review and execute directly."""
session = make_session(user_id=_TEST_USER_ID)
input_data = {"url": "https://example.com"}
mock_block = make_mock_block_with_schema(
block_id="http-request-id",
name="HTTP Request",
input_properties={
"url": {"type": "string"},
},
required_fields=["url"],
)
mock_block.is_sensitive_action = False
mock_block.is_block_exec_need_review = AsyncMock(
return_value=(False, input_data)
)
async def mock_execute(input_data, **kwargs):
yield "response", {"status": 200}
mock_block.execute = mock_execute
mock_workspace_db = MagicMock()
mock_workspace_db.get_or_create_workspace = AsyncMock(
return_value=MagicMock(id="test-workspace-id")
)
with (
patch(
"backend.copilot.tools.run_block.get_block",
return_value=mock_block,
),
patch(
"backend.copilot.tools.helpers.match_credentials_to_requirements",
return_value=({}, []),
),
patch(
"backend.copilot.tools.helpers.workspace_db",
return_value=mock_workspace_db,
),
):
tool = RunBlockTool()
response = await tool._execute(
user_id=_TEST_USER_ID,
session=session,
block_id="http-request-id",
input_data=input_data,
)
assert isinstance(response, BlockOutputResponse)
assert response.success is True

View File

@@ -65,9 +65,8 @@ async def test_run_block_returns_details_when_no_input_provided():
return_value=http_block,
):
# Mock credentials check to return no missing credentials
with patch.object(
RunBlockTool,
"_resolve_block_credentials",
with patch(
"backend.copilot.tools.run_block.resolve_block_credentials",
new_callable=AsyncMock,
return_value=({}, []), # (matched_credentials, missing_credentials)
):
@@ -123,9 +122,8 @@ async def test_run_block_returns_details_when_only_credentials_provided():
"backend.copilot.tools.run_block.get_block",
return_value=mock,
):
with patch.object(
RunBlockTool,
"_resolve_block_credentials",
with patch(
"backend.copilot.tools.run_block.resolve_block_credentials",
new_callable=AsyncMock,
return_value=(
{

View File

@@ -116,3 +116,16 @@ def workspace_db():
workspace_db = get_database_manager_async_client()
return workspace_db
def review_db():
if db.is_connected():
from backend.data import human_review as _review_db
review_db = _review_db
else:
from backend.util.clients import get_database_manager_async_client
review_db = get_database_manager_async_client()
return review_db

View File

@@ -79,7 +79,10 @@ from backend.data.graph import (
from backend.data.human_review import (
cancel_pending_reviews_for_execution,
check_approval,
delete_review_by_node_exec_id,
get_or_create_human_review,
get_pending_reviews_for_execution,
get_reviews_by_node_exec_ids,
has_pending_reviews_for_graph_exec,
update_review_processed_status,
)
@@ -246,7 +249,10 @@ class DatabaseManager(AppService):
# ============ Human In The Loop ============ #
cancel_pending_reviews_for_execution = _(cancel_pending_reviews_for_execution)
check_approval = _(check_approval)
delete_review_by_node_exec_id = _(delete_review_by_node_exec_id)
get_or_create_human_review = _(get_or_create_human_review)
get_pending_reviews_for_execution = _(get_pending_reviews_for_execution)
get_reviews_by_node_exec_ids = _(get_reviews_by_node_exec_ids)
has_pending_reviews_for_graph_exec = _(has_pending_reviews_for_graph_exec)
update_review_processed_status = _(update_review_processed_status)
@@ -433,7 +439,10 @@ class DatabaseManagerAsyncClient(AppServiceClient):
# ============ Human In The Loop ============ #
cancel_pending_reviews_for_execution = d.cancel_pending_reviews_for_execution
check_approval = d.check_approval
delete_review_by_node_exec_id = d.delete_review_by_node_exec_id
get_or_create_human_review = d.get_or_create_human_review
get_pending_reviews_for_execution = d.get_pending_reviews_for_execution
get_reviews_by_node_exec_ids = d.get_reviews_by_node_exec_ids
update_review_processed_status = d.update_review_processed_status
# ============ User Comms ============ #

View File

@@ -17,6 +17,10 @@ from backend.api.features.executions.review.model import (
PendingHumanReviewModel,
SafeJsonData,
)
from backend.copilot.constants import (
is_copilot_synthetic_id,
parse_node_id_from_exec_id,
)
from backend.data.execution import get_graph_execution_meta
from backend.util.json import SafeJson
@@ -123,11 +127,13 @@ async def create_auto_approval_record(
Raises:
ValueError: If the graph execution doesn't belong to the user
"""
# Validate that the graph execution belongs to this user (defense in depth)
graph_exec = await get_graph_execution_meta(
# Validate ownership: if a graph execution record exists, it must belong
# to this user. Non-graph executions (e.g. CoPilot) won't have a record.
if not is_copilot_synthetic_id(
graph_exec_id
) and not await get_graph_execution_meta(
user_id=user_id, execution_id=graph_exec_id
)
if not graph_exec:
):
raise ValueError(
f"Graph execution {graph_exec_id} not found or doesn't belong to user {user_id}"
)
@@ -265,7 +271,7 @@ async def get_pending_review_by_node_exec_id(
async def get_reviews_by_node_exec_ids(
node_exec_ids: list[str], user_id: str
) -> dict[str, "PendingHumanReviewModel"]:
) -> dict[str, PendingHumanReviewModel]:
"""
Get multiple reviews by their node execution IDs regardless of status.
@@ -292,21 +298,26 @@ async def get_reviews_by_node_exec_ids(
if not reviews:
return {}
# Batch fetch all node executions to avoid N+1 queries
node_exec_ids_to_fetch = [review.nodeExecId for review in reviews]
node_execs = await AgentNodeExecution.prisma().find_many(
where={"id": {"in": node_exec_ids_to_fetch}},
include={"Node": True},
)
# Create mapping from node_exec_id to node_id
node_exec_id_to_node_id = {
node_exec.id: node_exec.agentNodeId for node_exec in node_execs
# Split into synthetic (CoPilot) and real IDs for different resolution paths
synthetic_ids = {
r.nodeExecId for r in reviews if is_copilot_synthetic_id(r.nodeExecId)
}
real_ids = [r.nodeExecId for r in reviews if r.nodeExecId not in synthetic_ids]
# Batch fetch real node executions to avoid N+1 queries
node_exec_id_to_node_id: dict[str, str] = {}
if real_ids:
node_execs = await AgentNodeExecution.prisma().find_many(
where={"id": {"in": real_ids}},
)
node_exec_id_to_node_id = {ne.id: ne.agentNodeId for ne in node_execs}
result = {}
for review in reviews:
node_id = node_exec_id_to_node_id.get(review.nodeExecId, review.nodeExecId)
if review.nodeExecId in synthetic_ids:
node_id = parse_node_id_from_exec_id(review.nodeExecId)
else:
node_id = node_exec_id_to_node_id.get(review.nodeExecId, review.nodeExecId)
result[review.nodeExecId] = PendingHumanReviewModel.from_db(
review, node_id=node_id
)
@@ -331,6 +342,19 @@ async def has_pending_reviews_for_graph_exec(graph_exec_id: str) -> bool:
return count > 0
async def _resolve_node_id(node_exec_id: str, get_node_execution) -> str:
"""Resolve node_id from a node_exec_id.
For CoPilot synthetic IDs (e.g. copilot-node-block-id:abc12345),
extract the node_id portion (copilot-node-block-id).
For real graph executions, look up the NodeExecution record.
"""
if is_copilot_synthetic_id(node_exec_id):
return parse_node_id_from_exec_id(node_exec_id)
node_exec = await get_node_execution(node_exec_id)
return node_exec.node_id if node_exec else node_exec_id
async def get_pending_reviews_for_user(
user_id: str, page: int = 1, page_size: int = 25
) -> list["PendingHumanReviewModel"]:
@@ -361,8 +385,7 @@ async def get_pending_reviews_for_user(
# Fetch node_id for each review from NodeExecution
result = []
for review in reviews:
node_exec = await get_node_execution(review.nodeExecId)
node_id = node_exec.node_id if node_exec else review.nodeExecId
node_id = await _resolve_node_id(review.nodeExecId, get_node_execution)
result.append(PendingHumanReviewModel.from_db(review, node_id=node_id))
return result
@@ -370,7 +393,7 @@ async def get_pending_reviews_for_user(
async def get_pending_reviews_for_execution(
graph_exec_id: str, user_id: str
) -> list["PendingHumanReviewModel"]:
) -> list[PendingHumanReviewModel]:
"""
Get all pending reviews for a specific graph execution.
@@ -396,8 +419,7 @@ async def get_pending_reviews_for_execution(
# Fetch node_id for each review from NodeExecution
result = []
for review in reviews:
node_exec = await get_node_execution(review.nodeExecId)
node_id = node_exec.node_id if node_exec else review.nodeExecId
node_id = await _resolve_node_id(review.nodeExecId, get_node_execution)
result.append(PendingHumanReviewModel.from_db(review, node_id=node_id))
return result
@@ -509,8 +531,12 @@ async def process_all_reviews_for_execution(
result = {}
for review in all_result_reviews:
node_exec = await get_node_execution(review.nodeExecId)
node_id = node_exec.node_id if node_exec else review.nodeExecId
if is_copilot_synthetic_id(review.nodeExecId):
# CoPilot synthetic node_exec_ids encode node_id as "{node_id}:{random}"
node_id = parse_node_id_from_exec_id(review.nodeExecId)
else:
node_exec = await get_node_execution(review.nodeExecId)
node_id = node_exec.node_id if node_exec else review.nodeExecId
result[review.nodeExecId] = PendingHumanReviewModel.from_db(
review, node_id=node_id
)
@@ -564,3 +590,21 @@ async def cancel_pending_reviews_for_execution(graph_exec_id: str, user_id: str)
},
)
return result
async def delete_review_by_node_exec_id(node_exec_id: str, user_id: str) -> int:
"""Delete a review record by node execution ID after it has been consumed.
Used by CoPilot's continue_run_block to clean up one-time-use review records
after successful execution.
Args:
node_exec_id: The node execution ID of the review to delete
user_id: User ID for authorization
Returns:
Number of records deleted
"""
return await PendingHumanReview.prisma().delete_many(
where={"nodeExecId": node_exec_id, "userId": user_id}
)

View File

@@ -1,31 +0,0 @@
"""LLM Registry - Dynamic model management system."""
from .model import ModelMetadata
from .registry import (
RegistryModel,
RegistryModelCost,
RegistryModelCreator,
get_all_model_slugs_for_validation,
get_all_models,
get_default_model_slug,
get_enabled_models,
get_model,
get_schema_options,
refresh_llm_registry,
)
__all__ = [
# Models
"ModelMetadata",
"RegistryModel",
"RegistryModelCost",
"RegistryModelCreator",
# Functions
"refresh_llm_registry",
"get_model",
"get_all_models",
"get_enabled_models",
"get_schema_options",
"get_default_model_slug",
"get_all_model_slugs_for_validation",
]

View File

@@ -1,9 +0,0 @@
"""Type definitions for LLM model metadata.
Re-exports ModelMetadata from blocks.llm to avoid type collision.
In PR #5 (block integration), this will become the canonical location.
"""
from backend.blocks.llm import ModelMetadata
__all__ = ["ModelMetadata"]

View File

@@ -1,240 +0,0 @@
"""Core LLM registry implementation for managing models dynamically."""
from __future__ import annotations
import asyncio
import logging
from dataclasses import dataclass, field
from typing import Any
import prisma.models
from backend.data.llm_registry.model import ModelMetadata
logger = logging.getLogger(__name__)
@dataclass(frozen=True)
class RegistryModelCost:
"""Cost configuration for an LLM model."""
unit: str # "RUN" or "TOKENS"
credit_cost: int
credential_provider: str
credential_id: str | None
credential_type: str | None
currency: str | None
metadata: dict[str, Any]
@dataclass(frozen=True)
class RegistryModelCreator:
"""Creator information for an LLM model."""
id: str
name: str
display_name: str
description: str | None
website_url: str | None
logo_url: str | None
@dataclass(frozen=True)
class RegistryModel:
"""Represents a model in the LLM registry."""
slug: str
display_name: str
description: str | None
metadata: ModelMetadata
capabilities: dict[str, Any]
extra_metadata: dict[str, Any]
provider_display_name: str
is_enabled: bool
is_recommended: bool = False
costs: tuple[RegistryModelCost, ...] = field(default_factory=tuple)
creator: RegistryModelCreator | None = None
# In-memory cache (will be replaced with Redis in PR #6)
_dynamic_models: dict[str, RegistryModel] = {}
_schema_options: list[dict[str, str]] = []
_lock = asyncio.Lock()
async def refresh_llm_registry() -> None:
"""
Refresh the LLM registry from the database.
Fetches all models with their costs, providers, and creators,
then updates the in-memory cache.
"""
async with _lock:
try:
records = await prisma.models.LlmModel.prisma().find_many(
include={
"Provider": True,
"Costs": True,
"Creator": True,
}
)
logger.info(f"Fetched {len(records)} LLM models from database")
# Build model instances
new_models: dict[str, RegistryModel] = {}
for record in records:
# Parse costs
costs = tuple(
RegistryModelCost(
unit=str(cost.unit), # Convert enum to string
credit_cost=cost.creditCost,
credential_provider=cost.credentialProvider,
credential_id=cost.credentialId,
credential_type=cost.credentialType,
currency=cost.currency,
metadata=dict(cost.metadata or {}),
)
for cost in (record.Costs or [])
)
# Parse creator
creator = None
if record.Creator:
creator = RegistryModelCreator(
id=record.Creator.id,
name=record.Creator.name,
display_name=record.Creator.displayName,
description=record.Creator.description,
website_url=record.Creator.websiteUrl,
logo_url=record.Creator.logoUrl,
)
# Parse capabilities
capabilities = dict(record.capabilities or {})
# Build metadata from record
# Warn if Provider relation is missing (indicates data corruption)
if not record.Provider:
logger.warning(
f"LlmModel {record.slug} has no Provider despite NOT NULL FK - "
f"falling back to providerId {record.providerId}"
)
provider_name = (
record.Provider.name if record.Provider else record.providerId
)
provider_display = (
record.Provider.displayName
if record.Provider
else record.providerId
)
# Extract creator name (fallback to "Unknown" if no creator)
creator_name = (
record.Creator.displayName if record.Creator else "Unknown"
)
# Price tier defaults to 1 if not set
price_tier = record.priceTier if record.priceTier in (1, 2, 3) else 1
metadata = ModelMetadata(
provider=provider_name,
context_window=record.contextWindow,
max_output_tokens=(
record.maxOutputTokens
if record.maxOutputTokens is not None
else record.contextWindow
),
display_name=record.displayName,
provider_name=provider_display,
creator_name=creator_name,
price_tier=price_tier,
)
# Create model instance
model = RegistryModel(
slug=record.slug,
display_name=record.displayName,
description=record.description,
metadata=metadata,
capabilities=capabilities,
extra_metadata=dict(record.metadata or {}),
provider_display_name=provider_display,
is_enabled=record.isEnabled,
is_recommended=record.isRecommended,
costs=costs,
creator=creator,
)
new_models[record.slug] = model
# Atomic swap
global _dynamic_models, _schema_options
_dynamic_models = new_models
_schema_options = _build_schema_options()
logger.info(
f"LLM registry refreshed: {len(_dynamic_models)} models, "
f"{len(_schema_options)} schema options"
)
except Exception as e:
logger.error(f"Failed to refresh LLM registry: {e}", exc_info=True)
raise
def _build_schema_options() -> list[dict[str, str]]:
"""Build schema options for model selection dropdown. Only includes enabled models."""
return [
{
"label": model.display_name,
"value": model.slug,
"group": model.metadata.provider,
"description": model.description or "",
}
for model in sorted(
_dynamic_models.values(), key=lambda m: m.display_name.lower()
)
if model.is_enabled
]
def get_model(slug: str) -> RegistryModel | None:
"""Get a model by slug from the registry."""
return _dynamic_models.get(slug)
def get_all_models() -> list[RegistryModel]:
"""Get all models from the registry (including disabled)."""
return list(_dynamic_models.values())
def get_enabled_models() -> list[RegistryModel]:
"""Get only enabled models from the registry."""
return [model for model in _dynamic_models.values() if model.is_enabled]
def get_schema_options() -> list[dict[str, str]]:
"""Get schema options for model selection dropdown (enabled models only)."""
return _schema_options
def get_default_model_slug() -> str | None:
"""Get the default model slug (first recommended, or first enabled)."""
# Sort once and use next() to short-circuit on first match
models = sorted(_dynamic_models.values(), key=lambda m: m.display_name)
# Prefer recommended models
recommended = next(
(m.slug for m in models if m.is_recommended and m.is_enabled), None
)
if recommended:
return recommended
# Fallback to first enabled model
return next((m.slug for m in models if m.is_enabled), None)
def get_all_model_slugs_for_validation() -> list[str]:
"""
Get all model slugs for validation (enables migrate_llm_models to work).
Returns slugs for enabled models only.
"""
return [model.slug for model in _dynamic_models.values() if model.is_enabled]

View File

@@ -0,0 +1,246 @@
#!/usr/bin/env python3
"""
AutoGPT Analytics — View Generator
====================================
Reads every .sql file in analytics/queries/ and registers it as a
CREATE OR REPLACE VIEW in the analytics schema.
Quick start (from autogpt_platform/backend/):
Step 1 — one-time setup (creates schema, role, grants):
poetry run analytics-setup
Step 2 — create / refresh all 14 analytics views:
poetry run analytics-views
Both commands auto-detect credentials from .env (DB_* vars).
Use --db-url to override.
Step 3 (optional) — enable login and set a password for the read-only
role so external tools (Supabase MCP, PostHog Data Warehouse) can connect.
The role is created as NOLOGIN, so you must grant LOGIN at the same time.
Run in Supabase SQL Editor:
ALTER ROLE analytics_readonly WITH LOGIN PASSWORD 'your-password';
Usage
-----
poetry run analytics-setup # apply setup to DB
poetry run analytics-setup --dry-run # print setup SQL only
poetry run analytics-views # apply all views to DB
poetry run analytics-views --dry-run # print all view SQL only
poetry run analytics-views --only graph_execution,retention_login_weekly
Environment variables
---------------------
DATABASE_URL Postgres connection string (checked before .env)
Notes
-----
- .env DB_* vars are read automatically as a fallback.
- Safe to re-run: uses CREATE OR REPLACE VIEW.
- Looker, PostHog Data Warehouse, and Supabase MCP all read from the
same analytics.* views — no raw tables exposed.
"""
import argparse
import os
import sys
from pathlib import Path
from urllib.parse import quote
QUERIES_DIR = Path(__file__).parent.parent / "analytics" / "queries"
ENV_FILE = Path(__file__).parent / ".env"
SCHEMA = "analytics"
SETUP_SQL = """\
-- =============================================================
-- AutoGPT Analytics Schema Setup
-- Run ONCE as the postgres superuser (e.g. via Supabase SQL Editor).
-- After this, run: poetry run analytics-views
-- =============================================================
-- 1. Create the analytics schema
CREATE SCHEMA IF NOT EXISTS analytics;
-- 2. Create the read-only role (skip if already exists)
DO $$
BEGIN
IF NOT EXISTS (SELECT FROM pg_roles WHERE rolname = 'analytics_readonly') THEN
CREATE ROLE analytics_readonly NOLOGIN;
END IF;
END
$$;
-- 3. Analytics schema grants only.
-- Views use security_invoker = false so they execute as their
-- owner (postgres). analytics_readonly never needs direct access
-- to the platform or auth schemas.
GRANT USAGE ON SCHEMA analytics TO analytics_readonly;
GRANT SELECT ON ALL TABLES IN SCHEMA analytics TO analytics_readonly;
ALTER DEFAULT PRIVILEGES IN SCHEMA analytics
GRANT SELECT ON TABLES TO analytics_readonly;
"""
def load_db_url_from_env() -> str | None:
"""Read DB_* vars from .env and build a psycopg2 connection string."""
if not ENV_FILE.exists():
return None
env: dict[str, str] = {}
for line in ENV_FILE.read_text().splitlines():
line = line.strip()
if not line or line.startswith("#") or "=" not in line:
continue
key, _, value = line.partition("=")
env[key.strip()] = value.strip().strip('"').strip("'")
host = env.get("DB_HOST", "localhost")
port = env.get("DB_PORT", "5432")
user = env.get("DB_USER", "postgres")
password = env.get("DB_PASS", "")
dbname = env.get("DB_NAME", "postgres")
if not password:
return None
return (
"postgresql://"
f"{quote(user, safe='')}:{quote(password, safe='')}"
f"@{host}:{port}/{quote(dbname, safe='')}"
)
def get_db_url(args: argparse.Namespace) -> str | None:
return args.db_url or os.environ.get("DATABASE_URL") or load_db_url_from_env()
def connect(db_url: str):
try:
import psycopg2
except ImportError:
print("psycopg2 not found. Run: poetry install", file=sys.stderr)
sys.exit(1)
return psycopg2.connect(db_url)
def run_sql(db_url: str, statements: list[tuple[str, str]]) -> None:
"""Execute a list of (label, sql) pairs in a single transaction."""
conn = connect(db_url)
conn.autocommit = False
cur = conn.cursor()
try:
for label, sql in statements:
print(f" {label} ...", end=" ")
cur.execute(sql)
print("OK")
conn.commit()
print(f"\n{len(statements)} statement(s) applied.")
except Exception as e:
conn.rollback()
print(f"\n✗ Error: {e}", file=sys.stderr)
sys.exit(1)
finally:
cur.close()
conn.close()
def build_view_sql(name: str, query_body: str) -> str:
body = query_body.strip().rstrip(";")
# security_invoker = false → view runs as its owner (postgres), not the
# caller, so analytics_readonly only needs analytics schema access.
return f"CREATE OR REPLACE VIEW {SCHEMA}.{name} WITH (security_invoker = false) AS\n{body};\n"
def load_views(only: list[str] | None = None) -> list[tuple[str, str]]:
"""Return [(label, sql)] for all views, in alphabetical order."""
files = sorted(QUERIES_DIR.glob("*.sql"))
if not files:
print(f"No .sql files found in {QUERIES_DIR}", file=sys.stderr)
sys.exit(1)
known = {f.stem for f in files}
if only:
unknown = [n for n in only if n not in known]
if unknown:
print(
f"Unknown view name(s): {', '.join(unknown)}\n"
f"Available: {', '.join(sorted(known))}",
file=sys.stderr,
)
sys.exit(1)
result = []
for f in files:
name = f.stem
if only and name not in only:
continue
result.append((f"view analytics.{name}", build_view_sql(name, f.read_text())))
return result
def no_db_url_error() -> None:
print(
"No database URL found.\n"
"Tried: --db-url, DATABASE_URL env var, and .env (DB_* vars).\n"
"Use --dry-run to just print the SQL.",
file=sys.stderr,
)
sys.exit(1)
def cmd_setup(args: argparse.Namespace) -> None:
if args.dry_run:
print(SETUP_SQL)
return
db_url = get_db_url(args)
if not db_url:
no_db_url_error()
assert db_url
print("Applying analytics setup...")
run_sql(db_url, [("schema / role / grants", SETUP_SQL)])
def cmd_views(args: argparse.Namespace) -> None:
only = [v.strip() for v in args.only.split(",")] if args.only else None
views = load_views(only=only)
if not views:
print("No matching views found.")
sys.exit(0)
if args.dry_run:
print(f"-- {len(views)} views\n")
for label, sql in views:
print(f"-- {label}")
print(sql)
return
db_url = get_db_url(args)
if not db_url:
no_db_url_error()
assert db_url
print(f"Applying {len(views)} view(s)...")
# Append grant refresh so the readonly role sees any new views
grant = f"GRANT SELECT ON ALL TABLES IN SCHEMA {SCHEMA} TO analytics_readonly;"
run_sql(db_url, views + [("grant analytics_readonly", grant)])
def main_setup() -> None:
parser = argparse.ArgumentParser(description="Apply analytics schema setup to DB")
parser.add_argument(
"--dry-run", action="store_true", help="Print SQL, don't execute"
)
parser.add_argument("--db-url", help="Postgres connection string")
cmd_setup(parser.parse_args())
def main_views() -> None:
parser = argparse.ArgumentParser(description="Apply analytics views to DB")
parser.add_argument(
"--dry-run", action="store_true", help="Print SQL, don't execute"
)
parser.add_argument("--db-url", help="Postgres connection string")
parser.add_argument("--only", help="Comma-separated view names to update")
cmd_views(parser.parse_args())
if __name__ == "__main__":
# Default: apply views (backwards-compatible with direct python invocation)
main_views()

View File

@@ -0,0 +1,7 @@
-- Remove GraphExecution foreign key from PendingHumanReview
-- The graphExecId column remains for querying, but we remove the FK constraint
-- to AgentGraphExecution since PendingHumanReview records can now be created
-- with synthetic graph_exec_ids (e.g., CoPilot direct block execution uses
-- "copilot-session-{session_id}" as graph_exec_id).
ALTER TABLE "PendingHumanReview" DROP CONSTRAINT IF EXISTS "PendingHumanReview_graphExecId_fkey";

View File

@@ -1,148 +0,0 @@
-- CreateEnum
CREATE TYPE "LlmCostUnit" AS ENUM ('RUN', 'TOKENS');
-- CreateTable
CREATE TABLE "LlmProvider" (
"id" TEXT NOT NULL,
"createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
"updatedAt" TIMESTAMP(3) NOT NULL,
"name" TEXT NOT NULL,
"displayName" TEXT NOT NULL,
"description" TEXT,
"defaultCredentialProvider" TEXT,
"defaultCredentialId" TEXT,
"defaultCredentialType" TEXT,
"metadata" JSONB NOT NULL DEFAULT '{}',
CONSTRAINT "LlmProvider_pkey" PRIMARY KEY ("id")
);
-- CreateTable
CREATE TABLE "LlmModelCreator" (
"id" TEXT NOT NULL,
"createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
"updatedAt" TIMESTAMP(3) NOT NULL,
"name" TEXT NOT NULL,
"displayName" TEXT NOT NULL,
"description" TEXT,
"websiteUrl" TEXT,
"logoUrl" TEXT,
"metadata" JSONB NOT NULL DEFAULT '{}',
CONSTRAINT "LlmModelCreator_pkey" PRIMARY KEY ("id")
);
-- CreateTable
CREATE TABLE "LlmModel" (
"id" TEXT NOT NULL,
"createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
"updatedAt" TIMESTAMP(3) NOT NULL,
"slug" TEXT NOT NULL,
"displayName" TEXT NOT NULL,
"description" TEXT,
"providerId" TEXT NOT NULL,
"creatorId" TEXT,
"contextWindow" INTEGER NOT NULL,
"maxOutputTokens" INTEGER,
"priceTier" INTEGER NOT NULL DEFAULT 1,
"isEnabled" BOOLEAN NOT NULL DEFAULT true,
"isRecommended" BOOLEAN NOT NULL DEFAULT false,
"supportsTools" BOOLEAN NOT NULL DEFAULT false,
"supportsJsonOutput" BOOLEAN NOT NULL DEFAULT false,
"supportsReasoning" BOOLEAN NOT NULL DEFAULT false,
"supportsParallelToolCalls" BOOLEAN NOT NULL DEFAULT false,
"capabilities" JSONB NOT NULL DEFAULT '{}',
"metadata" JSONB NOT NULL DEFAULT '{}',
CONSTRAINT "LlmModel_pkey" PRIMARY KEY ("id")
);
-- CreateTable
CREATE TABLE "LlmModelCost" (
"id" TEXT NOT NULL,
"createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
"updatedAt" TIMESTAMP(3) NOT NULL,
"unit" "LlmCostUnit" NOT NULL DEFAULT 'RUN',
"creditCost" INTEGER NOT NULL,
"credentialProvider" TEXT NOT NULL,
"credentialId" TEXT,
"credentialType" TEXT,
"currency" TEXT,
"metadata" JSONB NOT NULL DEFAULT '{}',
"llmModelId" TEXT NOT NULL,
CONSTRAINT "LlmModelCost_pkey" PRIMARY KEY ("id")
);
-- CreateTable
CREATE TABLE "LlmModelMigration" (
"id" TEXT NOT NULL,
"createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
"updatedAt" TIMESTAMP(3) NOT NULL,
"sourceModelSlug" TEXT NOT NULL,
"targetModelSlug" TEXT NOT NULL,
"reason" TEXT,
"migratedNodeIds" JSONB NOT NULL DEFAULT '[]',
"nodeCount" INTEGER NOT NULL,
"customCreditCost" INTEGER,
"isReverted" BOOLEAN NOT NULL DEFAULT false,
"revertedAt" TIMESTAMP(3),
CONSTRAINT "LlmModelMigration_pkey" PRIMARY KEY ("id")
);
-- CreateIndex
CREATE UNIQUE INDEX "LlmProvider_name_key" ON "LlmProvider"("name");
-- CreateIndex
CREATE UNIQUE INDEX "LlmModelCreator_name_key" ON "LlmModelCreator"("name");
-- CreateIndex
CREATE UNIQUE INDEX "LlmModel_slug_key" ON "LlmModel"("slug");
-- CreateIndex
CREATE INDEX "LlmModel_providerId_isEnabled_idx" ON "LlmModel"("providerId", "isEnabled");
-- CreateIndex
CREATE INDEX "LlmModel_creatorId_idx" ON "LlmModel"("creatorId");
-- CreateIndex (partial unique for default costs - no specific credential)
CREATE UNIQUE INDEX "LlmModelCost_default_cost_key" ON "LlmModelCost"("llmModelId", "credentialProvider", "unit") WHERE "credentialId" IS NULL;
-- CreateIndex (partial unique for credential-specific costs)
CREATE UNIQUE INDEX "LlmModelCost_credential_cost_key" ON "LlmModelCost"("llmModelId", "credentialProvider", "credentialId", "unit") WHERE "credentialId" IS NOT NULL;
-- CreateIndex
CREATE INDEX "LlmModelMigration_targetModelSlug_idx" ON "LlmModelMigration"("targetModelSlug");
-- CreateIndex
CREATE INDEX "LlmModelMigration_sourceModelSlug_isReverted_idx" ON "LlmModelMigration"("sourceModelSlug", "isReverted");
-- CreateIndex (partial unique to prevent multiple active migrations per source)
CREATE UNIQUE INDEX "LlmModelMigration_active_source_key" ON "LlmModelMigration"("sourceModelSlug") WHERE "isReverted" = false;
-- AddForeignKey
ALTER TABLE "LlmModel" ADD CONSTRAINT "LlmModel_providerId_fkey" FOREIGN KEY ("providerId") REFERENCES "LlmProvider"("id") ON DELETE RESTRICT ON UPDATE CASCADE;
-- AddForeignKey
ALTER TABLE "LlmModel" ADD CONSTRAINT "LlmModel_creatorId_fkey" FOREIGN KEY ("creatorId") REFERENCES "LlmModelCreator"("id") ON DELETE SET NULL ON UPDATE CASCADE;
-- AddForeignKey
ALTER TABLE "LlmModelCost" ADD CONSTRAINT "LlmModelCost_llmModelId_fkey" FOREIGN KEY ("llmModelId") REFERENCES "LlmModel"("id") ON DELETE CASCADE ON UPDATE CASCADE;
-- AddForeignKey
ALTER TABLE "LlmModelMigration" ADD CONSTRAINT "LlmModelMigration_sourceModelSlug_fkey" FOREIGN KEY ("sourceModelSlug") REFERENCES "LlmModel"("slug") ON DELETE RESTRICT ON UPDATE CASCADE;
-- AddForeignKey
ALTER TABLE "LlmModelMigration" ADD CONSTRAINT "LlmModelMigration_targetModelSlug_fkey" FOREIGN KEY ("targetModelSlug") REFERENCES "LlmModel"("slug") ON DELETE RESTRICT ON UPDATE CASCADE;
-- AddCheckConstraints (enforce data integrity)
ALTER TABLE "LlmModel"
ADD CONSTRAINT "LlmModel_priceTier_check" CHECK ("priceTier" BETWEEN 1 AND 3);
ALTER TABLE "LlmModelCost"
ADD CONSTRAINT "LlmModelCost_creditCost_check" CHECK ("creditCost" >= 0);
ALTER TABLE "LlmModelMigration"
ADD CONSTRAINT "LlmModelMigration_nodeCount_check" CHECK ("nodeCount" >= 0),
ADD CONSTRAINT "LlmModelMigration_customCreditCost_check" CHECK ("customCreditCost" IS NULL OR "customCreditCost" >= 0);

View File

@@ -120,6 +120,8 @@ ws = "backend.ws:main"
scheduler = "backend.scheduler:main"
notification = "backend.notification:main"
executor = "backend.exec:main"
analytics-setup = "generate_views:main_setup"
analytics-views = "generate_views:main_views"
copilot-executor = "backend.copilot.executor.__main__:main"
cli = "backend.cli:main"
format = "linter:format"

View File

@@ -566,8 +566,6 @@ model AgentGraphExecution {
shareToken String? @unique
sharedAt DateTime?
PendingHumanReviews PendingHumanReview[]
@@index([agentGraphId, agentGraphVersion])
@@index([userId, isDeleted, createdAt])
@@index([createdAt])
@@ -664,8 +662,7 @@ model PendingHumanReview {
updatedAt DateTime? @updatedAt
reviewedAt DateTime?
User User @relation(fields: [userId], references: [id], onDelete: Cascade)
GraphExecution AgentGraphExecution @relation(fields: [graphExecId], references: [id], onDelete: Cascade)
User User @relation(fields: [userId], references: [id], onDelete: Cascade)
@@unique([nodeExecId]) // One pending review per node execution
@@index([userId, status])
@@ -1304,164 +1301,3 @@ model OAuthRefreshToken {
@@index([userId, applicationId])
@@index([expiresAt]) // For cleanup
}
// ============================================================================
// LLM Registry Models
// ============================================================================
enum LlmCostUnit {
RUN
TOKENS
}
model LlmProvider {
id String @id @default(uuid())
createdAt DateTime @default(now())
updatedAt DateTime @updatedAt
name String @unique
displayName String
description String?
defaultCredentialProvider String?
defaultCredentialId String?
defaultCredentialType String?
metadata Json @default("{}")
Models LlmModel[]
}
model LlmModel {
id String @id @default(uuid())
createdAt DateTime @default(now())
updatedAt DateTime @updatedAt
slug String @unique
displayName String
description String?
providerId String
Provider LlmProvider @relation(fields: [providerId], references: [id], onDelete: Restrict)
// Creator is the organization that created/trained the model (e.g., OpenAI, Meta)
// This is distinct from the provider who hosts/serves the model (e.g., OpenRouter)
creatorId String?
Creator LlmModelCreator? @relation(fields: [creatorId], references: [id], onDelete: SetNull)
contextWindow Int
maxOutputTokens Int?
priceTier Int @default(1) // 1=cheapest, 2=medium, 3=expensive (DB constraint: 1-3)
isEnabled Boolean @default(true)
isRecommended Boolean @default(false)
// Model-specific capabilities
// These vary per model even within the same provider (e.g., Hugging Face)
// Default to false for safety - partially-seeded rows should not be assumed capable
supportsTools Boolean @default(false)
supportsJsonOutput Boolean @default(false)
supportsReasoning Boolean @default(false)
supportsParallelToolCalls Boolean @default(false)
capabilities Json @default("{}")
metadata Json @default("{}")
Costs LlmModelCost[]
SourceMigrations LlmModelMigration[] @relation("SourceMigrations")
TargetMigrations LlmModelMigration[] @relation("TargetMigrations")
@@index([providerId, isEnabled])
@@index([creatorId])
// Note: slug already has @unique which creates an implicit index
}
model LlmModelCost {
id String @id @default(uuid())
createdAt DateTime @default(now())
updatedAt DateTime @updatedAt
unit LlmCostUnit @default(RUN)
creditCost Int // DB constraint: >= 0
// Provider identifier (e.g., "openai", "anthropic", "openrouter")
// Used to determine which credential system provides the API key.
// Allows different pricing for:
// - Default provider costs (WHERE credentialId IS NULL)
// - User's own API key costs (WHERE credentialId IS NOT NULL)
credentialProvider String
credentialId String?
credentialType String?
currency String?
metadata Json @default("{}")
llmModelId String
Model LlmModel @relation(fields: [llmModelId], references: [id], onDelete: Cascade)
// Note: Unique constraints are implemented as partial indexes in migration SQL:
// - One for default costs (WHERE credentialId IS NULL)
// - One for credential-specific costs (WHERE credentialId IS NOT NULL)
// This allows both provider-level defaults and credential-specific overrides
}
model LlmModelCreator {
id String @id @default(uuid())
createdAt DateTime @default(now())
updatedAt DateTime @updatedAt
name String @unique // e.g., "openai", "anthropic", "meta"
displayName String // e.g., "OpenAI", "Anthropic", "Meta"
description String?
websiteUrl String? // Link to creator's website
logoUrl String? // URL to creator's logo
metadata Json @default("{}")
Models LlmModel[]
}
model LlmModelMigration {
id String @id @default(uuid())
createdAt DateTime @default(now())
updatedAt DateTime @updatedAt
sourceModelSlug String // The original model that was disabled
targetModelSlug String // The model workflows were migrated to
reason String? // Why the migration happened (e.g., "Provider outage")
// FK constraints ensure slugs reference valid models
SourceModel LlmModel @relation("SourceMigrations", fields: [sourceModelSlug], references: [slug], onDelete: Restrict)
TargetModel LlmModel @relation("TargetMigrations", fields: [targetModelSlug], references: [slug], onDelete: Restrict)
// Track affected nodes as JSON array of node IDs
// Format: ["node-uuid-1", "node-uuid-2", ...]
migratedNodeIds Json @default("[]")
nodeCount Int // Number of nodes migrated (DB constraint: >= 0)
// Custom pricing override for migrated workflows during the migration period.
// Use case: When migrating users from an expensive model (e.g., GPT-4) to a cheaper
// one (e.g., GPT-3.5), you may want to temporarily maintain the original pricing
// to avoid billing surprises, or offer a discount during the transition.
//
// IMPORTANT: This field is intended for integration with the billing system.
// When billing calculates costs for nodes affected by this migration, it should
// check if customCreditCost is set and use it instead of the target model's cost.
// If null, the target model's normal cost applies.
//
// TODO: Integrate with billing system to apply this override during cost calculation.
// LIMITATION: This is a simple Int and doesn't distinguish RUN vs TOKENS pricing.
// For token-priced models, this may be ambiguous. Consider migrating to a relation
// with LlmModelCost or a dedicated override model in a follow-up PR.
customCreditCost Int? // DB constraint: >= 0 when not null
// Revert tracking
isReverted Boolean @default(false)
revertedAt DateTime?
// Note: Partial unique index in migration SQL prevents multiple active migrations per source:
// UNIQUE (sourceModelSlug) WHERE isReverted = false
@@index([targetModelSlug])
@@index([sourceModelSlug, isReverted]) // Composite index for active migration queries
}

View File

@@ -1,3 +1,4 @@
import { useMemo } from "react";
import {
Conversation,
ConversationContent,
@@ -8,6 +9,7 @@ import { LoadingSpinner } from "@/components/atoms/LoadingSpinner/LoadingSpinner
import { FileUIPart, UIDataTypes, UIMessage, UITools } from "ai";
import { TOOL_PART_PREFIX } from "../JobStatsBar/constants";
import { TurnStatsBar } from "../JobStatsBar/TurnStatsBar";
import { CopilotPendingReviews } from "../CopilotPendingReviews/CopilotPendingReviews";
import {
buildRenderSegments,
getTurnMessages,
@@ -51,6 +53,50 @@ function renderSegments(
});
}
/**
* Extract graph_exec_id from tool outputs that need review.
* Handles both:
* - run_block ReviewRequiredResponse (has graph_exec_id directly)
* - run_agent ExecutionStartedResponse with status "REVIEW" (has execution_id)
*/
function extractGraphExecId(
messages: UIMessage<unknown, UIDataTypes, UITools>[],
): string | null {
// Scan backwards — the most recent review output has the ID
for (let i = messages.length - 1; i >= 0; i--) {
const msg = messages[i];
for (const part of msg.parts) {
if ("output" in part && part.output) {
const out =
typeof part.output === "string"
? (() => {
try {
return JSON.parse(part.output);
} catch {
return null;
}
})()
: part.output;
if (out && typeof out === "object") {
// run_block: ReviewRequiredResponse has graph_exec_id
if ("graph_exec_id" in out) {
return (out as { graph_exec_id: string }).graph_exec_id;
}
// run_agent: ExecutionStartedResponse with status "REVIEW"
if (
"execution_id" in out &&
"status" in out &&
(out as { status: string }).status === "REVIEW"
) {
return (out as { execution_id: string }).execution_id;
}
}
}
}
}
return null;
}
export function ChatMessagesContainer({
messages,
status,
@@ -60,6 +106,7 @@ export function ChatMessagesContainer({
sessionID,
}: Props) {
const lastMessage = messages[messages.length - 1];
const graphExecId = useMemo(() => extractGraphExecId(messages), [messages]);
const hasInflight = (() => {
if (lastMessage?.role !== "assistant") return false;
@@ -205,6 +252,7 @@ export function ChatMessagesContainer({
</MessageContent>
</Message>
)}
{graphExecId && <CopilotPendingReviews graphExecId={graphExecId} />}
{error && (
<details className="rounded-lg bg-red-50 p-4 text-sm text-red-700">
<summary className="cursor-pointer font-medium">

View File

@@ -130,6 +130,7 @@ export function MessagePartRenderer({ part, messageID, partIndex }: Props) {
case "tool-get_doc_page":
return <SearchDocsTool key={key} part={part as ToolUIPart} />;
case "tool-run_block":
case "tool-continue_run_block":
return <RunBlockTool key={key} part={part as ToolUIPart} />;
case "tool-run_mcp_tool":
return <RunMCPToolComponent key={key} part={part as ToolUIPart} />;

View File

@@ -19,6 +19,7 @@ const CUSTOM_TOOL_TYPES = new Set([
"tool-search_docs",
"tool-get_doc_page",
"tool-run_block",
"tool-continue_run_block",
"tool-run_mcp_tool",
"tool-run_agent",
"tool-schedule_agent",
@@ -33,6 +34,7 @@ const INTERACTIVE_RESPONSE_TYPES: ReadonlySet<string> = new Set([
ResponseType.setup_requirements,
ResponseType.agent_details,
ResponseType.block_details,
ResponseType.review_required,
ResponseType.need_login,
ResponseType.input_validation_error,
ResponseType.agent_builder_clarification_needed,

View File

@@ -0,0 +1,61 @@
"use client";
import { useCallback } from "react";
import { PendingReviewsList } from "@/components/organisms/PendingReviewsList/PendingReviewsList";
import { useCopilotChatActions } from "../CopilotChatActionsProvider/useCopilotChatActions";
import { usePendingReviewsForExecution } from "@/hooks/usePendingReviews";
import { okData } from "@/app/api/helpers";
interface Props {
graphExecId: string;
}
/**
* Renders a single consolidated PendingReviewsList for all pending copilot
* reviews in a session — mirrors the non-copilot review page behavior.
* Works for both run_block (synthetic copilot-session-*) and run_agent (real graph exec) reviews.
*/
export function CopilotPendingReviews({ graphExecId }: Props) {
const { onSend } = useCopilotChatActions();
const { pendingReviews, refetch } = usePendingReviewsForExecution(
graphExecId,
{ enabled: !!graphExecId, refetchInterval: 2000 },
);
// Graph executions auto-resume after approval; block reviews need continue_run_block.
const isGraphExecution = !graphExecId.startsWith("copilot-session-");
const handleReviewComplete = useCallback(async () => {
// Brief delay for the server to propagate the approval
await new Promise((resolve) => setTimeout(resolve, 500));
const result = await refetch();
const remaining = okData(result.data) || [];
if (remaining.length > 0) return;
if (isGraphExecution) {
onSend(
`All pending reviews have been processed. ` +
`The agent execution will resume automatically for approved reviews. ` +
`Use view_agent_output with execution_id="${graphExecId}" to check the result.`,
);
} else {
onSend(
`All pending reviews have been processed. ` +
`For any approved reviews, call continue_run_block with the corresponding review_id to execute them. ` +
`For rejected reviews, no further action is needed.`,
);
}
}, [refetch, onSend, isGraphExecution, graphExecId]);
if (pendingReviews.length === 0) return null;
return (
<div className="py-2">
<PendingReviewsList
reviews={pendingReviews}
onReviewComplete={handleReviewComplete}
/>
</div>
);
}

View File

@@ -15,6 +15,7 @@ import {
isRunBlockBlockOutput,
isRunBlockDetailsOutput,
isRunBlockErrorOutput,
isRunBlockReviewRequiredOutput,
isRunBlockSetupRequirementsOutput,
ToolIcon,
} from "./helpers";
@@ -54,10 +55,15 @@ export function RunBlockTool({ part }: Props) {
part.state === "output-available" &&
!!output &&
!setupRequirementsOutput &&
!isRunBlockReviewRequiredOutput(output) &&
(isRunBlockBlockOutput(output) ||
isRunBlockDetailsOutput(output) ||
isRunBlockErrorOutput(output));
// Review UI is rendered at the chat level by CopilotPendingReviews,
// not inside each tool card. This matches the non-copilot flow where
// a single PendingReviewsList shows all reviews grouped together.
return (
<div className="py-2">
<div className="flex items-center gap-2 text-sm text-muted-foreground">

View File

@@ -26,6 +26,18 @@ export interface BlockDetailsResponse {
user_authenticated: boolean;
}
/** Response when a block requires human review before execution. */
export interface ReviewRequiredResponse {
type: typeof ResponseType.review_required;
message: string;
session_id?: string | null;
block_id: string;
block_name: string;
review_id: string;
graph_exec_id: string;
input_data: Record<string, unknown>;
}
export interface RunBlockInput {
block_id?: string;
block_name?: string;
@@ -36,12 +48,14 @@ export type RunBlockToolOutput =
| SetupRequirementsResponse
| BlockDetailsResponse
| BlockOutputResponse
| ReviewRequiredResponse
| ErrorResponse;
const RUN_BLOCK_OUTPUT_TYPES = new Set<string>([
ResponseType.setup_requirements,
ResponseType.block_details,
ResponseType.block_output,
ResponseType.review_required,
ResponseType.error,
]);
@@ -66,7 +80,19 @@ export function isRunBlockDetailsOutput(
export function isRunBlockBlockOutput(
output: RunBlockToolOutput,
): output is BlockOutputResponse {
return output.type === ResponseType.block_output || "block_id" in output;
return (
output.type === ResponseType.block_output ||
("block_id" in output && !("review_id" in output))
);
}
export function isRunBlockReviewRequiredOutput(
output: RunBlockToolOutput,
): output is ReviewRequiredResponse {
return (
output.type === ResponseType.review_required ||
("review_id" in output && "block_name" in output && "input_data" in output)
);
}
export function isRunBlockErrorOutput(
@@ -91,6 +117,7 @@ function parseOutput(output: unknown): RunBlockToolOutput | null {
if (typeof type === "string" && RUN_BLOCK_OUTPUT_TYPES.has(type)) {
return output as RunBlockToolOutput;
}
if ("review_id" in output) return output as ReviewRequiredResponse;
if ("block_id" in output) return output as BlockOutputResponse;
if ("block" in output) return output as BlockDetailsResponse;
if ("setup_info" in output) return output as SetupRequirementsResponse;
@@ -135,6 +162,9 @@ export function getAnimationText(part: {
if (isRunBlockSetupRequirementsOutput(output)) {
return `Setup needed for "${output.setup_info.agent_name}"`;
}
if (isRunBlockReviewRequiredOutput(output)) {
return `Review needed for "${output.block_name}"`;
}
return "Error running block";
}
case "output-error":
@@ -227,6 +257,14 @@ export function getAccordionMeta(output: RunBlockToolOutput): {
};
}
if (isRunBlockReviewRequiredOutput(output)) {
return {
icon,
title: output.block_name,
description: "Sensitive action — awaiting review",
};
}
return {
icon: (
<WarningDiamondIcon size={32} weight="light" className="text-red-500" />

View File

@@ -9,52 +9,52 @@
/**
* Types of tool responses.
*/
export type ResponseType = typeof ResponseType[keyof typeof ResponseType];
export type ResponseType = (typeof ResponseType)[keyof typeof ResponseType];
// eslint-disable-next-line @typescript-eslint/no-redeclare
export const ResponseType = {
error: 'error',
no_results: 'no_results',
need_login: 'need_login',
agents_found: 'agents_found',
agent_details: 'agent_details',
setup_requirements: 'setup_requirements',
input_validation_error: 'input_validation_error',
execution_started: 'execution_started',
agent_output: 'agent_output',
understanding_updated: 'understanding_updated',
suggested_goal: 'suggested_goal',
agent_builder_guide: 'agent_builder_guide',
agent_builder_preview: 'agent_builder_preview',
agent_builder_saved: 'agent_builder_saved',
agent_builder_clarification_needed: 'agent_builder_clarification_needed',
agent_builder_validation_result: 'agent_builder_validation_result',
agent_builder_fix_result: 'agent_builder_fix_result',
block_list: 'block_list',
block_details: 'block_details',
block_output: 'block_output',
mcp_guide: 'mcp_guide',
mcp_tools_discovered: 'mcp_tools_discovered',
mcp_tool_output: 'mcp_tool_output',
doc_search_results: 'doc_search_results',
doc_page: 'doc_page',
workspace_file_list: 'workspace_file_list',
workspace_file_content: 'workspace_file_content',
workspace_file_metadata: 'workspace_file_metadata',
workspace_file_written: 'workspace_file_written',
workspace_file_deleted: 'workspace_file_deleted',
folder_created: 'folder_created',
folder_list: 'folder_list',
folder_updated: 'folder_updated',
folder_moved: 'folder_moved',
folder_deleted: 'folder_deleted',
agents_moved_to_folder: 'agents_moved_to_folder',
browser_navigate: 'browser_navigate',
browser_act: 'browser_act',
browser_screenshot: 'browser_screenshot',
bash_exec: 'bash_exec',
web_fetch: 'web_fetch',
feature_request_search: 'feature_request_search',
feature_request_created: 'feature_request_created',
error: "error",
no_results: "no_results",
need_login: "need_login",
agents_found: "agents_found",
agent_details: "agent_details",
setup_requirements: "setup_requirements",
input_validation_error: "input_validation_error",
execution_started: "execution_started",
agent_output: "agent_output",
understanding_updated: "understanding_updated",
suggested_goal: "suggested_goal",
agent_builder_guide: "agent_builder_guide",
agent_builder_preview: "agent_builder_preview",
agent_builder_saved: "agent_builder_saved",
agent_builder_clarification_needed: "agent_builder_clarification_needed",
agent_builder_validation_result: "agent_builder_validation_result",
agent_builder_fix_result: "agent_builder_fix_result",
block_list: "block_list",
block_details: "block_details",
block_output: "block_output",
review_required: "review_required",
mcp_guide: "mcp_guide",
mcp_tools_discovered: "mcp_tools_discovered",
mcp_tool_output: "mcp_tool_output",
doc_search_results: "doc_search_results",
doc_page: "doc_page",
workspace_file_list: "workspace_file_list",
workspace_file_content: "workspace_file_content",
workspace_file_metadata: "workspace_file_metadata",
workspace_file_written: "workspace_file_written",
workspace_file_deleted: "workspace_file_deleted",
folder_created: "folder_created",
folder_list: "folder_list",
folder_updated: "folder_updated",
folder_moved: "folder_moved",
folder_deleted: "folder_deleted",
agents_moved_to_folder: "agents_moved_to_folder",
browser_navigate: "browser_navigate",
browser_act: "browser_act",
browser_screenshot: "browser_screenshot",
bash_exec: "bash_exec",
web_fetch: "web_fetch",
feature_request_search: "feature_request_search",
feature_request_created: "feature_request_created",
} as const;

View File

@@ -11557,6 +11557,7 @@
"block_list",
"block_details",
"block_output",
"review_required",
"mcp_guide",
"mcp_tools_discovered",
"mcp_tool_output",

View File

@@ -57,27 +57,25 @@ async function handleWorkspaceDownload(
);
}
// Get the content type from the backend response
// Fully buffer the response before forwarding. Passing response.body as a
// ReadableStream causes silent truncation in Next.js / Vercel — the last
// ~10 KB of larger files are dropped, corrupting PNGs and truncating CSVs.
const buffer = await response.arrayBuffer();
const contentType =
response.headers.get("Content-Type") || "application/octet-stream";
const contentDisposition = response.headers.get("Content-Disposition");
// Stream the response body
const responseHeaders: Record<string, string> = {
"Content-Type": contentType,
"Content-Length": String(buffer.byteLength),
};
if (contentDisposition) {
responseHeaders["Content-Disposition"] = contentDisposition;
}
const contentLength = response.headers.get("Content-Length");
if (contentLength) {
responseHeaders["Content-Length"] = contentLength;
}
// Stream the response body directly instead of buffering in memory
return new NextResponse(response.body, {
return new NextResponse(buffer, {
status: 200,
headers: responseHeaders,
});