Compare commits

..

22 Commits

Author SHA1 Message Date
Bentlybro
957ec038b8 chore: regenerate OpenAPI schema for new LLM endpoints 2026-03-16 15:44:53 +00:00
Bentlybro
9b39a662ee feat(platform): Add LLM registry public read API
Implements public GET endpoints for querying LLM models and providers - Part 3 of 6 in the incremental registry rollout.

**Endpoints:**
- GET /api/llm/models - List all models (filterable by enabled_only)
- GET /api/llm/providers - List providers with their models

**Design:**
- Uses in-memory registry from PR 2 (no DB queries)
- Fast reads from cache populated at startup
- Grouped by provider for easy UI rendering

**Response models:**
- LlmModel - model info with capabilities, costs, creator
- LlmProvider - provider with nested models
- LlmModelsResponse - list + total count
- LlmProvidersResponse - grouped by provider

**Authentication:**
- Requires user auth (requires_user dependency)
- Public within authenticated sessions

**Integration:**
- Registered in rest_api.py at /api prefix
- Tagged with v2 + llm for OpenAPI grouping

**What's NOT included (later PRs):**
- Admin write API (PR 4)
- Block integration (PR 5)
- Redis cache (PR 6)

Lines: ~180 total
Files: 4 (3 new, 1 modified)
Review time: < 10 minutes
2026-03-16 15:44:53 +00:00
Bentlybro
9b93a956b4 style: fix trailing whitespace in registry.py 2026-03-16 15:44:45 +00:00
Bentlybro
b236719bbf fix(startup): handle missing AgentNode table in migrate_llm_models
Tests fail with 'relation "platform.AgentNode" does not exist' because
migrate_llm_models() runs during startup and queries a table that doesn't
exist in fresh test databases.

This is an existing bug in the codebase - the function has no error handling.

Wrap the call in try/except to gracefully handle test environments where
the AgentNode table hasn't been created yet.
2026-03-16 15:12:25 +00:00
Bentlybro
4f286f510f refactor: address CodeRabbit/Majdyz review feedback
- Fix ModelMetadata duplicate type collision by importing from blocks.llm
- Remove _json_to_dict helper, use dict() inline
- Add warning when Provider relation is missing (data corruption indicator)
- Optimize get_default_model_slug with next() (single sort pass)
- Optimize _build_schema_options to use list comprehension
- Move llm_registry import to top-level in rest_api.py
- Ensure max_output_tokens falls back to context_window when null

All critical and quick-win issues addressed.
2026-03-16 14:55:39 +00:00
Bentlybro
b1595d871d fix: address Sentry/CodeRabbit critical and major issues
**CRITICAL FIX - ModelMetadata instantiation:**
- Removed non-existent 'supports_vision' argument
- Added required fields: display_name, provider_name, creator_name, price_tier
- Handle nullable DB fields (Creator, priceTier, maxOutputTokens) safely
- Fallback: creator_name='Unknown' if no Creator, price_tier=1 if invalid

**MAJOR FIX - Preserve pricing unit:**
- Added 'unit' field to RegistryModelCost dataclass
- Prevents RUN vs TOKENS ambiguity in cached costs
- Convert Prisma enum to string when building cost objects

**MAJOR FIX - Deterministic default model:**
- Sort recommended models by display_name before selection
- Prevents non-deterministic results when multiple models are recommended
- Ensures consistent default across refreshes

**STARTUP IMPROVEMENT:**
- Added comment: graceful fallback OK for now (no blocks use registry yet)
- Will be stricter in PR #5 when block integration lands
- Added success log message for registry refresh

Fixes identified by Sentry (critical TypeError) and CodeRabbit review.
2026-03-16 14:55:39 +00:00
Bentlybro
29ab7f2d9c feat(platform): Add LLM registry core - DB layer + in-memory cache
Implements the registry core for dynamic LLM model management:

**DB Layer:**
- Fetch models with provider, costs, and creator relations
- Prisma query with includes for related data
- Convert DB records to typed dataclasses

**In-memory Cache:**
- Global dict for fast model lookups
- Atomic cache refresh with lock protection
- Schema options generation for UI dropdowns

**Public API:**
- get_model(slug) - lookup by slug
- get_all_models() - all models (including disabled)
- get_enabled_models() - enabled models only
- get_schema_options() - UI dropdown data
- get_default_model_slug() - recommended or first enabled
- refresh_llm_registry() - manual refresh trigger

**Integration:**
- Refresh at API startup (before block init)
- Graceful fallback if registry unavailable
- Enables blocks to consume registry data

**Models:**
- RegistryModel - full model with metadata
- RegistryModelCost - pricing configuration
- RegistryModelCreator - model creator info
- ModelMetadata - context window, capabilities

**Next PRs:**
- PR #3: Public read API (GET endpoints)
- PR #4: Admin write API (POST/PATCH/DELETE)
- PR #5: Block integration (update LLM block)
- PR #6: Redis cache (solve thundering herd)

Lines: ~230 (registry.py ~210, __init__.py ~30, model.py from draft)
Files: 4 (3 new, 1 modified)
2026-03-16 14:55:39 +00:00
Bentlybro
784936b323 revert: undo changes to graph.py
Reverting migrate_llm_models modifications per request.
Back to dev baseline for this file.
2026-03-16 14:55:39 +00:00
Bentlybro
f2ae38a1a7 fix(schema): address Majdyz review feedback
- Add FK constraints on LlmModelMigration (sourceModelSlug, targetModelSlug → LlmModel.slug)
- Remove unused @@index([credentialProvider]) on LlmModelCost
- Remove redundant @@index([isReverted]) on LlmModelMigration (covered by composite)
- Add documentation for credentialProvider field explaining its purpose
- Add reverse relation fields to LlmModel (SourceMigrations, TargetMigrations)

Fixes data integrity: typos in migration slugs now caught at DB level.
2026-03-16 14:52:19 +00:00
Bently
2ccfb4e4c1 Merge branch 'dev' into feat/llm-registry-schema 2026-03-10 17:52:01 +00:00
Bentlybro
c65e5c957a fix: isort import order 2026-03-10 16:43:34 +00:00
Bentlybro
54355a691b fix: use execute_raw_with_schema for proper multi-schema support
Per Sentry feedback: db.execute_raw ignores connection string's ?schema=
parameter and defaults to 'public' schema. This breaks in multi-schema setups.

Changes:
- Import execute_raw_with_schema from .db
- Use {schema_prefix} placeholder in query
- Call execute_raw_with_schema instead of db.execute_raw

This matches the pattern used in fix_llm_provider_credentials and other
schema-aware migrations. Works in both CI (public schema) and local
(platform schema from connection string).
2026-03-10 16:25:12 +00:00
Bentlybro
3cafa49c4c fix: remove hardcoded schema prefix from migrate_llm_models query
The raw SQL query in migrate_llm_models() hardcoded platform."AgentNode"
which fails in CI where tables are in 'public' schema (not 'platform').

This code exists in dev but only runs when LLM registry has data. With our
new schema, the migration tries to run at startup and fails in CI.

Changed: UPDATE platform."AgentNode" -> UPDATE "AgentNode"

Matches pattern of all other migrations - let connection string's default
schema handle routing.
2026-03-10 16:19:57 +00:00
Bentlybro
ded002a406 fix: remove CREATE SCHEMA to match CI environment
CI uses schema "public" as default (not "platform"), so creating
a platform schema then tables without prefix puts tables in public
but Prisma looks in platform.

Existing migrations don't create schema - they rely on connection
string's default. Remove CREATE SCHEMA IF NOT EXISTS to match.
2026-03-10 15:57:26 +00:00
Bentlybro
4fdf89c3be fix: remove schema prefix from migration SQL to match existing pattern
CI failing with 'relation "platform.AgentNode" does not exist' because
Prisma generates queries differently when tables are created with
explicit schema prefixes.

Existing AutoGPT migrations use:
  CREATE TABLE "AgentNode" (...)

Not:
  CREATE TABLE "platform"."AgentNode" (...)

The connection string's ?schema=platform handles schema selection,
so explicit prefixes aren't needed and cause compatibility issues.

Changes:
- Remove all "platform". prefixes from:
  * CREATE TYPE statements
  * CREATE TABLE statements
  * CREATE INDEX statements
  * ALTER TABLE statements
  * REFERENCES clauses in foreign keys

Now matches existing migration pattern exactly.
2026-03-10 15:38:41 +00:00
Bentlybro
d816bd739f fix: add partial unique indexes for data integrity
Per CodeRabbit feedback - fix 2 actual bugs:

1. Prevent multiple active migrations per source model
   - Add partial unique index: UNIQUE (sourceModelSlug) WHERE isReverted = false
   - Prevents ambiguous routing when resolving migrations

2. Allow both default and credential-specific costs
   - Remove @@unique([llmModelId, credentialProvider, unit])
   - Add 2 partial unique indexes:
     * UNIQUE (llmModelId, provider, unit) WHERE credentialId IS NULL (defaults)
     * UNIQUE (llmModelId, provider, credentialId, unit) WHERE credentialId IS NOT NULL (overrides)
   - Enables provider-level default costs + per-credential overrides

Schema comments document that these constraints exist in migration SQL.
2026-03-10 15:08:44 +00:00
Bentlybro
6a16376323 fix: remove multiSchema - follow existing AutoGPT pattern
Remove unnecessary multiSchema configuration that broke existing models.

AutoGPT uses connection string's ?schema=platform parameter as default,
not Prisma's multiSchema feature. Existing models (User, AgentGraph, etc.)
have no @@schema() directives and work fine.

Changes:
- Remove schemas = ["platform", "public"] from datasource
- Remove "multiSchema" from previewFeatures
- Remove all @@schema() directives from LLM models and enum

Migration SQL already creates tables in platform schema explicitly
(CREATE TABLE "platform"."LlmProvider" etc.) which is correct.

This matches the existing pattern used throughout the codebase.
2026-03-10 14:49:23 +00:00
Bentlybro
ed7b02ffb1 fix: address CodeRabbit design feedback
Per CodeRabbit review:

1. **Safety: Change capability defaults false → safer for partial seeding**
   - supportsTools: true → false
   - supportsJsonOutput: true → false
   - Prevents partially-seeded rows from being assumed capable

2. **Clarity: Rename supportsParallelTool → supportsParallelToolCalls**
   - More explicit about what the field represents

3. **Performance: Remove redundant indexes**
   - Drop @@index([llmModelId]) - covered by unique constraint
   - Drop @@index([sourceModelSlug]) - covered by composite index
   - Reduces write overhead and storage

4. **Documentation: Acknowledge customCreditCost limitation**
   - It's unit-agnostic (doesn't distinguish RUN vs TOKENS)
   - Noted as TODO for follow-up PR with proper unit-aware override

Schema + migration both updated to match.
2026-03-10 14:27:42 +00:00
Bentlybro
d064198dd1 fix: add @@schema("platform") to LlmCostUnit enum
Sentry caught this - enums also need @@schema directive with multiSchema enabled.
Without it, Prisma looks for enum in public schema but it's created in platform.
2026-03-10 14:23:24 +00:00
Bentlybro
01ad033b2b feat: add database CHECK constraints for data integrity
Per CodeRabbit feedback - enforce numeric domain rules at DB level:

Migration:
- priceTier: CHECK (priceTier BETWEEN 1 AND 3)
- creditCost: CHECK (creditCost >= 0)
- nodeCount: CHECK (nodeCount >= 0)
- customCreditCost: CHECK (customCreditCost IS NULL OR customCreditCost >= 0)

Schema comments:
- Document constraints inline for developer visibility

Prevents invalid data (negative costs, out-of-range tiers) from
entering the database, matching backend/blocks/llm.py contract.
2026-03-10 14:20:07 +00:00
Bentlybro
56bcbda054 fix: use @@schema() instead of @@map() for platform schema + create schema in migration
Critical fixes from PR review:

1. Replace @@map("platform.ModelName") with @@schema("platform")
   - Sentry correctly identified: Prisma was looking for literal table "platform.LlmProvider" with dot
   - Proper syntax: enable multiSchema feature + use @@schema directive

2. Create platform schema in migration
   - CI failed: schema "platform" does not exist
   - Add CREATE SCHEMA IF NOT EXISTS at start of migration

Schema changes:
- datasource: add schemas = ["platform", "public"]
- generator: add "multiSchema" to previewFeatures
- All 5 models: @@map() → @@schema("platform")

Migration changes:
- Add CREATE SCHEMA IF NOT EXISTS "platform" before enum creation

Fixes CI failure and Sentry-identified bug.
2026-03-10 14:15:35 +00:00
Bentlybro
d40efc6056 feat(platform): Add LLM registry database schema
Add Prisma schema and migration for dynamic LLM model registry:

Schema additions:
- LlmProvider: Registry of LLM providers (OpenAI, Anthropic, etc.)
- LlmModel: Individual models with capabilities and metadata
- LlmModelCost: Per-model pricing configuration
- LlmModelCreator: Model creators/trainers (OpenAI, Meta, etc.)
- LlmModelMigration: Track model migrations and reverts
- LlmCostUnit enum: RUN vs TOKENS pricing units

Key features:
- Model-specific capabilities (tools, JSON, reasoning, parallel calls)
- Flexible creator/provider separation (e.g., Meta model via Hugging Face)
- Migration tracking with custom pricing overrides
- Indexes for performance on common queries

Part 1 of incremental LLM registry implementation.
Refs: Draft PR #11699
2026-03-10 13:22:05 +00:00
53 changed files with 10063 additions and 4684 deletions

View File

@@ -1,40 +0,0 @@
-- =============================================================
-- View: analytics.auth_activities
-- Looker source alias: ds49 | Charts: 1
-- =============================================================
-- DESCRIPTION
-- Tracks authentication events (login, logout, SSO, password
-- reset, etc.) from Supabase's internal audit log.
-- Useful for monitoring sign-in patterns and detecting anomalies.
--
-- SOURCE TABLES
-- auth.audit_log_entries — Supabase internal auth event log
--
-- OUTPUT COLUMNS
-- created_at TIMESTAMPTZ When the auth event occurred
-- actor_id TEXT User ID who triggered the event
-- actor_via_sso TEXT Whether the action was via SSO ('true'/'false')
-- action TEXT Event type (e.g. 'login', 'logout', 'token_refreshed')
--
-- WINDOW
-- Rolling 90 days from current date
--
-- EXAMPLE QUERIES
-- -- Daily login counts
-- SELECT DATE_TRUNC('day', created_at) AS day, COUNT(*) AS logins
-- FROM analytics.auth_activities
-- WHERE action = 'login'
-- GROUP BY 1 ORDER BY 1;
--
-- -- SSO vs password login breakdown
-- SELECT actor_via_sso, COUNT(*) FROM analytics.auth_activities
-- WHERE action = 'login' GROUP BY 1;
-- =============================================================
SELECT
created_at,
payload->>'actor_id' AS actor_id,
payload->>'actor_via_sso' AS actor_via_sso,
payload->>'action' AS action
FROM auth.audit_log_entries
WHERE created_at >= NOW() - INTERVAL '90 days'

View File

@@ -1,105 +0,0 @@
-- =============================================================
-- View: analytics.graph_execution
-- Looker source alias: ds16 | Charts: 21
-- =============================================================
-- DESCRIPTION
-- One row per agent graph execution (last 90 days).
-- Unpacks the JSONB stats column into individual numeric columns
-- and normalises the executionStatus — runs that failed due to
-- insufficient credits are reclassified as 'NO_CREDITS' for
-- easier filtering. Error messages are scrubbed of IDs and URLs
-- to allow safe grouping.
--
-- SOURCE TABLES
-- platform.AgentGraphExecution — Execution records
-- platform.AgentGraph — Agent graph metadata (for name)
-- platform.LibraryAgent — To flag possibly-AI (safe-mode) agents
--
-- OUTPUT COLUMNS
-- id TEXT Execution UUID
-- agentGraphId TEXT Agent graph UUID
-- agentGraphVersion INT Graph version number
-- executionStatus TEXT COMPLETED | FAILED | NO_CREDITS | RUNNING | QUEUED | TERMINATED
-- createdAt TIMESTAMPTZ When the execution was queued
-- updatedAt TIMESTAMPTZ Last status update time
-- userId TEXT Owner user UUID
-- agentGraphName TEXT Human-readable agent name
-- cputime DECIMAL Total CPU seconds consumed
-- walltime DECIMAL Total wall-clock seconds
-- node_count DECIMAL Number of nodes in the graph
-- nodes_cputime DECIMAL CPU time across all nodes
-- nodes_walltime DECIMAL Wall time across all nodes
-- execution_cost DECIMAL Credit cost of this execution
-- correctness_score FLOAT AI correctness score (if available)
-- possibly_ai BOOLEAN True if agent has sensitive_action_safe_mode enabled
-- groupedErrorMessage TEXT Scrubbed error string (IDs/URLs replaced with wildcards)
--
-- WINDOW
-- Rolling 90 days (createdAt > CURRENT_DATE - 90 days)
--
-- EXAMPLE QUERIES
-- -- Daily execution counts by status
-- SELECT DATE_TRUNC('day', "createdAt") AS day, "executionStatus", COUNT(*)
-- FROM analytics.graph_execution
-- GROUP BY 1, 2 ORDER BY 1;
--
-- -- Average cost per execution by agent
-- SELECT "agentGraphName", AVG("execution_cost") AS avg_cost, COUNT(*) AS runs
-- FROM analytics.graph_execution
-- WHERE "executionStatus" = 'COMPLETED'
-- GROUP BY 1 ORDER BY avg_cost DESC;
--
-- -- Top error messages
-- SELECT "groupedErrorMessage", COUNT(*) AS occurrences
-- FROM analytics.graph_execution
-- WHERE "executionStatus" = 'FAILED'
-- GROUP BY 1 ORDER BY 2 DESC LIMIT 20;
-- =============================================================
SELECT
ge."id" AS id,
ge."agentGraphId" AS agentGraphId,
ge."agentGraphVersion" AS agentGraphVersion,
CASE
WHEN jsonb_exists(ge."stats"::jsonb, 'error')
AND (
(ge."stats"::jsonb->>'error') ILIKE '%insufficient balance%'
OR (ge."stats"::jsonb->>'error') ILIKE '%you have no credits left%'
)
THEN 'NO_CREDITS'
ELSE CAST(ge."executionStatus" AS TEXT)
END AS executionStatus,
ge."createdAt" AS createdAt,
ge."updatedAt" AS updatedAt,
ge."userId" AS userId,
g."name" AS agentGraphName,
(ge."stats"::jsonb->>'cputime')::decimal AS cputime,
(ge."stats"::jsonb->>'walltime')::decimal AS walltime,
(ge."stats"::jsonb->>'node_count')::decimal AS node_count,
(ge."stats"::jsonb->>'nodes_cputime')::decimal AS nodes_cputime,
(ge."stats"::jsonb->>'nodes_walltime')::decimal AS nodes_walltime,
(ge."stats"::jsonb->>'cost')::decimal AS execution_cost,
(ge."stats"::jsonb->>'correctness_score')::float AS correctness_score,
COALESCE(la.possibly_ai, FALSE) AS possibly_ai,
REGEXP_REPLACE(
REGEXP_REPLACE(
TRIM(BOTH '"' FROM ge."stats"::jsonb->>'error'),
'(https?://)([A-Za-z0-9.-]+)(:[0-9]+)?(/[^\s]*)?',
'\1\2/...', 'gi'
),
'[a-zA-Z0-9_:-]*\d[a-zA-Z0-9_:-]*', '*', 'g'
) AS groupedErrorMessage
FROM platform."AgentGraphExecution" ge
LEFT JOIN platform."AgentGraph" g
ON ge."agentGraphId" = g."id"
AND ge."agentGraphVersion" = g."version"
LEFT JOIN (
SELECT DISTINCT ON ("userId", "agentGraphId")
"userId", "agentGraphId",
("settings"::jsonb->>'sensitive_action_safe_mode')::boolean AS possibly_ai
FROM platform."LibraryAgent"
WHERE "isDeleted" = FALSE
AND "isArchived" = FALSE
ORDER BY "userId", "agentGraphId", "agentGraphVersion" DESC
) la ON la."userId" = ge."userId" AND la."agentGraphId" = ge."agentGraphId"
WHERE ge."createdAt" > CURRENT_DATE - INTERVAL '90 days'

View File

@@ -1,101 +0,0 @@
-- =============================================================
-- View: analytics.node_block_execution
-- Looker source alias: ds14 | Charts: 11
-- =============================================================
-- DESCRIPTION
-- One row per node (block) execution (last 90 days).
-- Unpacks stats JSONB and joins to identify which block type
-- was run. For failed nodes, joins the error output and
-- scrubs it for safe grouping.
--
-- SOURCE TABLES
-- platform.AgentNodeExecution — Node execution records
-- platform.AgentNode — Node → block mapping
-- platform.AgentBlock — Block name/ID
-- platform.AgentNodeExecutionInputOutput — Error output values
--
-- OUTPUT COLUMNS
-- id TEXT Node execution UUID
-- agentGraphExecutionId TEXT Parent graph execution UUID
-- agentNodeId TEXT Node UUID within the graph
-- executionStatus TEXT COMPLETED | FAILED | QUEUED | RUNNING | TERMINATED
-- addedTime TIMESTAMPTZ When the node was queued
-- queuedTime TIMESTAMPTZ When it entered the queue
-- startedTime TIMESTAMPTZ When execution started
-- endedTime TIMESTAMPTZ When execution finished
-- inputSize BIGINT Input payload size in bytes
-- outputSize BIGINT Output payload size in bytes
-- walltime NUMERIC Wall-clock seconds for this node
-- cputime NUMERIC CPU seconds for this node
-- llmRetryCount INT Number of LLM retries
-- llmCallCount INT Number of LLM API calls made
-- inputTokenCount BIGINT LLM input tokens consumed
-- outputTokenCount BIGINT LLM output tokens produced
-- blockName TEXT Human-readable block name (e.g. 'OpenAIBlock')
-- blockId TEXT Block UUID
-- groupedErrorMessage TEXT Scrubbed error (IDs/URLs wildcarded)
-- errorMessage TEXT Raw error output (only set when FAILED)
--
-- WINDOW
-- Rolling 90 days (addedTime > CURRENT_DATE - 90 days)
--
-- EXAMPLE QUERIES
-- -- Most-used blocks by execution count
-- SELECT "blockName", COUNT(*) AS executions,
-- COUNT(*) FILTER (WHERE "executionStatus"='FAILED') AS failures
-- FROM analytics.node_block_execution
-- GROUP BY 1 ORDER BY executions DESC LIMIT 20;
--
-- -- Average LLM token usage per block
-- SELECT "blockName",
-- AVG("inputTokenCount") AS avg_input_tokens,
-- AVG("outputTokenCount") AS avg_output_tokens
-- FROM analytics.node_block_execution
-- WHERE "llmCallCount" > 0
-- GROUP BY 1 ORDER BY avg_input_tokens DESC;
--
-- -- Top failure reasons
-- SELECT "blockName", "groupedErrorMessage", COUNT(*) AS count
-- FROM analytics.node_block_execution
-- WHERE "executionStatus" = 'FAILED'
-- GROUP BY 1, 2 ORDER BY count DESC LIMIT 20;
-- =============================================================
SELECT
ne."id" AS id,
ne."agentGraphExecutionId" AS agentGraphExecutionId,
ne."agentNodeId" AS agentNodeId,
CAST(ne."executionStatus" AS TEXT) AS executionStatus,
ne."addedTime" AS addedTime,
ne."queuedTime" AS queuedTime,
ne."startedTime" AS startedTime,
ne."endedTime" AS endedTime,
(ne."stats"::jsonb->>'input_size')::bigint AS inputSize,
(ne."stats"::jsonb->>'output_size')::bigint AS outputSize,
(ne."stats"::jsonb->>'walltime')::numeric AS walltime,
(ne."stats"::jsonb->>'cputime')::numeric AS cputime,
(ne."stats"::jsonb->>'llm_retry_count')::int AS llmRetryCount,
(ne."stats"::jsonb->>'llm_call_count')::int AS llmCallCount,
(ne."stats"::jsonb->>'input_token_count')::bigint AS inputTokenCount,
(ne."stats"::jsonb->>'output_token_count')::bigint AS outputTokenCount,
b."name" AS blockName,
b."id" AS blockId,
REGEXP_REPLACE(
REGEXP_REPLACE(
TRIM(BOTH '"' FROM eio."data"::text),
'(https?://)([A-Za-z0-9.-]+)(:[0-9]+)?(/[^\s]*)?',
'\1\2/...', 'gi'
),
'[a-zA-Z0-9_:-]*\d[a-zA-Z0-9_:-]*', '*', 'g'
) AS groupedErrorMessage,
eio."data" AS errorMessage
FROM platform."AgentNodeExecution" ne
LEFT JOIN platform."AgentNode" nd
ON ne."agentNodeId" = nd."id"
LEFT JOIN platform."AgentBlock" b
ON nd."agentBlockId" = b."id"
LEFT JOIN platform."AgentNodeExecutionInputOutput" eio
ON eio."referencedByOutputExecId" = ne."id"
AND eio."name" = 'error'
AND ne."executionStatus" = 'FAILED'
WHERE ne."addedTime" > CURRENT_DATE - INTERVAL '90 days'

View File

@@ -1,97 +0,0 @@
-- =============================================================
-- View: analytics.retention_agent
-- Looker source alias: ds35 | Charts: 2
-- =============================================================
-- DESCRIPTION
-- Weekly cohort retention broken down per individual agent.
-- Cohort = week of a user's first use of THAT specific agent.
-- Tells you which agents keep users coming back vs. one-shot
-- use. Only includes cohorts from the last 180 days.
--
-- SOURCE TABLES
-- platform.AgentGraphExecution — Execution records (user × agent × time)
-- platform.AgentGraph — Agent names
--
-- OUTPUT COLUMNS
-- agent_id TEXT Agent graph UUID
-- agent_label TEXT 'AgentName [first8chars]'
-- agent_label_n TEXT 'AgentName [first8chars] (n=total_users)'
-- cohort_week_start DATE Week users first ran this agent
-- cohort_label TEXT ISO week label
-- cohort_label_n TEXT ISO week label with cohort size
-- user_lifetime_week INT Weeks since first use of this agent
-- cohort_users BIGINT Users in this cohort for this agent
-- active_users BIGINT Users who ran the agent again in week k
-- retention_rate FLOAT active_users / cohort_users
-- cohort_users_w0 BIGINT cohort_users only at week 0 (safe to SUM)
-- agent_total_users BIGINT Total users across all cohorts for this agent
--
-- EXAMPLE QUERIES
-- -- Best-retained agents at week 2
-- SELECT agent_label, AVG(retention_rate) AS w2_retention
-- FROM analytics.retention_agent
-- WHERE user_lifetime_week = 2 AND cohort_users >= 10
-- GROUP BY 1 ORDER BY w2_retention DESC LIMIT 10;
--
-- -- Agents with most unique users
-- SELECT DISTINCT agent_label, agent_total_users
-- FROM analytics.retention_agent
-- ORDER BY agent_total_users DESC LIMIT 20;
-- =============================================================
WITH params AS (SELECT 12::int AS max_weeks, (CURRENT_DATE - INTERVAL '180 days') AS cohort_start),
events AS (
SELECT e."userId"::text AS user_id, e."agentGraphId" AS agent_id,
e."createdAt"::timestamptz AS created_at,
DATE_TRUNC('week', e."createdAt")::date AS week_start
FROM platform."AgentGraphExecution" e
),
first_use AS (
SELECT user_id, agent_id, MIN(created_at) AS first_use_at,
DATE_TRUNC('week', MIN(created_at))::date AS cohort_week_start
FROM events GROUP BY 1,2
HAVING MIN(created_at) >= (SELECT cohort_start FROM params)
),
activity_weeks AS (SELECT DISTINCT user_id, agent_id, week_start FROM events),
user_week_age AS (
SELECT aw.user_id, aw.agent_id, fu.cohort_week_start,
((aw.week_start - DATE_TRUNC('week',fu.first_use_at)::date)/7)::int AS user_lifetime_week
FROM activity_weeks aw JOIN first_use fu USING (user_id, agent_id)
WHERE aw.week_start >= DATE_TRUNC('week',fu.first_use_at)::date
),
active_counts AS (
SELECT agent_id, cohort_week_start, user_lifetime_week, COUNT(DISTINCT user_id) AS active_users
FROM user_week_age WHERE user_lifetime_week >= 0 GROUP BY 1,2,3
),
cohort_sizes AS (
SELECT agent_id, cohort_week_start, COUNT(DISTINCT user_id) AS cohort_users FROM first_use GROUP BY 1,2
),
cohort_caps AS (
SELECT cs.agent_id, cs.cohort_week_start, cs.cohort_users,
LEAST((SELECT max_weeks FROM params),
GREATEST(0,((DATE_TRUNC('week',CURRENT_DATE)::date-cs.cohort_week_start)/7)::int)) AS cap_weeks
FROM cohort_sizes cs
),
grid AS (
SELECT cc.agent_id, cc.cohort_week_start, gs AS user_lifetime_week, cc.cohort_users
FROM cohort_caps cc CROSS JOIN LATERAL generate_series(0, cc.cap_weeks) gs
),
agent_names AS (SELECT DISTINCT ON (g."id") g."id" AS agent_id, g."name" AS agent_name FROM platform."AgentGraph" g ORDER BY g."id", g."version" DESC),
agent_total_users AS (SELECT agent_id, SUM(cohort_users) AS agent_total_users FROM cohort_sizes GROUP BY 1)
SELECT
g.agent_id,
COALESCE(an.agent_name,'(unnamed)')||' ['||LEFT(g.agent_id::text,8)||']' AS agent_label,
COALESCE(an.agent_name,'(unnamed)')||' ['||LEFT(g.agent_id::text,8)||'] (n='||COALESCE(atu.agent_total_users,0)||')' AS agent_label_n,
g.cohort_week_start,
TO_CHAR(g.cohort_week_start,'IYYY-"W"IW') AS cohort_label,
TO_CHAR(g.cohort_week_start,'IYYY-"W"IW')||' (n='||g.cohort_users||')' AS cohort_label_n,
g.user_lifetime_week, g.cohort_users,
COALESCE(ac.active_users,0) AS active_users,
COALESCE(ac.active_users,0)::float / NULLIF(g.cohort_users,0) AS retention_rate,
CASE WHEN g.user_lifetime_week=0 THEN g.cohort_users ELSE 0 END AS cohort_users_w0,
COALESCE(atu.agent_total_users,0) AS agent_total_users
FROM grid g
LEFT JOIN active_counts ac ON ac.agent_id=g.agent_id AND ac.cohort_week_start=g.cohort_week_start AND ac.user_lifetime_week=g.user_lifetime_week
LEFT JOIN agent_names an ON an.agent_id=g.agent_id
LEFT JOIN agent_total_users atu ON atu.agent_id=g.agent_id
ORDER BY agent_label, g.cohort_week_start, g.user_lifetime_week;

View File

@@ -1,81 +0,0 @@
-- =============================================================
-- View: analytics.retention_execution_daily
-- Looker source alias: ds111 | Charts: 1
-- =============================================================
-- DESCRIPTION
-- Daily cohort retention based on agent executions.
-- Cohort anchor = day of user's FIRST ever execution.
-- Only includes cohorts from the last 90 days, up to day 30.
-- Great for early engagement analysis (did users run another
-- agent the next day?).
--
-- SOURCE TABLES
-- platform.AgentGraphExecution — Execution records
--
-- OUTPUT COLUMNS
-- Same pattern as retention_login_daily.
-- cohort_day_start = day of first execution (not first login)
--
-- EXAMPLE QUERIES
-- -- Day-3 execution retention
-- SELECT cohort_label, retention_rate_bounded AS d3_retention
-- FROM analytics.retention_execution_daily
-- WHERE user_lifetime_day = 3 ORDER BY cohort_day_start;
-- =============================================================
WITH params AS (SELECT 30::int AS max_days, (CURRENT_DATE - INTERVAL '90 days') AS cohort_start),
events AS (
SELECT e."userId"::text AS user_id, e."createdAt"::timestamptz AS created_at,
DATE_TRUNC('day', e."createdAt")::date AS day_start
FROM platform."AgentGraphExecution" e WHERE e."userId" IS NOT NULL
),
first_exec AS (
SELECT user_id, MIN(created_at) AS first_exec_at,
DATE_TRUNC('day', MIN(created_at))::date AS cohort_day_start
FROM events GROUP BY 1
HAVING MIN(created_at) >= (SELECT cohort_start FROM params)
),
activity_days AS (SELECT DISTINCT user_id, day_start FROM events),
user_day_age AS (
SELECT ad.user_id, fe.cohort_day_start,
(ad.day_start - DATE_TRUNC('day',fe.first_exec_at)::date)::int AS user_lifetime_day
FROM activity_days ad JOIN first_exec fe USING (user_id)
WHERE ad.day_start >= DATE_TRUNC('day',fe.first_exec_at)::date
),
bounded_counts AS (
SELECT cohort_day_start, user_lifetime_day, COUNT(DISTINCT user_id) AS active_users_bounded
FROM user_day_age WHERE user_lifetime_day >= 0 GROUP BY 1,2
),
last_active AS (
SELECT cohort_day_start, user_id, MAX(user_lifetime_day) AS last_active_day FROM user_day_age GROUP BY 1,2
),
unbounded_counts AS (
SELECT la.cohort_day_start, gs AS user_lifetime_day, COUNT(*) AS retained_users_unbounded
FROM last_active la
CROSS JOIN LATERAL generate_series(0, LEAST(la.last_active_day,(SELECT max_days FROM params))) gs
GROUP BY 1,2
),
cohort_sizes AS (SELECT cohort_day_start, COUNT(DISTINCT user_id) AS cohort_users FROM first_exec GROUP BY 1),
cohort_caps AS (
SELECT cs.cohort_day_start, cs.cohort_users,
LEAST((SELECT max_days FROM params), GREATEST(0,(CURRENT_DATE-cs.cohort_day_start)::int)) AS cap_days
FROM cohort_sizes cs
),
grid AS (
SELECT cc.cohort_day_start, gs AS user_lifetime_day, cc.cohort_users
FROM cohort_caps cc CROSS JOIN LATERAL generate_series(0, cc.cap_days) gs
)
SELECT
g.cohort_day_start,
TO_CHAR(g.cohort_day_start,'YYYY-MM-DD') AS cohort_label,
TO_CHAR(g.cohort_day_start,'YYYY-MM-DD')||' (n='||g.cohort_users||')' AS cohort_label_n,
g.user_lifetime_day, g.cohort_users,
COALESCE(b.active_users_bounded,0) AS active_users_bounded,
COALESCE(u.retained_users_unbounded,0) AS retained_users_unbounded,
CASE WHEN g.cohort_users>0 THEN COALESCE(b.active_users_bounded,0)::float/g.cohort_users END AS retention_rate_bounded,
CASE WHEN g.cohort_users>0 THEN COALESCE(u.retained_users_unbounded,0)::float/g.cohort_users END AS retention_rate_unbounded,
CASE WHEN g.user_lifetime_day=0 THEN g.cohort_users ELSE 0 END AS cohort_users_d0
FROM grid g
LEFT JOIN bounded_counts b ON b.cohort_day_start=g.cohort_day_start AND b.user_lifetime_day=g.user_lifetime_day
LEFT JOIN unbounded_counts u ON u.cohort_day_start=g.cohort_day_start AND u.user_lifetime_day=g.user_lifetime_day
ORDER BY g.cohort_day_start, g.user_lifetime_day;

View File

@@ -1,81 +0,0 @@
-- =============================================================
-- View: analytics.retention_execution_weekly
-- Looker source alias: ds92 | Charts: 2
-- =============================================================
-- DESCRIPTION
-- Weekly cohort retention based on agent executions.
-- Cohort anchor = week of user's FIRST ever agent execution
-- (not first login). Only includes cohorts from the last 180 days.
-- Useful when you care about product engagement, not just visits.
--
-- SOURCE TABLES
-- platform.AgentGraphExecution — Execution records
--
-- OUTPUT COLUMNS
-- Same pattern as retention_login_weekly.
-- cohort_week_start = week of first execution (not first login)
--
-- EXAMPLE QUERIES
-- -- Week-2 execution retention
-- SELECT cohort_label, retention_rate_bounded
-- FROM analytics.retention_execution_weekly
-- WHERE user_lifetime_week = 2 ORDER BY cohort_week_start;
-- =============================================================
WITH params AS (SELECT 12::int AS max_weeks, (CURRENT_DATE - INTERVAL '180 days') AS cohort_start),
events AS (
SELECT e."userId"::text AS user_id, e."createdAt"::timestamptz AS created_at,
DATE_TRUNC('week', e."createdAt")::date AS week_start
FROM platform."AgentGraphExecution" e WHERE e."userId" IS NOT NULL
),
first_exec AS (
SELECT user_id, MIN(created_at) AS first_exec_at,
DATE_TRUNC('week', MIN(created_at))::date AS cohort_week_start
FROM events GROUP BY 1
HAVING MIN(created_at) >= (SELECT cohort_start FROM params)
),
activity_weeks AS (SELECT DISTINCT user_id, week_start FROM events),
user_week_age AS (
SELECT aw.user_id, fe.cohort_week_start,
((aw.week_start - DATE_TRUNC('week',fe.first_exec_at)::date)/7)::int AS user_lifetime_week
FROM activity_weeks aw JOIN first_exec fe USING (user_id)
WHERE aw.week_start >= DATE_TRUNC('week',fe.first_exec_at)::date
),
bounded_counts AS (
SELECT cohort_week_start, user_lifetime_week, COUNT(DISTINCT user_id) AS active_users_bounded
FROM user_week_age WHERE user_lifetime_week >= 0 GROUP BY 1,2
),
last_active AS (
SELECT cohort_week_start, user_id, MAX(user_lifetime_week) AS last_active_week FROM user_week_age GROUP BY 1,2
),
unbounded_counts AS (
SELECT la.cohort_week_start, gs AS user_lifetime_week, COUNT(*) AS retained_users_unbounded
FROM last_active la
CROSS JOIN LATERAL generate_series(0, LEAST(la.last_active_week,(SELECT max_weeks FROM params))) gs
GROUP BY 1,2
),
cohort_sizes AS (SELECT cohort_week_start, COUNT(DISTINCT user_id) AS cohort_users FROM first_exec GROUP BY 1),
cohort_caps AS (
SELECT cs.cohort_week_start, cs.cohort_users,
LEAST((SELECT max_weeks FROM params),
GREATEST(0,((DATE_TRUNC('week',CURRENT_DATE)::date-cs.cohort_week_start)/7)::int)) AS cap_weeks
FROM cohort_sizes cs
),
grid AS (
SELECT cc.cohort_week_start, gs AS user_lifetime_week, cc.cohort_users
FROM cohort_caps cc CROSS JOIN LATERAL generate_series(0, cc.cap_weeks) gs
)
SELECT
g.cohort_week_start,
TO_CHAR(g.cohort_week_start,'IYYY-"W"IW') AS cohort_label,
TO_CHAR(g.cohort_week_start,'IYYY-"W"IW')||' (n='||g.cohort_users||')' AS cohort_label_n,
g.user_lifetime_week, g.cohort_users,
COALESCE(b.active_users_bounded,0) AS active_users_bounded,
COALESCE(u.retained_users_unbounded,0) AS retained_users_unbounded,
CASE WHEN g.cohort_users>0 THEN COALESCE(b.active_users_bounded,0)::float/g.cohort_users END AS retention_rate_bounded,
CASE WHEN g.cohort_users>0 THEN COALESCE(u.retained_users_unbounded,0)::float/g.cohort_users END AS retention_rate_unbounded,
CASE WHEN g.user_lifetime_week=0 THEN g.cohort_users ELSE 0 END AS cohort_users_w0
FROM grid g
LEFT JOIN bounded_counts b ON b.cohort_week_start=g.cohort_week_start AND b.user_lifetime_week=g.user_lifetime_week
LEFT JOIN unbounded_counts u ON u.cohort_week_start=g.cohort_week_start AND u.user_lifetime_week=g.user_lifetime_week
ORDER BY g.cohort_week_start, g.user_lifetime_week;

View File

@@ -1,94 +0,0 @@
-- =============================================================
-- View: analytics.retention_login_daily
-- Looker source alias: ds112 | Charts: 1
-- =============================================================
-- DESCRIPTION
-- Daily cohort retention based on login sessions.
-- Same logic as retention_login_weekly but at day granularity,
-- showing up to day 30 for cohorts from the last 90 days.
-- Useful for analysing early activation (days 1-7) in detail.
--
-- SOURCE TABLES
-- auth.sessions — Login session records
--
-- OUTPUT COLUMNS (same pattern as retention_login_weekly)
-- cohort_day_start DATE First day the cohort logged in
-- cohort_label TEXT Date string (e.g. '2025-03-01')
-- cohort_label_n TEXT Date + cohort size (e.g. '2025-03-01 (n=12)')
-- user_lifetime_day INT Days since first login (0 = signup day)
-- cohort_users BIGINT Total users in cohort
-- active_users_bounded BIGINT Users active on exactly day k
-- retained_users_unbounded BIGINT Users active any time on/after day k
-- retention_rate_bounded FLOAT bounded / cohort_users
-- retention_rate_unbounded FLOAT unbounded / cohort_users
-- cohort_users_d0 BIGINT cohort_users only at day 0, else 0 (safe to SUM)
--
-- EXAMPLE QUERIES
-- -- Day-1 retention rate (came back next day)
-- SELECT cohort_label, retention_rate_bounded AS d1_retention
-- FROM analytics.retention_login_daily
-- WHERE user_lifetime_day = 1 ORDER BY cohort_day_start;
--
-- -- Average retention curve across all cohorts
-- SELECT user_lifetime_day,
-- SUM(active_users_bounded)::float / NULLIF(SUM(cohort_users_d0), 0) AS avg_retention
-- FROM analytics.retention_login_daily
-- GROUP BY 1 ORDER BY 1;
-- =============================================================
WITH params AS (SELECT 30::int AS max_days, (CURRENT_DATE - INTERVAL '90 days')::date AS cohort_start),
events AS (
SELECT s.user_id::text AS user_id, s.created_at::timestamptz AS created_at,
DATE_TRUNC('day', s.created_at)::date AS day_start
FROM auth.sessions s WHERE s.user_id IS NOT NULL
),
first_login AS (
SELECT user_id, MIN(created_at) AS first_login_time,
DATE_TRUNC('day', MIN(created_at))::date AS cohort_day_start
FROM events GROUP BY 1
HAVING MIN(created_at) >= (SELECT cohort_start FROM params)
),
activity_days AS (SELECT DISTINCT user_id, day_start FROM events),
user_day_age AS (
SELECT ad.user_id, fl.cohort_day_start,
(ad.day_start - DATE_TRUNC('day', fl.first_login_time)::date)::int AS user_lifetime_day
FROM activity_days ad JOIN first_login fl USING (user_id)
WHERE ad.day_start >= DATE_TRUNC('day', fl.first_login_time)::date
),
bounded_counts AS (
SELECT cohort_day_start, user_lifetime_day, COUNT(DISTINCT user_id) AS active_users_bounded
FROM user_day_age WHERE user_lifetime_day >= 0 GROUP BY 1,2
),
last_active AS (
SELECT cohort_day_start, user_id, MAX(user_lifetime_day) AS last_active_day FROM user_day_age GROUP BY 1,2
),
unbounded_counts AS (
SELECT la.cohort_day_start, gs AS user_lifetime_day, COUNT(*) AS retained_users_unbounded
FROM last_active la
CROSS JOIN LATERAL generate_series(0, LEAST(la.last_active_day,(SELECT max_days FROM params))) gs
GROUP BY 1,2
),
cohort_sizes AS (SELECT cohort_day_start, COUNT(DISTINCT user_id) AS cohort_users FROM first_login GROUP BY 1),
cohort_caps AS (
SELECT cs.cohort_day_start, cs.cohort_users,
LEAST((SELECT max_days FROM params), GREATEST(0,(CURRENT_DATE-cs.cohort_day_start)::int)) AS cap_days
FROM cohort_sizes cs
),
grid AS (
SELECT cc.cohort_day_start, gs AS user_lifetime_day, cc.cohort_users
FROM cohort_caps cc CROSS JOIN LATERAL generate_series(0, cc.cap_days) gs
)
SELECT
g.cohort_day_start,
TO_CHAR(g.cohort_day_start,'YYYY-MM-DD') AS cohort_label,
TO_CHAR(g.cohort_day_start,'YYYY-MM-DD')||' (n='||g.cohort_users||')' AS cohort_label_n,
g.user_lifetime_day, g.cohort_users,
COALESCE(b.active_users_bounded,0) AS active_users_bounded,
COALESCE(u.retained_users_unbounded,0) AS retained_users_unbounded,
CASE WHEN g.cohort_users>0 THEN COALESCE(b.active_users_bounded,0)::float/g.cohort_users END AS retention_rate_bounded,
CASE WHEN g.cohort_users>0 THEN COALESCE(u.retained_users_unbounded,0)::float/g.cohort_users END AS retention_rate_unbounded,
CASE WHEN g.user_lifetime_day=0 THEN g.cohort_users ELSE 0 END AS cohort_users_d0
FROM grid g
LEFT JOIN bounded_counts b ON b.cohort_day_start=g.cohort_day_start AND b.user_lifetime_day=g.user_lifetime_day
LEFT JOIN unbounded_counts u ON u.cohort_day_start=g.cohort_day_start AND u.user_lifetime_day=g.user_lifetime_day
ORDER BY g.cohort_day_start, g.user_lifetime_day;

View File

@@ -1,96 +0,0 @@
-- =============================================================
-- View: analytics.retention_login_onboarded_weekly
-- Looker source alias: ds101 | Charts: 2
-- =============================================================
-- DESCRIPTION
-- Weekly cohort retention from login sessions, restricted to
-- users who "onboarded" — defined as running at least one
-- agent within 365 days of their first login.
-- Filters out users who signed up but never activated,
-- giving a cleaner view of engaged-user retention.
--
-- SOURCE TABLES
-- auth.sessions — Login session records
-- platform.AgentGraphExecution — Used to identify onboarders
--
-- OUTPUT COLUMNS
-- Same as retention_login_weekly (cohort_week_start, user_lifetime_week,
-- retention_rate_bounded, retention_rate_unbounded, etc.)
-- Only difference: cohort is filtered to onboarded users only.
--
-- EXAMPLE QUERIES
-- -- Compare week-4 retention: all users vs onboarded only
-- SELECT 'all_users' AS segment, AVG(retention_rate_bounded) AS w4_retention
-- FROM analytics.retention_login_weekly WHERE user_lifetime_week = 4
-- UNION ALL
-- SELECT 'onboarded', AVG(retention_rate_bounded)
-- FROM analytics.retention_login_onboarded_weekly WHERE user_lifetime_week = 4;
-- =============================================================
WITH params AS (SELECT 12::int AS max_weeks, 365::int AS onboarding_window_days),
events AS (
SELECT s.user_id::text AS user_id, s.created_at::timestamptz AS created_at,
DATE_TRUNC('week', s.created_at)::date AS week_start
FROM auth.sessions s WHERE s.user_id IS NOT NULL
),
first_login_all AS (
SELECT user_id, MIN(created_at) AS first_login_time,
DATE_TRUNC('week', MIN(created_at))::date AS cohort_week_start
FROM events GROUP BY 1
),
onboarders AS (
SELECT fl.user_id FROM first_login_all fl
WHERE EXISTS (
SELECT 1 FROM platform."AgentGraphExecution" e
WHERE e."userId"::text = fl.user_id
AND e."createdAt" >= fl.first_login_time
AND e."createdAt" < fl.first_login_time
+ make_interval(days => (SELECT onboarding_window_days FROM params))
)
),
first_login AS (SELECT * FROM first_login_all WHERE user_id IN (SELECT user_id FROM onboarders)),
activity_weeks AS (SELECT DISTINCT user_id, week_start FROM events),
user_week_age AS (
SELECT aw.user_id, fl.cohort_week_start,
((aw.week_start - DATE_TRUNC('week',fl.first_login_time)::date)/7)::int AS user_lifetime_week
FROM activity_weeks aw JOIN first_login fl USING (user_id)
WHERE aw.week_start >= DATE_TRUNC('week',fl.first_login_time)::date
),
bounded_counts AS (
SELECT cohort_week_start, user_lifetime_week, COUNT(DISTINCT user_id) AS active_users_bounded
FROM user_week_age WHERE user_lifetime_week >= 0 GROUP BY 1,2
),
last_active AS (
SELECT cohort_week_start, user_id, MAX(user_lifetime_week) AS last_active_week FROM user_week_age GROUP BY 1,2
),
unbounded_counts AS (
SELECT la.cohort_week_start, gs AS user_lifetime_week, COUNT(*) AS retained_users_unbounded
FROM last_active la
CROSS JOIN LATERAL generate_series(0, LEAST(la.last_active_week,(SELECT max_weeks FROM params))) gs
GROUP BY 1,2
),
cohort_sizes AS (SELECT cohort_week_start, COUNT(DISTINCT user_id) AS cohort_users FROM first_login GROUP BY 1),
cohort_caps AS (
SELECT cs.cohort_week_start, cs.cohort_users,
LEAST((SELECT max_weeks FROM params),
GREATEST(0,((DATE_TRUNC('week',CURRENT_DATE)::date-cs.cohort_week_start)/7)::int)) AS cap_weeks
FROM cohort_sizes cs
),
grid AS (
SELECT cc.cohort_week_start, gs AS user_lifetime_week, cc.cohort_users
FROM cohort_caps cc CROSS JOIN LATERAL generate_series(0, cc.cap_weeks) gs
)
SELECT
g.cohort_week_start,
TO_CHAR(g.cohort_week_start,'IYYY-"W"IW') AS cohort_label,
TO_CHAR(g.cohort_week_start,'IYYY-"W"IW')||' (n='||g.cohort_users||')' AS cohort_label_n,
g.user_lifetime_week, g.cohort_users,
COALESCE(b.active_users_bounded,0) AS active_users_bounded,
COALESCE(u.retained_users_unbounded,0) AS retained_users_unbounded,
CASE WHEN g.cohort_users>0 THEN COALESCE(b.active_users_bounded,0)::float/g.cohort_users END AS retention_rate_bounded,
CASE WHEN g.cohort_users>0 THEN COALESCE(u.retained_users_unbounded,0)::float/g.cohort_users END AS retention_rate_unbounded,
CASE WHEN g.user_lifetime_week=0 THEN g.cohort_users ELSE 0 END AS cohort_users_w0
FROM grid g
LEFT JOIN bounded_counts b ON b.cohort_week_start=g.cohort_week_start AND b.user_lifetime_week=g.user_lifetime_week
LEFT JOIN unbounded_counts u ON u.cohort_week_start=g.cohort_week_start AND u.user_lifetime_week=g.user_lifetime_week
ORDER BY g.cohort_week_start, g.user_lifetime_week;

View File

@@ -1,103 +0,0 @@
-- =============================================================
-- View: analytics.retention_login_weekly
-- Looker source alias: ds83 | Charts: 2
-- =============================================================
-- DESCRIPTION
-- Weekly cohort retention based on login sessions.
-- Users are grouped by the ISO week of their first ever login.
-- For each cohort × lifetime-week combination, outputs both:
-- - bounded rate: % active in exactly that week
-- - unbounded rate: % who were ever active on or after that week
-- Weeks are capped to the cohort's actual age (no future data points).
--
-- SOURCE TABLES
-- auth.sessions — Login session records
--
-- HOW TO READ THE OUTPUT
-- cohort_week_start The Monday of the week users first logged in
-- user_lifetime_week 0 = signup week, 1 = one week later, etc.
-- retention_rate_bounded = active_users_bounded / cohort_users
-- retention_rate_unbounded = retained_users_unbounded / cohort_users
--
-- OUTPUT COLUMNS
-- cohort_week_start DATE First day of the cohort's signup week
-- cohort_label TEXT ISO week label (e.g. '2025-W01')
-- cohort_label_n TEXT ISO week label with cohort size (e.g. '2025-W01 (n=42)')
-- user_lifetime_week INT Weeks since first login (0 = signup week)
-- cohort_users BIGINT Total users in this cohort (denominator)
-- active_users_bounded BIGINT Users active in exactly week k
-- retained_users_unbounded BIGINT Users active any time on/after week k
-- retention_rate_bounded FLOAT bounded active / cohort_users
-- retention_rate_unbounded FLOAT unbounded retained / cohort_users
-- cohort_users_w0 BIGINT cohort_users only at week 0, else 0 (safe to SUM in pivot tables)
--
-- EXAMPLE QUERIES
-- -- Week-1 retention rate per cohort
-- SELECT cohort_label, retention_rate_bounded AS w1_retention
-- FROM analytics.retention_login_weekly
-- WHERE user_lifetime_week = 1
-- ORDER BY cohort_week_start;
--
-- -- Overall average retention curve (all cohorts combined)
-- SELECT user_lifetime_week,
-- SUM(active_users_bounded)::float / NULLIF(SUM(cohort_users_w0), 0) AS avg_retention
-- FROM analytics.retention_login_weekly
-- GROUP BY 1 ORDER BY 1;
-- =============================================================
WITH params AS (SELECT 12::int AS max_weeks),
events AS (
SELECT s.user_id::text AS user_id, s.created_at::timestamptz AS created_at,
DATE_TRUNC('week', s.created_at)::date AS week_start
FROM auth.sessions s WHERE s.user_id IS NOT NULL
),
first_login AS (
SELECT user_id, MIN(created_at) AS first_login_time,
DATE_TRUNC('week', MIN(created_at))::date AS cohort_week_start
FROM events GROUP BY 1
),
activity_weeks AS (SELECT DISTINCT user_id, week_start FROM events),
user_week_age AS (
SELECT aw.user_id, fl.cohort_week_start,
((aw.week_start - DATE_TRUNC('week', fl.first_login_time)::date) / 7)::int AS user_lifetime_week
FROM activity_weeks aw JOIN first_login fl USING (user_id)
WHERE aw.week_start >= DATE_TRUNC('week', fl.first_login_time)::date
),
bounded_counts AS (
SELECT cohort_week_start, user_lifetime_week, COUNT(DISTINCT user_id) AS active_users_bounded
FROM user_week_age WHERE user_lifetime_week >= 0 GROUP BY 1,2
),
last_active AS (
SELECT cohort_week_start, user_id, MAX(user_lifetime_week) AS last_active_week FROM user_week_age GROUP BY 1,2
),
unbounded_counts AS (
SELECT la.cohort_week_start, gs AS user_lifetime_week, COUNT(*) AS retained_users_unbounded
FROM last_active la
CROSS JOIN LATERAL generate_series(0, LEAST(la.last_active_week,(SELECT max_weeks FROM params))) gs
GROUP BY 1,2
),
cohort_sizes AS (SELECT cohort_week_start, COUNT(DISTINCT user_id) AS cohort_users FROM first_login GROUP BY 1),
cohort_caps AS (
SELECT cs.cohort_week_start, cs.cohort_users,
LEAST((SELECT max_weeks FROM params),
GREATEST(0,((DATE_TRUNC('week',CURRENT_DATE)::date - cs.cohort_week_start)/7)::int)) AS cap_weeks
FROM cohort_sizes cs
),
grid AS (
SELECT cc.cohort_week_start, gs AS user_lifetime_week, cc.cohort_users
FROM cohort_caps cc CROSS JOIN LATERAL generate_series(0, cc.cap_weeks) gs
)
SELECT
g.cohort_week_start,
TO_CHAR(g.cohort_week_start,'IYYY-"W"IW') AS cohort_label,
TO_CHAR(g.cohort_week_start,'IYYY-"W"IW')||' (n='||g.cohort_users||')' AS cohort_label_n,
g.user_lifetime_week, g.cohort_users,
COALESCE(b.active_users_bounded,0) AS active_users_bounded,
COALESCE(u.retained_users_unbounded,0) AS retained_users_unbounded,
CASE WHEN g.cohort_users>0 THEN COALESCE(b.active_users_bounded,0)::float/g.cohort_users END AS retention_rate_bounded,
CASE WHEN g.cohort_users>0 THEN COALESCE(u.retained_users_unbounded,0)::float/g.cohort_users END AS retention_rate_unbounded,
CASE WHEN g.user_lifetime_week=0 THEN g.cohort_users ELSE 0 END AS cohort_users_w0
FROM grid g
LEFT JOIN bounded_counts b ON b.cohort_week_start=g.cohort_week_start AND b.user_lifetime_week=g.user_lifetime_week
LEFT JOIN unbounded_counts u ON u.cohort_week_start=g.cohort_week_start AND u.user_lifetime_week=g.user_lifetime_week
ORDER BY g.cohort_week_start, g.user_lifetime_week

View File

@@ -1,71 +0,0 @@
-- =============================================================
-- View: analytics.user_block_spending
-- Looker source alias: ds6 | Charts: 5
-- =============================================================
-- DESCRIPTION
-- One row per credit transaction (last 90 days).
-- Shows how users spend credits broken down by block type,
-- LLM provider and model. Joins node execution stats for
-- token-level detail.
--
-- SOURCE TABLES
-- platform.CreditTransaction — Credit debit/credit records
-- platform.AgentNodeExecution — Node execution stats (for token counts)
--
-- OUTPUT COLUMNS
-- transactionKey TEXT Unique transaction identifier
-- userId TEXT User who was charged
-- amount DECIMAL Credit amount (positive = credit, negative = debit)
-- negativeAmount DECIMAL amount * -1 (convenience for spend charts)
-- transactionType TEXT Transaction type (e.g. 'USAGE', 'REFUND', 'TOP_UP')
-- transactionTime TIMESTAMPTZ When the transaction was recorded
-- blockId TEXT Block UUID that triggered the spend
-- blockName TEXT Human-readable block name
-- llm_provider TEXT LLM provider (e.g. 'openai', 'anthropic')
-- llm_model TEXT Model name (e.g. 'gpt-4o', 'claude-3-5-sonnet')
-- node_exec_id TEXT Linked node execution UUID
-- llm_call_count INT LLM API calls made in that execution
-- llm_retry_count INT LLM retries in that execution
-- llm_input_token_count INT Input tokens consumed
-- llm_output_token_count INT Output tokens produced
--
-- WINDOW
-- Rolling 90 days (createdAt > CURRENT_DATE - 90 days)
--
-- EXAMPLE QUERIES
-- -- Total spend per user (last 90 days)
-- SELECT "userId", SUM("negativeAmount") AS total_spent
-- FROM analytics.user_block_spending
-- WHERE "transactionType" = 'USAGE'
-- GROUP BY 1 ORDER BY total_spent DESC;
--
-- -- Spend by LLM provider + model
-- SELECT "llm_provider", "llm_model",
-- SUM("negativeAmount") AS total_cost,
-- SUM("llm_input_token_count") AS input_tokens,
-- SUM("llm_output_token_count") AS output_tokens
-- FROM analytics.user_block_spending
-- WHERE "llm_provider" IS NOT NULL
-- GROUP BY 1, 2 ORDER BY total_cost DESC;
-- =============================================================
SELECT
c."transactionKey" AS transactionKey,
c."userId" AS userId,
c."amount" AS amount,
c."amount" * -1 AS negativeAmount,
c."type" AS transactionType,
c."createdAt" AS transactionTime,
c.metadata->>'block_id' AS blockId,
c.metadata->>'block' AS blockName,
c.metadata->'input'->'credentials'->>'provider' AS llm_provider,
c.metadata->'input'->>'model' AS llm_model,
c.metadata->>'node_exec_id' AS node_exec_id,
(ne."stats"->>'llm_call_count')::int AS llm_call_count,
(ne."stats"->>'llm_retry_count')::int AS llm_retry_count,
(ne."stats"->>'input_token_count')::int AS llm_input_token_count,
(ne."stats"->>'output_token_count')::int AS llm_output_token_count
FROM platform."CreditTransaction" c
LEFT JOIN platform."AgentNodeExecution" ne
ON (c.metadata->>'node_exec_id') = ne."id"::text
WHERE c."createdAt" > CURRENT_DATE - INTERVAL '90 days'

View File

@@ -1,45 +0,0 @@
-- =============================================================
-- View: analytics.user_onboarding
-- Looker source alias: ds68 | Charts: 3
-- =============================================================
-- DESCRIPTION
-- One row per user onboarding record. Contains the user's
-- stated usage reason, selected integrations, completed
-- onboarding steps and optional first agent selection.
-- Full history (no date filter) since onboarding happens
-- once per user.
--
-- SOURCE TABLES
-- platform.UserOnboarding — Onboarding state per user
--
-- OUTPUT COLUMNS
-- id TEXT Onboarding record UUID
-- createdAt TIMESTAMPTZ When onboarding started
-- updatedAt TIMESTAMPTZ Last update to onboarding state
-- usageReason TEXT Why user signed up (e.g. 'work', 'personal')
-- integrations TEXT[] Array of integration names the user selected
-- userId TEXT User UUID
-- completedSteps TEXT[] Array of onboarding step enums completed
-- selectedStoreListingVersionId TEXT First marketplace agent the user chose (if any)
--
-- EXAMPLE QUERIES
-- -- Usage reason breakdown
-- SELECT "usageReason", COUNT(*) FROM analytics.user_onboarding GROUP BY 1;
--
-- -- Completion rate per step
-- SELECT step, COUNT(*) AS users_completed
-- FROM analytics.user_onboarding
-- CROSS JOIN LATERAL UNNEST("completedSteps") AS step
-- GROUP BY 1 ORDER BY users_completed DESC;
-- =============================================================
SELECT
id,
"createdAt",
"updatedAt",
"usageReason",
integrations,
"userId",
"completedSteps",
"selectedStoreListingVersionId"
FROM platform."UserOnboarding"

View File

@@ -1,100 +0,0 @@
-- =============================================================
-- View: analytics.user_onboarding_funnel
-- Looker source alias: ds74 | Charts: 1
-- =============================================================
-- DESCRIPTION
-- Pre-aggregated onboarding funnel showing how many users
-- completed each step and the drop-off percentage from the
-- previous step. One row per onboarding step (all 22 steps
-- always present, even with 0 completions — prevents sparse
-- gaps from making LAG compare the wrong predecessors).
--
-- SOURCE TABLES
-- platform.UserOnboarding — Onboarding records with completedSteps array
--
-- OUTPUT COLUMNS
-- step TEXT Onboarding step enum name (e.g. 'WELCOME', 'CONGRATS')
-- step_order INT Numeric position in the funnel (1=first, 22=last)
-- users_completed BIGINT Distinct users who completed this step
-- pct_from_prev NUMERIC % of users from the previous step who reached this one
--
-- STEP ORDER
-- 1 WELCOME 9 MARKETPLACE_VISIT 17 SCHEDULE_AGENT
-- 2 USAGE_REASON 10 MARKETPLACE_ADD_AGENT 18 RUN_AGENTS
-- 3 INTEGRATIONS 11 MARKETPLACE_RUN_AGENT 19 RUN_3_DAYS
-- 4 AGENT_CHOICE 12 BUILDER_OPEN 20 TRIGGER_WEBHOOK
-- 5 AGENT_NEW_RUN 13 BUILDER_SAVE_AGENT 21 RUN_14_DAYS
-- 6 AGENT_INPUT 14 BUILDER_RUN_AGENT 22 RUN_AGENTS_100
-- 7 CONGRATS 15 VISIT_COPILOT
-- 8 GET_RESULTS 16 RE_RUN_AGENT
--
-- WINDOW
-- Users who started onboarding in the last 90 days
--
-- EXAMPLE QUERIES
-- -- Full funnel
-- SELECT * FROM analytics.user_onboarding_funnel ORDER BY step_order;
--
-- -- Biggest drop-off point
-- SELECT step, pct_from_prev FROM analytics.user_onboarding_funnel
-- ORDER BY pct_from_prev ASC LIMIT 3;
-- =============================================================
WITH all_steps AS (
-- Complete ordered grid of all 22 steps so zero-completion steps
-- are always present, keeping LAG comparisons correct.
SELECT step_name, step_order
FROM (VALUES
('WELCOME', 1),
('USAGE_REASON', 2),
('INTEGRATIONS', 3),
('AGENT_CHOICE', 4),
('AGENT_NEW_RUN', 5),
('AGENT_INPUT', 6),
('CONGRATS', 7),
('GET_RESULTS', 8),
('MARKETPLACE_VISIT', 9),
('MARKETPLACE_ADD_AGENT', 10),
('MARKETPLACE_RUN_AGENT', 11),
('BUILDER_OPEN', 12),
('BUILDER_SAVE_AGENT', 13),
('BUILDER_RUN_AGENT', 14),
('VISIT_COPILOT', 15),
('RE_RUN_AGENT', 16),
('SCHEDULE_AGENT', 17),
('RUN_AGENTS', 18),
('RUN_3_DAYS', 19),
('TRIGGER_WEBHOOK', 20),
('RUN_14_DAYS', 21),
('RUN_AGENTS_100', 22)
) AS t(step_name, step_order)
),
raw AS (
SELECT
u."userId",
step_txt::text AS step
FROM platform."UserOnboarding" u
CROSS JOIN LATERAL UNNEST(u."completedSteps") AS step_txt
WHERE u."createdAt" >= CURRENT_DATE - INTERVAL '90 days'
),
step_counts AS (
SELECT step, COUNT(DISTINCT "userId") AS users_completed
FROM raw GROUP BY step
),
funnel AS (
SELECT
a.step_name AS step,
a.step_order,
COALESCE(sc.users_completed, 0) AS users_completed,
ROUND(
100.0 * COALESCE(sc.users_completed, 0)
/ NULLIF(
LAG(COALESCE(sc.users_completed, 0)) OVER (ORDER BY a.step_order),
0
),
2
) AS pct_from_prev
FROM all_steps a
LEFT JOIN step_counts sc ON sc.step = a.step_name
)
SELECT * FROM funnel ORDER BY step_order

View File

@@ -1,41 +0,0 @@
-- =============================================================
-- View: analytics.user_onboarding_integration
-- Looker source alias: ds75 | Charts: 1
-- =============================================================
-- DESCRIPTION
-- Pre-aggregated count of users who selected each integration
-- during onboarding. One row per integration type, sorted
-- by popularity.
--
-- SOURCE TABLES
-- platform.UserOnboarding — integrations array column
--
-- OUTPUT COLUMNS
-- integration TEXT Integration name (e.g. 'github', 'slack', 'notion')
-- users_with_integration BIGINT Distinct users who selected this integration
--
-- WINDOW
-- Users who started onboarding in the last 90 days
--
-- EXAMPLE QUERIES
-- -- Full integration popularity ranking
-- SELECT * FROM analytics.user_onboarding_integration;
--
-- -- Top 5 integrations
-- SELECT * FROM analytics.user_onboarding_integration LIMIT 5;
-- =============================================================
WITH exploded AS (
SELECT
u."userId" AS user_id,
UNNEST(u."integrations") AS integration
FROM platform."UserOnboarding" u
WHERE u."createdAt" >= CURRENT_DATE - INTERVAL '90 days'
)
SELECT
integration,
COUNT(DISTINCT user_id) AS users_with_integration
FROM exploded
WHERE integration IS NOT NULL AND integration <> ''
GROUP BY integration
ORDER BY users_with_integration DESC

View File

@@ -1,145 +0,0 @@
-- =============================================================
-- View: analytics.users_activities
-- Looker source alias: ds56 | Charts: 5
-- =============================================================
-- DESCRIPTION
-- One row per user with lifetime activity summary.
-- Joins login sessions with agent graphs, executions and
-- node-level runs to give a full picture of how engaged
-- each user is. Includes a convenience flag for 7-day
-- activation (did the user return at least 7 days after
-- their first login?).
--
-- SOURCE TABLES
-- auth.sessions — Login/session records
-- platform.AgentGraph — Graphs (agents) built by the user
-- platform.AgentGraphExecution — Agent run history
-- platform.AgentNodeExecution — Individual block execution history
--
-- PERFORMANCE NOTE
-- Each CTE aggregates its own table independently by userId.
-- This avoids the fan-out that occurs when driving every join
-- from user_logins across the two largest tables
-- (AgentGraphExecution and AgentNodeExecution).
--
-- OUTPUT COLUMNS
-- user_id TEXT Supabase user UUID
-- first_login_time TIMESTAMPTZ First ever session created_at
-- last_login_time TIMESTAMPTZ Most recent session created_at
-- last_visit_time TIMESTAMPTZ Max of last refresh or login
-- last_agent_save_time TIMESTAMPTZ Last time user saved an agent graph
-- agent_count BIGINT Number of distinct active graphs built (0 if none)
-- first_agent_run_time TIMESTAMPTZ First ever graph execution
-- last_agent_run_time TIMESTAMPTZ Most recent graph execution
-- unique_agent_runs BIGINT Distinct agent graphs ever run (0 if none)
-- agent_runs BIGINT Total graph execution count (0 if none)
-- node_execution_count BIGINT Total node executions across all runs
-- node_execution_failed BIGINT Node executions with FAILED status
-- node_execution_completed BIGINT Node executions with COMPLETED status
-- node_execution_terminated BIGINT Node executions with TERMINATED status
-- node_execution_queued BIGINT Node executions with QUEUED status
-- node_execution_running BIGINT Node executions with RUNNING status
-- is_active_after_7d INT 1=returned after day 7, 0=did not, NULL=too early to tell
-- node_execution_incomplete BIGINT Node executions with INCOMPLETE status
-- node_execution_review BIGINT Node executions with REVIEW status
--
-- EXAMPLE QUERIES
-- -- Users who ran at least one agent and returned after 7 days
-- SELECT COUNT(*) FROM analytics.users_activities
-- WHERE agent_runs > 0 AND is_active_after_7d = 1;
--
-- -- Top 10 most active users by agent runs
-- SELECT user_id, agent_runs, node_execution_count
-- FROM analytics.users_activities
-- ORDER BY agent_runs DESC LIMIT 10;
--
-- -- 7-day activation rate
-- SELECT
-- SUM(CASE WHEN is_active_after_7d = 1 THEN 1 ELSE 0 END)::float
-- / NULLIF(COUNT(CASE WHEN is_active_after_7d IS NOT NULL THEN 1 END), 0)
-- AS activation_rate
-- FROM analytics.users_activities;
-- =============================================================
WITH user_logins AS (
SELECT
user_id::text AS user_id,
MIN(created_at) AS first_login_time,
MAX(created_at) AS last_login_time,
GREATEST(
MAX(refreshed_at)::timestamptz,
MAX(created_at)::timestamptz
) AS last_visit_time
FROM auth.sessions
GROUP BY user_id
),
user_agents AS (
-- Aggregate AgentGraph directly by userId (no fan-out from user_logins)
SELECT
"userId"::text AS user_id,
MAX("updatedAt") AS last_agent_save_time,
COUNT(DISTINCT "id") AS agent_count
FROM platform."AgentGraph"
WHERE "isActive"
GROUP BY "userId"
),
user_graph_runs AS (
-- Aggregate AgentGraphExecution directly by userId
SELECT
"userId"::text AS user_id,
MIN("createdAt") AS first_agent_run_time,
MAX("createdAt") AS last_agent_run_time,
COUNT(DISTINCT "agentGraphId") AS unique_agent_runs,
COUNT("id") AS agent_runs
FROM platform."AgentGraphExecution"
GROUP BY "userId"
),
user_node_runs AS (
-- Aggregate AgentNodeExecution directly; resolve userId via a
-- single join to AgentGraphExecution instead of fanning out from
-- user_logins through both large tables.
SELECT
g."userId"::text AS user_id,
COUNT(*) AS node_execution_count,
COUNT(*) FILTER (WHERE n."executionStatus" = 'FAILED') AS node_execution_failed,
COUNT(*) FILTER (WHERE n."executionStatus" = 'COMPLETED') AS node_execution_completed,
COUNT(*) FILTER (WHERE n."executionStatus" = 'TERMINATED') AS node_execution_terminated,
COUNT(*) FILTER (WHERE n."executionStatus" = 'QUEUED') AS node_execution_queued,
COUNT(*) FILTER (WHERE n."executionStatus" = 'RUNNING') AS node_execution_running,
COUNT(*) FILTER (WHERE n."executionStatus" = 'INCOMPLETE') AS node_execution_incomplete,
COUNT(*) FILTER (WHERE n."executionStatus" = 'REVIEW') AS node_execution_review
FROM platform."AgentNodeExecution" n
JOIN platform."AgentGraphExecution" g
ON g."id" = n."agentGraphExecutionId"
GROUP BY g."userId"
)
SELECT
ul.user_id,
ul.first_login_time,
ul.last_login_time,
ul.last_visit_time,
ua.last_agent_save_time,
COALESCE(ua.agent_count, 0) AS agent_count,
gr.first_agent_run_time,
gr.last_agent_run_time,
COALESCE(gr.unique_agent_runs, 0) AS unique_agent_runs,
COALESCE(gr.agent_runs, 0) AS agent_runs,
COALESCE(nr.node_execution_count, 0) AS node_execution_count,
COALESCE(nr.node_execution_failed, 0) AS node_execution_failed,
COALESCE(nr.node_execution_completed, 0) AS node_execution_completed,
COALESCE(nr.node_execution_terminated, 0) AS node_execution_terminated,
COALESCE(nr.node_execution_queued, 0) AS node_execution_queued,
COALESCE(nr.node_execution_running, 0) AS node_execution_running,
CASE
WHEN ul.first_login_time < NOW() - INTERVAL '7 days'
AND ul.last_visit_time >= ul.first_login_time + INTERVAL '7 days' THEN 1
WHEN ul.first_login_time < NOW() - INTERVAL '7 days'
AND ul.last_visit_time < ul.first_login_time + INTERVAL '7 days' THEN 0
ELSE NULL
END AS is_active_after_7d,
COALESCE(nr.node_execution_incomplete, 0) AS node_execution_incomplete,
COALESCE(nr.node_execution_review, 0) AS node_execution_review
FROM user_logins ul
LEFT JOIN user_agents ua ON ul.user_id = ua.user_id
LEFT JOIN user_graph_runs gr ON ul.user_id = gr.user_id
LEFT JOIN user_node_runs nr ON ul.user_id = nr.user_id

View File

@@ -638,7 +638,7 @@ async def test_process_review_action_auto_approve_creates_auto_approval_records(
# Mock get_node_executions to return node_id mapping
mock_get_node_executions = mocker.patch(
"backend.api.features.executions.review.routes.get_node_executions"
"backend.data.execution.get_node_executions"
)
mock_node_exec = mocker.Mock(spec=NodeExecutionResult)
mock_node_exec.node_exec_id = "test_node_123"
@@ -936,7 +936,7 @@ async def test_process_review_action_auto_approve_only_applies_to_approved_revie
# Mock get_node_executions to return node_id mapping
mock_get_node_executions = mocker.patch(
"backend.api.features.executions.review.routes.get_node_executions"
"backend.data.execution.get_node_executions"
)
mock_node_exec = mocker.Mock(spec=NodeExecutionResult)
mock_node_exec.node_exec_id = "node_exec_approved"
@@ -1148,7 +1148,7 @@ async def test_process_review_action_per_review_auto_approve_granularity(
# Mock get_node_executions to return batch node data
mock_get_node_executions = mocker.patch(
"backend.api.features.executions.review.routes.get_node_executions"
"backend.data.execution.get_node_executions"
)
# Create mock node executions for each review
mock_node_execs = []

View File

@@ -6,15 +6,10 @@ import autogpt_libs.auth as autogpt_auth_lib
from fastapi import APIRouter, HTTPException, Query, Security, status
from prisma.enums import ReviewStatus
from backend.copilot.constants import (
is_copilot_synthetic_id,
parse_node_id_from_exec_id,
)
from backend.data.execution import (
ExecutionContext,
ExecutionStatus,
get_graph_execution_meta,
get_node_executions,
)
from backend.data.graph import get_graph_settings
from backend.data.human_review import (
@@ -41,38 +36,6 @@ router = APIRouter(
)
async def _resolve_node_ids(
node_exec_ids: list[str],
graph_exec_id: str,
is_copilot: bool,
) -> dict[str, str]:
"""Resolve node_exec_id -> node_id for auto-approval records.
CoPilot synthetic IDs encode node_id in the format "{node_id}:{random}".
Graph executions look up node_id from NodeExecution records.
"""
if not node_exec_ids:
return {}
if is_copilot:
return {neid: parse_node_id_from_exec_id(neid) for neid in node_exec_ids}
node_execs = await get_node_executions(
graph_exec_id=graph_exec_id, include_exec_data=False
)
node_exec_map = {ne.node_exec_id: ne.node_id for ne in node_execs}
result = {}
for neid in node_exec_ids:
if neid in node_exec_map:
result[neid] = node_exec_map[neid]
else:
logger.error(
f"Failed to resolve node_id for {neid}: Node execution not found."
)
return result
@router.get(
"/pending",
summary="Get Pending Reviews",
@@ -147,16 +110,14 @@ async def list_pending_reviews_for_execution(
"""
# Verify user owns the graph execution before returning reviews
# (CoPilot synthetic IDs don't have graph execution records)
if not is_copilot_synthetic_id(graph_exec_id):
graph_exec = await get_graph_execution_meta(
user_id=user_id, execution_id=graph_exec_id
graph_exec = await get_graph_execution_meta(
user_id=user_id, execution_id=graph_exec_id
)
if not graph_exec:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Graph execution #{graph_exec_id} not found",
)
if not graph_exec:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Graph execution #{graph_exec_id} not found",
)
return await get_pending_reviews_for_execution(graph_exec_id, user_id)
@@ -199,26 +160,30 @@ async def process_review_action(
)
graph_exec_id = next(iter(graph_exec_ids))
is_copilot = is_copilot_synthetic_id(graph_exec_id)
# Validate execution status for graph executions (skip for CoPilot synthetic IDs)
if not is_copilot:
graph_exec_meta = await get_graph_execution_meta(
user_id=user_id, execution_id=graph_exec_id
# Validate execution status before processing reviews
graph_exec_meta = await get_graph_execution_meta(
user_id=user_id, execution_id=graph_exec_id
)
if not graph_exec_meta:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Graph execution #{graph_exec_id} not found",
)
# Only allow processing reviews if execution is paused for review
# or incomplete (partial execution with some reviews already processed)
if graph_exec_meta.status not in (
ExecutionStatus.REVIEW,
ExecutionStatus.INCOMPLETE,
):
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail=f"Cannot process reviews while execution status is {graph_exec_meta.status}. "
f"Reviews can only be processed when execution is paused (REVIEW status). "
f"Current status: {graph_exec_meta.status}",
)
if not graph_exec_meta:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Graph execution #{graph_exec_id} not found",
)
if graph_exec_meta.status not in (
ExecutionStatus.REVIEW,
ExecutionStatus.INCOMPLETE,
):
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail=f"Cannot process reviews while execution status is {graph_exec_meta.status}",
)
# Build review decisions map and track which reviews requested auto-approval
# Auto-approved reviews use original data (no modifications allowed)
@@ -271,7 +236,7 @@ async def process_review_action(
)
return (node_id, False)
# Collect node_exec_ids that need auto-approval and resolve their node_ids
# Collect node_exec_ids that need auto-approval
node_exec_ids_needing_auto_approval = [
node_exec_id
for node_exec_id, review_result in updated_reviews.items()
@@ -279,16 +244,29 @@ async def process_review_action(
and auto_approve_requests.get(node_exec_id, False)
]
node_id_map = await _resolve_node_ids(
node_exec_ids_needing_auto_approval, graph_exec_id, is_copilot
)
# Deduplicate by node_id — one auto-approval per node
# Batch-fetch node executions to get node_ids
nodes_needing_auto_approval: dict[str, Any] = {}
for node_exec_id in node_exec_ids_needing_auto_approval:
node_id = node_id_map.get(node_exec_id)
if node_id and node_id not in nodes_needing_auto_approval:
nodes_needing_auto_approval[node_id] = updated_reviews[node_exec_id]
if node_exec_ids_needing_auto_approval:
from backend.data.execution import get_node_executions
node_execs = await get_node_executions(
graph_exec_id=graph_exec_id, include_exec_data=False
)
node_exec_map = {node_exec.node_exec_id: node_exec for node_exec in node_execs}
for node_exec_id in node_exec_ids_needing_auto_approval:
node_exec = node_exec_map.get(node_exec_id)
if node_exec:
review_result = updated_reviews[node_exec_id]
# Use the first approved review for this node (deduplicate by node_id)
if node_exec.node_id not in nodes_needing_auto_approval:
nodes_needing_auto_approval[node_exec.node_id] = review_result
else:
logger.error(
f"Failed to create auto-approval record for {node_exec_id}: "
f"Node execution not found. This may indicate a race condition "
f"or data inconsistency."
)
# Execute all auto-approval creations in parallel (deduplicated by node_id)
auto_approval_results = await asyncio.gather(
@@ -303,11 +281,13 @@ async def process_review_action(
auto_approval_failed_count = 0
for result in auto_approval_results:
if isinstance(result, Exception):
# Unexpected exception during auto-approval creation
auto_approval_failed_count += 1
logger.error(
f"Unexpected exception during auto-approval creation: {result}"
)
elif isinstance(result, tuple) and len(result) == 2 and not result[1]:
# Auto-approval creation failed (returned False)
auto_approval_failed_count += 1
# Count results
@@ -322,20 +302,22 @@ async def process_review_action(
if review.status == ReviewStatus.REJECTED
)
# Resume graph execution only for real graph executions (not CoPilot)
# CoPilot sessions are resumed by the LLM retrying run_block with review_id
if not is_copilot and updated_reviews:
# Resume execution only if ALL pending reviews for this execution have been processed
if updated_reviews:
still_has_pending = await has_pending_reviews_for_graph_exec(graph_exec_id)
if not still_has_pending:
# Get the graph_id from any processed review
first_review = next(iter(updated_reviews.values()))
try:
# Fetch user and settings to build complete execution context
user = await get_user_by_id(user_id)
settings = await get_graph_settings(
user_id=user_id, graph_id=first_review.graph_id
)
# Preserve user's timezone preference when resuming execution
user_timezone = (
user.timezone if user.timezone != USER_TIMEZONE_NOT_SET else "UTC"
)

View File

@@ -37,8 +37,10 @@ import backend.api.features.workspace.routes as workspace_routes
import backend.data.block
import backend.data.db
import backend.data.graph
import backend.data.llm_registry
import backend.data.user
import backend.integrations.webhooks.utils
import backend.server.v2.llm
import backend.util.service
import backend.util.settings
from backend.api.features.library.exceptions import (
@@ -117,11 +119,30 @@ async def lifespan_context(app: fastapi.FastAPI):
AutoRegistry.patch_integrations()
# Refresh LLM registry before initializing blocks so blocks can use registry data
# Note: Graceful fallback for now since no blocks consume registry yet (comes in PR #5)
# When block integration lands, this should fail hard or skip block initialization
try:
await backend.data.llm_registry.refresh_llm_registry()
logger.info("LLM registry refreshed successfully at startup")
except Exception as e:
logger.warning(
f"Failed to refresh LLM registry at startup: {e}. "
"Blocks will initialize with empty registry."
)
await backend.data.block.initialize_blocks()
await backend.data.user.migrate_and_encrypt_user_integrations()
await backend.data.graph.fix_llm_provider_credentials()
await backend.data.graph.migrate_llm_models(DEFAULT_LLM_MODEL)
try:
await backend.data.graph.migrate_llm_models(DEFAULT_LLM_MODEL)
except Exception as e:
logger.warning(
f"Failed to migrate LLM models at startup: {e}. "
"This is expected in test environments without AgentNode table."
)
await backend.integrations.webhooks.utils.migrate_legacy_triggered_graphs()
with launch_darkly_context():
@@ -348,6 +369,11 @@ app.include_router(
tags=["oauth"],
prefix="/api/oauth",
)
app.include_router(
backend.server.v2.llm.router,
tags=["v2", "llm"],
prefix="/api",
)
app.mount("/external-api", external_api)

View File

@@ -624,7 +624,6 @@ class Block(ABC, Generic[BlockSchemaInputType, BlockSchemaOutputType]):
graph_id: str,
graph_version: int,
execution_context: "ExecutionContext",
is_graph_execution: bool = True,
**kwargs,
) -> tuple[bool, BlockInput]:
"""
@@ -653,7 +652,6 @@ class Block(ABC, Generic[BlockSchemaInputType, BlockSchemaOutputType]):
graph_version=graph_version,
block_name=self.name,
editable=True,
is_graph_execution=is_graph_execution,
)
if decision is None:

View File

@@ -126,7 +126,7 @@ class PrintToConsoleBlock(Block):
output_schema=PrintToConsoleBlock.Output,
test_input={"text": "Hello, World!"},
is_sensitive_action=True,
disabled=True,
disabled=True, # Disabled per Nick Tindle's request (OPEN-3000)
test_output=[
("output", "Hello, World!"),
("status", "printed"),

View File

@@ -67,7 +67,6 @@ class HITLReviewHelper:
graph_version: int,
block_name: str = "Block",
editable: bool = False,
is_graph_execution: bool = True,
) -> Optional[ReviewResult]:
"""
Handle a review request for a block that requires human review.
@@ -144,11 +143,10 @@ class HITLReviewHelper:
logger.info(
f"Block {block_name} pausing execution for node {node_exec_id} - awaiting human review"
)
if is_graph_execution:
await HITLReviewHelper.update_node_execution_status(
exec_id=node_exec_id,
status=ExecutionStatus.REVIEW,
)
await HITLReviewHelper.update_node_execution_status(
exec_id=node_exec_id,
status=ExecutionStatus.REVIEW,
)
return None # Signal that execution should pause
# Mark review as processed if not already done
@@ -170,7 +168,6 @@ class HITLReviewHelper:
graph_version: int,
block_name: str = "Block",
editable: bool = False,
is_graph_execution: bool = True,
) -> Optional[ReviewDecision]:
"""
Handle a review request and return the decision in a single call.
@@ -200,7 +197,6 @@ class HITLReviewHelper:
graph_version=graph_version,
block_name=block_name,
editable=editable,
is_graph_execution=is_graph_execution,
)
if review_result is None:

View File

@@ -6,32 +6,6 @@
COPILOT_ERROR_PREFIX = "[__COPILOT_ERROR_f7a1__]" # Renders as ErrorCard
COPILOT_SYSTEM_PREFIX = "[__COPILOT_SYSTEM_e3b0__]" # Renders as system info message
# Prefix for all synthetic IDs generated by CoPilot block execution.
# Used to distinguish CoPilot-generated records from real graph execution records
# in PendingHumanReview and other tables.
COPILOT_SYNTHETIC_ID_PREFIX = "copilot-"
# Sub-prefixes for session-scoped and node-scoped synthetic IDs.
COPILOT_SESSION_PREFIX = f"{COPILOT_SYNTHETIC_ID_PREFIX}session-"
COPILOT_NODE_PREFIX = f"{COPILOT_SYNTHETIC_ID_PREFIX}node-"
# Separator used in synthetic node_exec_id to encode node_id.
# Format: "{node_id}:{random_hex}" — extract node_id via rsplit(":", 1)[0]
COPILOT_NODE_EXEC_ID_SEPARATOR = ":"
# Compaction notice messages shown to users.
COMPACTION_DONE_MSG = "Earlier messages were summarized to fit within context limits."
COMPACTION_TOOL_NAME = "context_compaction"
def is_copilot_synthetic_id(id_value: str) -> bool:
"""Check if an ID is a CoPilot synthetic ID (not from a real graph execution)."""
return id_value.startswith(COPILOT_SYNTHETIC_ID_PREFIX)
def parse_node_id_from_exec_id(node_exec_id: str) -> str:
"""Extract node_id from a synthetic node_exec_id.
Format: "{node_id}:{random_hex}" → returns "{node_id}".
"""
return node_exec_id.rsplit(COPILOT_NODE_EXEC_ID_SEPARATOR, 1)[0]

View File

@@ -12,7 +12,6 @@ from .agent_browser import BrowserActTool, BrowserNavigateTool, BrowserScreensho
from .agent_output import AgentOutputTool
from .base import BaseTool
from .bash_exec import BashExecTool
from .continue_run_block import ContinueRunBlockTool
from .create_agent import CreateAgentTool
from .customize_agent import CustomizeAgentTool
from .edit_agent import EditAgentTool
@@ -69,7 +68,6 @@ TOOL_REGISTRY: dict[str, BaseTool] = {
"move_agents_to_folder": MoveAgentsToFolderTool(),
"run_agent": RunAgentTool(),
"run_block": RunBlockTool(),
"continue_run_block": ContinueRunBlockTool(),
"run_mcp_tool": RunMCPToolTool(),
"get_mcp_guide": GetMCPGuideTool(),
"view_agent_output": AgentOutputTool(),

View File

@@ -1,157 +0,0 @@
"""Tool for continuing block execution after human review approval."""
import logging
from typing import Any
from prisma.enums import ReviewStatus
from backend.blocks import get_block
from backend.copilot.constants import (
COPILOT_NODE_PREFIX,
COPILOT_SESSION_PREFIX,
parse_node_id_from_exec_id,
)
from backend.copilot.model import ChatSession
from backend.data.db_accessors import review_db
from .base import BaseTool
from .helpers import execute_block, resolve_block_credentials
from .models import ErrorResponse, ToolResponseBase
logger = logging.getLogger(__name__)
class ContinueRunBlockTool(BaseTool):
"""Tool for continuing a block execution after human review approval."""
@property
def name(self) -> str:
return "continue_run_block"
@property
def description(self) -> str:
return (
"Continue executing a block after human review approval. "
"Use this after a run_block call returned review_required. "
"Pass the review_id from the review_required response. "
"The block will execute with the original pre-approved input data."
)
@property
def parameters(self) -> dict[str, Any]:
return {
"type": "object",
"properties": {
"review_id": {
"type": "string",
"description": (
"The review_id from a previous review_required response. "
"This resumes execution with the pre-approved input data."
),
},
},
"required": ["review_id"],
}
@property
def requires_auth(self) -> bool:
return True
async def _execute(
self,
user_id: str | None,
session: ChatSession,
**kwargs,
) -> ToolResponseBase:
review_id = (
kwargs.get("review_id", "").strip() if kwargs.get("review_id") else ""
)
session_id = session.session_id
if not review_id:
return ErrorResponse(
message="Please provide a review_id", session_id=session_id
)
if not user_id:
return ErrorResponse(
message="Authentication required", session_id=session_id
)
# Look up and validate the review record via adapter
reviews = await review_db().get_reviews_by_node_exec_ids([review_id], user_id)
review = reviews.get(review_id)
if not review:
return ErrorResponse(
message=(
f"Review '{review_id}' not found or already executed. "
"It may have been consumed by a previous continue_run_block call."
),
session_id=session_id,
)
# Validate the review belongs to this session
expected_graph_exec_id = f"{COPILOT_SESSION_PREFIX}{session_id}"
if review.graph_exec_id != expected_graph_exec_id:
return ErrorResponse(
message="Review does not belong to this session.",
session_id=session_id,
)
if review.status == ReviewStatus.WAITING:
return ErrorResponse(
message="Review has not been approved yet. "
"Please wait for the user to approve the review first.",
session_id=session_id,
)
if review.status == ReviewStatus.REJECTED:
return ErrorResponse(
message="Review was rejected. The block will not execute.",
session_id=session_id,
)
# Extract block_id from review_id: copilot-node-{block_id}:{random_hex}
block_id = parse_node_id_from_exec_id(review_id).removeprefix(
COPILOT_NODE_PREFIX
)
block = get_block(block_id)
if not block:
return ErrorResponse(
message=f"Block '{block_id}' not found", session_id=session_id
)
input_data: dict[str, Any] = (
review.payload if isinstance(review.payload, dict) else {}
)
logger.info(
f"Continuing block {block.name} ({block_id}) for user {user_id} "
f"with review_id={review_id}"
)
matched_creds, missing_creds = await resolve_block_credentials(
user_id, block, input_data
)
if missing_creds:
return ErrorResponse(
message=f"Block '{block.name}' requires credentials that are not configured.",
session_id=session_id,
)
result = await execute_block(
block=block,
block_id=block_id,
input_data=input_data,
user_id=user_id,
session_id=session_id,
node_exec_id=review_id,
matched_credentials=matched_creds,
)
# Delete review record after successful execution (one-time use)
if result.type != "error":
await review_db().delete_review_by_node_exec_id(review_id, user_id)
return result

View File

@@ -1,186 +0,0 @@
"""Tests for ContinueRunBlockTool."""
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from prisma.enums import ReviewStatus
from ._test_data import make_session
from .continue_run_block import ContinueRunBlockTool
from .models import BlockOutputResponse, ErrorResponse
_TEST_USER_ID = "test-user-continue"
def _make_review_model(
node_exec_id: str,
status: ReviewStatus = ReviewStatus.APPROVED,
payload: dict | None = None,
graph_exec_id: str = "",
):
"""Create a mock PendingHumanReviewModel."""
mock = MagicMock()
mock.node_exec_id = node_exec_id
mock.status = status
mock.payload = payload or {"text": "hello"}
mock.graph_exec_id = graph_exec_id
return mock
class TestContinueRunBlock:
@pytest.mark.asyncio(loop_scope="session")
async def test_missing_review_id_returns_error(self):
tool = ContinueRunBlockTool()
session = make_session(user_id=_TEST_USER_ID)
response = await tool._execute(
user_id=_TEST_USER_ID,
session=session,
review_id="",
)
assert isinstance(response, ErrorResponse)
assert "review_id" in response.message
@pytest.mark.asyncio(loop_scope="session")
async def test_review_not_found_returns_error(self):
tool = ContinueRunBlockTool()
session = make_session(user_id=_TEST_USER_ID)
mock_db = MagicMock()
mock_db.get_reviews_by_node_exec_ids = AsyncMock(return_value={})
with patch(
"backend.copilot.tools.continue_run_block.review_db",
return_value=mock_db,
):
response = await tool._execute(
user_id=_TEST_USER_ID,
session=session,
review_id="copilot-node-some-block:abc12345",
)
assert isinstance(response, ErrorResponse)
assert "not found" in response.message
@pytest.mark.asyncio(loop_scope="session")
async def test_waiting_review_returns_error(self):
tool = ContinueRunBlockTool()
session = make_session(user_id=_TEST_USER_ID)
review_id = "copilot-node-some-block:abc12345"
graph_exec_id = f"copilot-session-{session.session_id}"
review = _make_review_model(
review_id, status=ReviewStatus.WAITING, graph_exec_id=graph_exec_id
)
mock_db = MagicMock()
mock_db.get_reviews_by_node_exec_ids = AsyncMock(
return_value={review_id: review}
)
with patch(
"backend.copilot.tools.continue_run_block.review_db",
return_value=mock_db,
):
response = await tool._execute(
user_id=_TEST_USER_ID,
session=session,
review_id=review_id,
)
assert isinstance(response, ErrorResponse)
assert "not been approved" in response.message
@pytest.mark.asyncio(loop_scope="session")
async def test_rejected_review_returns_error(self):
tool = ContinueRunBlockTool()
session = make_session(user_id=_TEST_USER_ID)
review_id = "copilot-node-some-block:abc12345"
graph_exec_id = f"copilot-session-{session.session_id}"
review = _make_review_model(
review_id, status=ReviewStatus.REJECTED, graph_exec_id=graph_exec_id
)
mock_db = MagicMock()
mock_db.get_reviews_by_node_exec_ids = AsyncMock(
return_value={review_id: review}
)
with patch(
"backend.copilot.tools.continue_run_block.review_db",
return_value=mock_db,
):
response = await tool._execute(
user_id=_TEST_USER_ID,
session=session,
review_id=review_id,
)
assert isinstance(response, ErrorResponse)
assert "rejected" in response.message.lower()
@pytest.mark.asyncio(loop_scope="session")
async def test_approved_review_executes_block(self):
tool = ContinueRunBlockTool()
session = make_session(user_id=_TEST_USER_ID)
review_id = "copilot-node-delete-branch-id:abc12345"
graph_exec_id = f"copilot-session-{session.session_id}"
input_data = {"repo_url": "https://github.com/test/repo", "branch": "main"}
review = _make_review_model(
review_id,
status=ReviewStatus.APPROVED,
payload=input_data,
graph_exec_id=graph_exec_id,
)
mock_block = MagicMock()
mock_block.name = "Delete Branch"
async def mock_execute(data, **kwargs):
yield "result", "Branch deleted"
mock_block.execute = mock_execute
mock_block.input_schema.get_credentials_fields_info.return_value = []
mock_workspace_db = MagicMock()
mock_workspace_db.get_or_create_workspace = AsyncMock(
return_value=MagicMock(id="test-workspace-id")
)
mock_db = MagicMock()
mock_db.get_reviews_by_node_exec_ids = AsyncMock(
return_value={review_id: review}
)
mock_db.delete_review_by_node_exec_id = AsyncMock(return_value=1)
with (
patch(
"backend.copilot.tools.continue_run_block.review_db",
return_value=mock_db,
),
patch(
"backend.copilot.tools.continue_run_block.get_block",
return_value=mock_block,
),
patch(
"backend.copilot.tools.helpers.workspace_db",
return_value=mock_workspace_db,
),
patch(
"backend.copilot.tools.helpers.match_credentials_to_requirements",
return_value=({}, []),
),
):
response = await tool._execute(
user_id=_TEST_USER_ID,
session=session,
review_id=review_id,
)
assert isinstance(response, BlockOutputResponse)
assert response.success is True
assert response.block_name == "Delete Branch"
# Verify review was deleted (one-time use)
mock_db.delete_review_by_node_exec_id.assert_called_once_with(
review_id, _TEST_USER_ID
)

View File

@@ -1,24 +1,7 @@
"""Shared helpers for chat tools."""
import logging
from collections import defaultdict
from typing import Any
from pydantic_core import PydanticUndefined
from backend.blocks._base import AnyBlockSchema
from backend.copilot.constants import COPILOT_NODE_PREFIX, COPILOT_SESSION_PREFIX
from backend.data.db_accessors import workspace_db
from backend.data.execution import ExecutionContext
from backend.data.model import CredentialsFieldInfo, CredentialsMetaInput
from backend.integrations.creds_manager import IntegrationCredentialsManager
from backend.util.exceptions import BlockError
from .models import BlockOutputResponse, ErrorResponse, ToolResponseBase
from .utils import match_credentials_to_requirements
logger = logging.getLogger(__name__)
def get_inputs_from_schema(
input_schema: dict[str, Any],
@@ -44,159 +27,3 @@ def get_inputs_from_schema(
for name, schema in properties.items()
if name not in exclude
]
async def execute_block(
*,
block: AnyBlockSchema,
block_id: str,
input_data: dict[str, Any],
user_id: str,
session_id: str,
node_exec_id: str,
matched_credentials: dict[str, CredentialsMetaInput],
sensitive_action_safe_mode: bool = False,
) -> ToolResponseBase:
"""Execute a block with full context setup, credential injection, and error handling.
This is the shared execution path used by both ``run_block`` (after review
check) and ``continue_run_block`` (after approval).
Returns:
BlockOutputResponse on success, ErrorResponse on failure.
"""
try:
workspace = await workspace_db().get_or_create_workspace(user_id)
synthetic_graph_id = f"{COPILOT_SESSION_PREFIX}{session_id}"
synthetic_node_id = f"{COPILOT_NODE_PREFIX}{block_id}"
execution_context = ExecutionContext(
user_id=user_id,
graph_id=synthetic_graph_id,
graph_exec_id=synthetic_graph_id,
graph_version=1,
node_id=synthetic_node_id,
node_exec_id=node_exec_id,
workspace_id=workspace.id,
session_id=session_id,
sensitive_action_safe_mode=sensitive_action_safe_mode,
)
exec_kwargs: dict[str, Any] = {
"user_id": user_id,
"execution_context": execution_context,
"workspace_id": workspace.id,
"graph_exec_id": synthetic_graph_id,
"node_exec_id": node_exec_id,
"node_id": synthetic_node_id,
"graph_version": 1,
"graph_id": synthetic_graph_id,
}
# Inject credentials
creds_manager = IntegrationCredentialsManager()
for field_name, cred_meta in matched_credentials.items():
if field_name not in input_data:
input_data[field_name] = cred_meta.model_dump()
actual_credentials = await creds_manager.get(
user_id, cred_meta.id, lock=False
)
if actual_credentials:
exec_kwargs[field_name] = actual_credentials
else:
return ErrorResponse(
message=f"Failed to retrieve credentials for {field_name}",
session_id=session_id,
)
# Execute the block and collect outputs
outputs: dict[str, list[Any]] = defaultdict(list)
async for output_name, output_data in block.execute(
input_data,
**exec_kwargs,
):
outputs[output_name].append(output_data)
return BlockOutputResponse(
message=f"Block '{block.name}' executed successfully",
block_id=block_id,
block_name=block.name,
outputs=dict(outputs),
success=True,
session_id=session_id,
)
except BlockError as e:
logger.warning(f"Block execution failed: {e}")
return ErrorResponse(
message=f"Block execution failed: {e}",
error=str(e),
session_id=session_id,
)
except Exception as e:
logger.error(f"Unexpected error executing block: {e}", exc_info=True)
return ErrorResponse(
message=f"Failed to execute block: {str(e)}",
error=str(e),
session_id=session_id,
)
async def resolve_block_credentials(
user_id: str,
block: AnyBlockSchema,
input_data: dict[str, Any] | None = None,
) -> tuple[dict[str, CredentialsMetaInput], list[CredentialsMetaInput]]:
"""Resolve credentials for a block by matching user's available credentials.
Handles discriminated credentials (e.g. provider selection based on model).
Returns:
(matched_credentials, missing_credentials)
"""
input_data = input_data or {}
requirements = _resolve_discriminated_credentials(block, input_data)
if not requirements:
return {}, []
return await match_credentials_to_requirements(user_id, requirements)
def _resolve_discriminated_credentials(
block: AnyBlockSchema,
input_data: dict[str, Any],
) -> dict[str, CredentialsFieldInfo]:
"""Resolve credential requirements, applying discriminator logic where needed."""
credentials_fields_info = block.input_schema.get_credentials_fields_info()
if not credentials_fields_info:
return {}
resolved: dict[str, CredentialsFieldInfo] = {}
for field_name, field_info in credentials_fields_info.items():
effective_field_info = field_info
if field_info.discriminator and field_info.discriminator_mapping:
discriminator_value = input_data.get(field_info.discriminator)
if discriminator_value is None:
field = block.input_schema.model_fields.get(field_info.discriminator)
if field and field.default is not PydanticUndefined:
discriminator_value = field.default
if (
discriminator_value
and discriminator_value in field_info.discriminator_mapping
):
effective_field_info = field_info.discriminate(discriminator_value)
effective_field_info.discriminator_values.add(discriminator_value)
logger.debug(
f"Discriminated provider for {field_name}: "
f"{discriminator_value} -> {effective_field_info.provider}"
)
resolved[field_name] = effective_field_info
return resolved

View File

@@ -39,7 +39,6 @@ class ResponseType(str, Enum):
BLOCK_LIST = "block_list"
BLOCK_DETAILS = "block_details"
BLOCK_OUTPUT = "block_output"
REVIEW_REQUIRED = "review_required"
# MCP
MCP_GUIDE = "mcp_guide"
@@ -459,21 +458,6 @@ class BlockOutputResponse(ToolResponseBase):
success: bool = True
class ReviewRequiredResponse(ToolResponseBase):
"""Response when a block requires human review before execution."""
type: ResponseType = ResponseType.REVIEW_REQUIRED
block_id: str
block_name: str
review_id: str = Field(description="The review ID for tracking approval status")
graph_exec_id: str = Field(
description="The graph execution ID for fetching review status"
)
input_data: dict[str, Any] = Field(
description="The input data that requires review"
)
class WebFetchResponse(ToolResponseBase):
"""Response for web_fetch tool."""

View File

@@ -534,9 +534,7 @@ class RunAgentTool(BaseTool):
return ExecutionStartedResponse(
message=(
f"Agent '{library_agent.name}' is awaiting human review. "
f"The user can approve or reject inline. After approval, "
f"the execution resumes automatically. Use view_agent_output "
f"with execution_id='{execution.id}' to check the result."
f"Check at {library_agent_link}."
),
session_id=session_id,
execution_id=execution.id,

View File

@@ -2,34 +2,38 @@
import logging
import uuid
from collections import defaultdict
from typing import Any
from pydantic_core import PydanticUndefined
from backend.blocks import BlockType, get_block
from backend.blocks._base import AnyBlockSchema
from backend.copilot.constants import (
COPILOT_NODE_EXEC_ID_SEPARATOR,
COPILOT_NODE_PREFIX,
COPILOT_SESSION_PREFIX,
)
from backend.copilot.model import ChatSession
from backend.data.db_accessors import review_db
from backend.data.db_accessors import workspace_db
from backend.data.execution import ExecutionContext
from backend.data.model import CredentialsFieldInfo, CredentialsMetaInput
from backend.integrations.creds_manager import IntegrationCredentialsManager
from backend.util.exceptions import BlockError
from .base import BaseTool
from .find_block import COPILOT_EXCLUDED_BLOCK_IDS, COPILOT_EXCLUDED_BLOCK_TYPES
from .helpers import execute_block, get_inputs_from_schema, resolve_block_credentials
from .helpers import get_inputs_from_schema
from .models import (
BlockDetails,
BlockDetailsResponse,
BlockOutputResponse,
ErrorResponse,
InputValidationErrorResponse,
ReviewRequiredResponse,
SetupInfo,
SetupRequirementsResponse,
ToolResponseBase,
UserReadiness,
)
from .utils import build_missing_credentials_from_field_info
from .utils import (
build_missing_credentials_from_field_info,
match_credentials_to_requirements,
)
logger = logging.getLogger(__name__)
@@ -48,9 +52,7 @@ class RunBlockTool(BaseTool):
"IMPORTANT: You MUST call find_block first to get the block's 'id' - "
"do NOT guess or make up block IDs. "
"On first attempt (without input_data), returns detailed schema showing "
"required inputs and outputs. Then call again with proper input_data to execute. "
"If a block requires human review, use continue_run_block with the "
"review_id after the user approves."
"required inputs and outputs. Then call again with proper input_data to execute."
)
@property
@@ -164,10 +166,11 @@ class RunBlockTool(BaseTool):
logger.info(f"Executing block {block.name} ({block_id}) for user {user_id}")
creds_manager = IntegrationCredentialsManager()
(
matched_credentials,
missing_credentials,
) = await resolve_block_credentials(user_id, block, input_data)
) = await self._resolve_block_credentials(user_id, block, input_data)
# Get block schemas for details/validation
try:
@@ -276,97 +279,169 @@ class RunBlockTool(BaseTool):
user_authenticated=True,
)
# Generate synthetic IDs for CoPilot context.
# Encode node_id in node_exec_id so it can be extracted later
# (e.g. for auto-approve, where we need node_id but have no NodeExecution row).
synthetic_graph_id = f"{COPILOT_SESSION_PREFIX}{session.session_id}"
synthetic_node_id = f"{COPILOT_NODE_PREFIX}{block_id}"
try:
# Get or create user's workspace for CoPilot file operations
workspace = await workspace_db().get_or_create_workspace(user_id)
# Check for an existing WAITING review for this block with the same input.
# If the LLM retries run_block with identical input, we reuse the existing
# review instead of creating duplicates. Different inputs = new execution.
existing_reviews = await review_db().get_pending_reviews_for_execution(
synthetic_graph_id, user_id
)
existing_review = next(
(
r
for r in existing_reviews
if r.node_id == synthetic_node_id
and r.status.value == "WAITING"
and r.payload == input_data
),
None,
)
if existing_review:
return ReviewRequiredResponse(
message=(
f"Block '{block.name}' requires human review. "
f"After the user approves, call continue_run_block with "
f"review_id='{existing_review.node_exec_id}' to execute."
),
session_id=session_id,
block_id=block_id,
block_name=block.name,
review_id=existing_review.node_exec_id,
graph_exec_id=synthetic_graph_id,
input_data=input_data,
# Generate synthetic IDs for CoPilot context
# Each chat session is treated as its own agent with one continuous run
# This means:
# - graph_id (agent) = session (memories scoped to session when limit_to_agent=True)
# - graph_exec_id (run) = session (memories scoped to session when limit_to_run=True)
# - node_exec_id = unique per block execution
synthetic_graph_id = f"copilot-session-{session.session_id}"
synthetic_graph_exec_id = f"copilot-session-{session.session_id}"
synthetic_node_id = f"copilot-node-{block_id}"
synthetic_node_exec_id = (
f"copilot-{session.session_id}-{uuid.uuid4().hex[:8]}"
)
synthetic_node_exec_id = (
f"{synthetic_node_id}{COPILOT_NODE_EXEC_ID_SEPARATOR}"
f"{uuid.uuid4().hex[:8]}"
)
# Check for HITL review before execution.
# This creates the review record in the DB for CoPilot flows.
review_context = ExecutionContext(
user_id=user_id,
graph_id=synthetic_graph_id,
graph_exec_id=synthetic_graph_id,
graph_version=1,
node_id=synthetic_node_id,
node_exec_id=synthetic_node_exec_id,
sensitive_action_safe_mode=True,
)
should_pause, input_data = await block.is_block_exec_need_review(
input_data,
user_id=user_id,
node_id=synthetic_node_id,
node_exec_id=synthetic_node_exec_id,
graph_exec_id=synthetic_graph_id,
graph_id=synthetic_graph_id,
graph_version=1,
execution_context=review_context,
is_graph_execution=False,
)
if should_pause:
return ReviewRequiredResponse(
message=(
f"Block '{block.name}' requires human review. "
f"After the user approves, call continue_run_block with "
f"review_id='{synthetic_node_exec_id}' to execute."
),
session_id=session_id,
block_id=block_id,
block_name=block.name,
review_id=synthetic_node_exec_id,
graph_exec_id=synthetic_graph_id,
input_data=input_data,
# Create unified execution context with all required fields
execution_context = ExecutionContext(
# Execution identity
user_id=user_id,
graph_id=synthetic_graph_id,
graph_exec_id=synthetic_graph_exec_id,
graph_version=1, # Versions are 1-indexed
node_id=synthetic_node_id,
node_exec_id=synthetic_node_exec_id,
# Workspace with session scoping
workspace_id=workspace.id,
session_id=session.session_id,
)
return await execute_block(
block=block,
block_id=block_id,
input_data=input_data,
user_id=user_id,
session_id=session_id,
node_exec_id=synthetic_node_exec_id,
matched_credentials=matched_credentials,
)
# Prepare kwargs for block execution
# Keep individual kwargs for backwards compatibility with existing blocks
exec_kwargs: dict[str, Any] = {
"user_id": user_id,
"execution_context": execution_context,
# Legacy: individual kwargs for blocks not yet using execution_context
"workspace_id": workspace.id,
"graph_exec_id": synthetic_graph_exec_id,
"node_exec_id": synthetic_node_exec_id,
"node_id": synthetic_node_id,
"graph_version": 1, # Versions are 1-indexed
"graph_id": synthetic_graph_id,
}
for field_name, cred_meta in matched_credentials.items():
# Inject metadata into input_data (for validation)
if field_name not in input_data:
input_data[field_name] = cred_meta.model_dump()
# Fetch actual credentials and pass as kwargs (for execution)
actual_credentials = await creds_manager.get(
user_id, cred_meta.id, lock=False
)
if actual_credentials:
exec_kwargs[field_name] = actual_credentials
else:
return ErrorResponse(
message=f"Failed to retrieve credentials for {field_name}",
session_id=session_id,
)
# Execute the block and collect outputs
outputs: dict[str, list[Any]] = defaultdict(list)
async for output_name, output_data in block.execute(
input_data,
**exec_kwargs,
):
outputs[output_name].append(output_data)
return BlockOutputResponse(
message=f"Block '{block.name}' executed successfully",
block_id=block_id,
block_name=block.name,
outputs=dict(outputs),
success=True,
session_id=session_id,
)
except BlockError as e:
logger.warning(f"Block execution failed: {e}")
return ErrorResponse(
message=f"Block execution failed: {e}",
error=str(e),
session_id=session_id,
)
except Exception as e:
logger.error(f"Unexpected error executing block: {e}", exc_info=True)
return ErrorResponse(
message=f"Failed to execute block: {str(e)}",
error=str(e),
session_id=session_id,
)
async def _resolve_block_credentials(
self,
user_id: str,
block: AnyBlockSchema,
input_data: dict[str, Any] | None = None,
) -> tuple[dict[str, CredentialsMetaInput], list[CredentialsMetaInput]]:
"""
Resolve credentials for a block by matching user's available credentials.
Args:
user_id: User ID
block: Block to resolve credentials for
input_data: Input data for the block (used to determine provider via discriminator)
Returns:
tuple of (matched_credentials, missing_credentials) - matched credentials
are used for block execution, missing ones indicate setup requirements.
"""
input_data = input_data or {}
requirements = self._resolve_discriminated_credentials(block, input_data)
if not requirements:
return {}, []
return await match_credentials_to_requirements(user_id, requirements)
def _get_inputs_list(self, block: AnyBlockSchema) -> list[dict[str, Any]]:
"""Extract non-credential inputs from block schema."""
schema = block.input_schema.jsonschema()
credentials_fields = set(block.input_schema.get_credentials_fields().keys())
return get_inputs_from_schema(schema, exclude_fields=credentials_fields)
def _resolve_discriminated_credentials(
self,
block: AnyBlockSchema,
input_data: dict[str, Any],
) -> dict[str, CredentialsFieldInfo]:
"""Resolve credential requirements, applying discriminator logic where needed."""
credentials_fields_info = block.input_schema.get_credentials_fields_info()
if not credentials_fields_info:
return {}
resolved: dict[str, CredentialsFieldInfo] = {}
for field_name, field_info in credentials_fields_info.items():
effective_field_info = field_info
if field_info.discriminator and field_info.discriminator_mapping:
discriminator_value = input_data.get(field_info.discriminator)
if discriminator_value is None:
field = block.input_schema.model_fields.get(
field_info.discriminator
)
if field and field.default is not PydanticUndefined:
discriminator_value = field.default
if (
discriminator_value
and discriminator_value in field_info.discriminator_mapping
):
effective_field_info = field_info.discriminate(discriminator_value)
# For host-scoped credentials, add the discriminator value
# (e.g., URL) so _credential_is_for_host can match it
effective_field_info.discriminator_values.add(discriminator_value)
logger.debug(
f"Discriminated provider for {field_name}: "
f"{discriminator_value} -> {effective_field_info.provider}"
)
resolved[field_name] = effective_field_info
return resolved

View File

@@ -12,7 +12,6 @@ from .models import (
BlockOutputResponse,
ErrorResponse,
InputValidationErrorResponse,
ReviewRequiredResponse,
)
from .run_block import RunBlockTool
@@ -28,16 +27,9 @@ def make_mock_block(
mock.name = name
mock.block_type = block_type
mock.disabled = disabled
mock.is_sensitive_action = False
mock.input_schema = MagicMock()
mock.input_schema.jsonschema.return_value = {"properties": {}, "required": []}
mock.input_schema.get_credentials_fields_info.return_value = {}
mock.input_schema.get_credentials_fields.return_value = {}
async def _no_review(input_data, **kwargs):
return False, input_data
mock.is_block_exec_need_review = _no_review
mock.input_schema.get_credentials_fields_info.return_value = []
return mock
@@ -54,7 +46,6 @@ def make_mock_block_with_schema(
mock.name = name
mock.block_type = BlockType.STANDARD
mock.disabled = False
mock.is_sensitive_action = False
mock.description = f"Test block: {name}"
input_schema = {
@@ -72,12 +63,6 @@ def make_mock_block_with_schema(
mock.output_schema = MagicMock()
mock.output_schema.jsonschema.return_value = output_schema
# Default: no review needed, pass through input_data unchanged
async def _no_review(input_data, **kwargs):
return False, input_data
mock.is_block_exec_need_review = _no_review
return mock
@@ -141,15 +126,9 @@ class TestRunBlockFiltering:
"standard-id", "HTTP Request", BlockType.STANDARD
)
with (
patch(
"backend.copilot.tools.run_block.get_block",
return_value=standard_block,
),
patch(
"backend.copilot.tools.helpers.match_credentials_to_requirements",
return_value=({}, []),
),
with patch(
"backend.copilot.tools.run_block.get_block",
return_value=standard_block,
):
tool = RunBlockTool()
response = await tool._execute(
@@ -175,7 +154,12 @@ class TestRunBlockInputValidation:
@pytest.mark.asyncio(loop_scope="session")
async def test_unknown_input_fields_are_rejected(self):
"""run_block rejects unknown input fields instead of silently ignoring them."""
"""run_block rejects unknown input fields instead of silently ignoring them.
Scenario: The AI Text Generator block has a field called 'model' (for LLM model
selection), but the LLM calling the tool guesses wrong and sends 'LLM_Model'
instead. The block should reject the request and return the valid schema.
"""
session = make_session(user_id=_TEST_USER_ID)
mock_block = make_mock_block_with_schema(
@@ -198,31 +182,27 @@ class TestRunBlockInputValidation:
output_properties={"response": {"type": "string"}},
)
with (
patch(
"backend.copilot.tools.run_block.get_block",
return_value=mock_block,
),
patch(
"backend.copilot.tools.helpers.match_credentials_to_requirements",
return_value=({}, []),
),
with patch(
"backend.copilot.tools.run_block.get_block",
return_value=mock_block,
):
tool = RunBlockTool()
# Provide 'prompt' (correct) but 'LLM_Model' instead of 'model' (wrong key)
response = await tool._execute(
user_id=_TEST_USER_ID,
session=session,
block_id="ai-text-gen-id",
input_data={
"prompt": "Write a haiku about coding",
"LLM_Model": "claude-opus-4-6",
"LLM_Model": "claude-opus-4-6", # WRONG KEY - should be 'model'
},
)
assert isinstance(response, InputValidationErrorResponse)
assert "LLM_Model" in response.unrecognized_fields
assert "Block was not executed" in response.message
assert "inputs" in response.model_dump()
assert "inputs" in response.model_dump() # valid schema included
@pytest.mark.asyncio(loop_scope="session")
async def test_multiple_wrong_keys_are_all_reported(self):
@@ -241,26 +221,21 @@ class TestRunBlockInputValidation:
required_fields=["prompt"],
)
with (
patch(
"backend.copilot.tools.run_block.get_block",
return_value=mock_block,
),
patch(
"backend.copilot.tools.helpers.match_credentials_to_requirements",
return_value=({}, []),
),
with patch(
"backend.copilot.tools.run_block.get_block",
return_value=mock_block,
):
tool = RunBlockTool()
response = await tool._execute(
user_id=_TEST_USER_ID,
session=session,
block_id="ai-text-gen-id",
input_data={
"prompt": "Hello",
"llm_model": "claude-opus-4-6",
"system_prompt": "Be helpful",
"retries": 5,
"prompt": "Hello", # correct
"llm_model": "claude-opus-4-6", # WRONG - should be 'model'
"system_prompt": "Be helpful", # WRONG - should be 'sys_prompt'
"retries": 5, # WRONG - should be 'retry'
},
)
@@ -287,26 +262,23 @@ class TestRunBlockInputValidation:
required_fields=["prompt"],
)
with (
patch(
"backend.copilot.tools.run_block.get_block",
return_value=mock_block,
),
patch(
"backend.copilot.tools.helpers.match_credentials_to_requirements",
return_value=({}, []),
),
with patch(
"backend.copilot.tools.run_block.get_block",
return_value=mock_block,
):
tool = RunBlockTool()
# 'prompt' is missing AND 'LLM_Model' is an unknown field
response = await tool._execute(
user_id=_TEST_USER_ID,
session=session,
block_id="ai-text-gen-id",
input_data={
"LLM_Model": "claude-opus-4-6",
"LLM_Model": "claude-opus-4-6", # wrong key, and 'prompt' is missing
},
)
# Unknown fields are caught first
assert isinstance(response, InputValidationErrorResponse)
assert "LLM_Model" in response.unrecognized_fields
@@ -341,11 +313,7 @@ class TestRunBlockInputValidation:
return_value=mock_block,
),
patch(
"backend.copilot.tools.helpers.match_credentials_to_requirements",
return_value=({}, []),
),
patch(
"backend.copilot.tools.helpers.workspace_db",
"backend.copilot.tools.run_block.workspace_db",
return_value=mock_workspace_db,
),
):
@@ -357,7 +325,7 @@ class TestRunBlockInputValidation:
block_id="ai-text-gen-id",
input_data={
"prompt": "Write a haiku",
"model": "gpt-4o-mini",
"model": "gpt-4o-mini", # correct field name
},
)
@@ -379,191 +347,20 @@ class TestRunBlockInputValidation:
required_fields=["prompt"],
)
with (
patch(
"backend.copilot.tools.run_block.get_block",
return_value=mock_block,
),
patch(
"backend.copilot.tools.helpers.match_credentials_to_requirements",
return_value=({}, []),
),
with patch(
"backend.copilot.tools.run_block.get_block",
return_value=mock_block,
):
tool = RunBlockTool()
# Only provide valid optional field, missing required 'prompt'
response = await tool._execute(
user_id=_TEST_USER_ID,
session=session,
block_id="ai-text-gen-id",
input_data={
"model": "gpt-4o-mini",
"model": "gpt-4o-mini", # valid but optional
},
)
assert isinstance(response, BlockDetailsResponse)
class TestRunBlockSensitiveAction:
"""Tests for sensitive action HITL review in RunBlockTool.
run_block calls is_block_exec_need_review() explicitly before execution.
When review is needed (should_pause=True), ReviewRequiredResponse is returned.
"""
@pytest.mark.asyncio(loop_scope="session")
async def test_sensitive_block_paused_returns_review_required(self):
"""When is_block_exec_need_review returns should_pause=True, ReviewRequiredResponse is returned."""
session = make_session(user_id=_TEST_USER_ID)
input_data = {
"repo_url": "https://github.com/test/repo",
"branch": "feature-branch",
}
mock_block = make_mock_block_with_schema(
block_id="delete-branch-id",
name="Delete Branch",
input_properties={
"repo_url": {"type": "string"},
"branch": {"type": "string"},
},
required_fields=["repo_url", "branch"],
)
mock_block.is_sensitive_action = True
mock_block.is_block_exec_need_review = AsyncMock(
return_value=(True, input_data)
)
with (
patch(
"backend.copilot.tools.run_block.get_block",
return_value=mock_block,
),
patch(
"backend.copilot.tools.helpers.match_credentials_to_requirements",
return_value=({}, []),
),
):
tool = RunBlockTool()
response = await tool._execute(
user_id=_TEST_USER_ID,
session=session,
block_id="delete-branch-id",
input_data=input_data,
)
assert isinstance(response, ReviewRequiredResponse)
assert "requires human review" in response.message
assert "continue_run_block" in response.message
assert response.block_name == "Delete Branch"
@pytest.mark.asyncio(loop_scope="session")
async def test_sensitive_block_executes_after_approval(self):
"""After approval (should_pause=False), sensitive blocks execute and return outputs."""
session = make_session(user_id=_TEST_USER_ID)
input_data = {
"repo_url": "https://github.com/test/repo",
"branch": "feature-branch",
}
mock_block = make_mock_block_with_schema(
block_id="delete-branch-id",
name="Delete Branch",
input_properties={
"repo_url": {"type": "string"},
"branch": {"type": "string"},
},
required_fields=["repo_url", "branch"],
)
mock_block.is_sensitive_action = True
mock_block.is_block_exec_need_review = AsyncMock(
return_value=(False, input_data)
)
async def mock_execute(input_data, **kwargs):
yield "result", "Branch deleted successfully"
mock_block.execute = mock_execute
mock_workspace_db = MagicMock()
mock_workspace_db.get_or_create_workspace = AsyncMock(
return_value=MagicMock(id="test-workspace-id")
)
with (
patch(
"backend.copilot.tools.run_block.get_block",
return_value=mock_block,
),
patch(
"backend.copilot.tools.helpers.match_credentials_to_requirements",
return_value=({}, []),
),
patch(
"backend.copilot.tools.helpers.workspace_db",
return_value=mock_workspace_db,
),
):
tool = RunBlockTool()
response = await tool._execute(
user_id=_TEST_USER_ID,
session=session,
block_id="delete-branch-id",
input_data=input_data,
)
assert isinstance(response, BlockOutputResponse)
assert response.success is True
@pytest.mark.asyncio(loop_scope="session")
async def test_non_sensitive_block_executes_normally(self):
"""Non-sensitive blocks skip review and execute directly."""
session = make_session(user_id=_TEST_USER_ID)
input_data = {"url": "https://example.com"}
mock_block = make_mock_block_with_schema(
block_id="http-request-id",
name="HTTP Request",
input_properties={
"url": {"type": "string"},
},
required_fields=["url"],
)
mock_block.is_sensitive_action = False
mock_block.is_block_exec_need_review = AsyncMock(
return_value=(False, input_data)
)
async def mock_execute(input_data, **kwargs):
yield "response", {"status": 200}
mock_block.execute = mock_execute
mock_workspace_db = MagicMock()
mock_workspace_db.get_or_create_workspace = AsyncMock(
return_value=MagicMock(id="test-workspace-id")
)
with (
patch(
"backend.copilot.tools.run_block.get_block",
return_value=mock_block,
),
patch(
"backend.copilot.tools.helpers.match_credentials_to_requirements",
return_value=({}, []),
),
patch(
"backend.copilot.tools.helpers.workspace_db",
return_value=mock_workspace_db,
),
):
tool = RunBlockTool()
response = await tool._execute(
user_id=_TEST_USER_ID,
session=session,
block_id="http-request-id",
input_data=input_data,
)
assert isinstance(response, BlockOutputResponse)
assert response.success is True

View File

@@ -65,8 +65,9 @@ async def test_run_block_returns_details_when_no_input_provided():
return_value=http_block,
):
# Mock credentials check to return no missing credentials
with patch(
"backend.copilot.tools.run_block.resolve_block_credentials",
with patch.object(
RunBlockTool,
"_resolve_block_credentials",
new_callable=AsyncMock,
return_value=({}, []), # (matched_credentials, missing_credentials)
):
@@ -122,8 +123,9 @@ async def test_run_block_returns_details_when_only_credentials_provided():
"backend.copilot.tools.run_block.get_block",
return_value=mock,
):
with patch(
"backend.copilot.tools.run_block.resolve_block_credentials",
with patch.object(
RunBlockTool,
"_resolve_block_credentials",
new_callable=AsyncMock,
return_value=(
{

View File

@@ -116,16 +116,3 @@ def workspace_db():
workspace_db = get_database_manager_async_client()
return workspace_db
def review_db():
if db.is_connected():
from backend.data import human_review as _review_db
review_db = _review_db
else:
from backend.util.clients import get_database_manager_async_client
review_db = get_database_manager_async_client()
return review_db

View File

@@ -79,10 +79,7 @@ from backend.data.graph import (
from backend.data.human_review import (
cancel_pending_reviews_for_execution,
check_approval,
delete_review_by_node_exec_id,
get_or_create_human_review,
get_pending_reviews_for_execution,
get_reviews_by_node_exec_ids,
has_pending_reviews_for_graph_exec,
update_review_processed_status,
)
@@ -249,10 +246,7 @@ class DatabaseManager(AppService):
# ============ Human In The Loop ============ #
cancel_pending_reviews_for_execution = _(cancel_pending_reviews_for_execution)
check_approval = _(check_approval)
delete_review_by_node_exec_id = _(delete_review_by_node_exec_id)
get_or_create_human_review = _(get_or_create_human_review)
get_pending_reviews_for_execution = _(get_pending_reviews_for_execution)
get_reviews_by_node_exec_ids = _(get_reviews_by_node_exec_ids)
has_pending_reviews_for_graph_exec = _(has_pending_reviews_for_graph_exec)
update_review_processed_status = _(update_review_processed_status)
@@ -439,10 +433,7 @@ class DatabaseManagerAsyncClient(AppServiceClient):
# ============ Human In The Loop ============ #
cancel_pending_reviews_for_execution = d.cancel_pending_reviews_for_execution
check_approval = d.check_approval
delete_review_by_node_exec_id = d.delete_review_by_node_exec_id
get_or_create_human_review = d.get_or_create_human_review
get_pending_reviews_for_execution = d.get_pending_reviews_for_execution
get_reviews_by_node_exec_ids = d.get_reviews_by_node_exec_ids
update_review_processed_status = d.update_review_processed_status
# ============ User Comms ============ #

View File

@@ -17,10 +17,6 @@ from backend.api.features.executions.review.model import (
PendingHumanReviewModel,
SafeJsonData,
)
from backend.copilot.constants import (
is_copilot_synthetic_id,
parse_node_id_from_exec_id,
)
from backend.data.execution import get_graph_execution_meta
from backend.util.json import SafeJson
@@ -127,13 +123,11 @@ async def create_auto_approval_record(
Raises:
ValueError: If the graph execution doesn't belong to the user
"""
# Validate ownership: if a graph execution record exists, it must belong
# to this user. Non-graph executions (e.g. CoPilot) won't have a record.
if not is_copilot_synthetic_id(
graph_exec_id
) and not await get_graph_execution_meta(
# Validate that the graph execution belongs to this user (defense in depth)
graph_exec = await get_graph_execution_meta(
user_id=user_id, execution_id=graph_exec_id
):
)
if not graph_exec:
raise ValueError(
f"Graph execution {graph_exec_id} not found or doesn't belong to user {user_id}"
)
@@ -271,7 +265,7 @@ async def get_pending_review_by_node_exec_id(
async def get_reviews_by_node_exec_ids(
node_exec_ids: list[str], user_id: str
) -> dict[str, PendingHumanReviewModel]:
) -> dict[str, "PendingHumanReviewModel"]:
"""
Get multiple reviews by their node execution IDs regardless of status.
@@ -298,26 +292,21 @@ async def get_reviews_by_node_exec_ids(
if not reviews:
return {}
# Split into synthetic (CoPilot) and real IDs for different resolution paths
synthetic_ids = {
r.nodeExecId for r in reviews if is_copilot_synthetic_id(r.nodeExecId)
}
real_ids = [r.nodeExecId for r in reviews if r.nodeExecId not in synthetic_ids]
# Batch fetch all node executions to avoid N+1 queries
node_exec_ids_to_fetch = [review.nodeExecId for review in reviews]
node_execs = await AgentNodeExecution.prisma().find_many(
where={"id": {"in": node_exec_ids_to_fetch}},
include={"Node": True},
)
# Batch fetch real node executions to avoid N+1 queries
node_exec_id_to_node_id: dict[str, str] = {}
if real_ids:
node_execs = await AgentNodeExecution.prisma().find_many(
where={"id": {"in": real_ids}},
)
node_exec_id_to_node_id = {ne.id: ne.agentNodeId for ne in node_execs}
# Create mapping from node_exec_id to node_id
node_exec_id_to_node_id = {
node_exec.id: node_exec.agentNodeId for node_exec in node_execs
}
result = {}
for review in reviews:
if review.nodeExecId in synthetic_ids:
node_id = parse_node_id_from_exec_id(review.nodeExecId)
else:
node_id = node_exec_id_to_node_id.get(review.nodeExecId, review.nodeExecId)
node_id = node_exec_id_to_node_id.get(review.nodeExecId, review.nodeExecId)
result[review.nodeExecId] = PendingHumanReviewModel.from_db(
review, node_id=node_id
)
@@ -342,19 +331,6 @@ async def has_pending_reviews_for_graph_exec(graph_exec_id: str) -> bool:
return count > 0
async def _resolve_node_id(node_exec_id: str, get_node_execution) -> str:
"""Resolve node_id from a node_exec_id.
For CoPilot synthetic IDs (e.g. copilot-node-block-id:abc12345),
extract the node_id portion (copilot-node-block-id).
For real graph executions, look up the NodeExecution record.
"""
if is_copilot_synthetic_id(node_exec_id):
return parse_node_id_from_exec_id(node_exec_id)
node_exec = await get_node_execution(node_exec_id)
return node_exec.node_id if node_exec else node_exec_id
async def get_pending_reviews_for_user(
user_id: str, page: int = 1, page_size: int = 25
) -> list["PendingHumanReviewModel"]:
@@ -385,7 +361,8 @@ async def get_pending_reviews_for_user(
# Fetch node_id for each review from NodeExecution
result = []
for review in reviews:
node_id = await _resolve_node_id(review.nodeExecId, get_node_execution)
node_exec = await get_node_execution(review.nodeExecId)
node_id = node_exec.node_id if node_exec else review.nodeExecId
result.append(PendingHumanReviewModel.from_db(review, node_id=node_id))
return result
@@ -393,7 +370,7 @@ async def get_pending_reviews_for_user(
async def get_pending_reviews_for_execution(
graph_exec_id: str, user_id: str
) -> list[PendingHumanReviewModel]:
) -> list["PendingHumanReviewModel"]:
"""
Get all pending reviews for a specific graph execution.
@@ -419,7 +396,8 @@ async def get_pending_reviews_for_execution(
# Fetch node_id for each review from NodeExecution
result = []
for review in reviews:
node_id = await _resolve_node_id(review.nodeExecId, get_node_execution)
node_exec = await get_node_execution(review.nodeExecId)
node_id = node_exec.node_id if node_exec else review.nodeExecId
result.append(PendingHumanReviewModel.from_db(review, node_id=node_id))
return result
@@ -531,12 +509,8 @@ async def process_all_reviews_for_execution(
result = {}
for review in all_result_reviews:
if is_copilot_synthetic_id(review.nodeExecId):
# CoPilot synthetic node_exec_ids encode node_id as "{node_id}:{random}"
node_id = parse_node_id_from_exec_id(review.nodeExecId)
else:
node_exec = await get_node_execution(review.nodeExecId)
node_id = node_exec.node_id if node_exec else review.nodeExecId
node_exec = await get_node_execution(review.nodeExecId)
node_id = node_exec.node_id if node_exec else review.nodeExecId
result[review.nodeExecId] = PendingHumanReviewModel.from_db(
review, node_id=node_id
)
@@ -590,21 +564,3 @@ async def cancel_pending_reviews_for_execution(graph_exec_id: str, user_id: str)
},
)
return result
async def delete_review_by_node_exec_id(node_exec_id: str, user_id: str) -> int:
"""Delete a review record by node execution ID after it has been consumed.
Used by CoPilot's continue_run_block to clean up one-time-use review records
after successful execution.
Args:
node_exec_id: The node execution ID of the review to delete
user_id: User ID for authorization
Returns:
Number of records deleted
"""
return await PendingHumanReview.prisma().delete_many(
where={"nodeExecId": node_exec_id, "userId": user_id}
)

View File

@@ -0,0 +1,31 @@
"""LLM Registry - Dynamic model management system."""
from .model import ModelMetadata
from .registry import (
RegistryModel,
RegistryModelCost,
RegistryModelCreator,
get_all_model_slugs_for_validation,
get_all_models,
get_default_model_slug,
get_enabled_models,
get_model,
get_schema_options,
refresh_llm_registry,
)
__all__ = [
# Models
"ModelMetadata",
"RegistryModel",
"RegistryModelCost",
"RegistryModelCreator",
# Functions
"refresh_llm_registry",
"get_model",
"get_all_models",
"get_enabled_models",
"get_schema_options",
"get_default_model_slug",
"get_all_model_slugs_for_validation",
]

View File

@@ -0,0 +1,9 @@
"""Type definitions for LLM model metadata.
Re-exports ModelMetadata from blocks.llm to avoid type collision.
In PR #5 (block integration), this will become the canonical location.
"""
from backend.blocks.llm import ModelMetadata
__all__ = ["ModelMetadata"]

View File

@@ -0,0 +1,240 @@
"""Core LLM registry implementation for managing models dynamically."""
from __future__ import annotations
import asyncio
import logging
from dataclasses import dataclass, field
from typing import Any
import prisma.models
from backend.data.llm_registry.model import ModelMetadata
logger = logging.getLogger(__name__)
@dataclass(frozen=True)
class RegistryModelCost:
"""Cost configuration for an LLM model."""
unit: str # "RUN" or "TOKENS"
credit_cost: int
credential_provider: str
credential_id: str | None
credential_type: str | None
currency: str | None
metadata: dict[str, Any]
@dataclass(frozen=True)
class RegistryModelCreator:
"""Creator information for an LLM model."""
id: str
name: str
display_name: str
description: str | None
website_url: str | None
logo_url: str | None
@dataclass(frozen=True)
class RegistryModel:
"""Represents a model in the LLM registry."""
slug: str
display_name: str
description: str | None
metadata: ModelMetadata
capabilities: dict[str, Any]
extra_metadata: dict[str, Any]
provider_display_name: str
is_enabled: bool
is_recommended: bool = False
costs: tuple[RegistryModelCost, ...] = field(default_factory=tuple)
creator: RegistryModelCreator | None = None
# In-memory cache (will be replaced with Redis in PR #6)
_dynamic_models: dict[str, RegistryModel] = {}
_schema_options: list[dict[str, str]] = []
_lock = asyncio.Lock()
async def refresh_llm_registry() -> None:
"""
Refresh the LLM registry from the database.
Fetches all models with their costs, providers, and creators,
then updates the in-memory cache.
"""
async with _lock:
try:
records = await prisma.models.LlmModel.prisma().find_many(
include={
"Provider": True,
"Costs": True,
"Creator": True,
}
)
logger.info(f"Fetched {len(records)} LLM models from database")
# Build model instances
new_models: dict[str, RegistryModel] = {}
for record in records:
# Parse costs
costs = tuple(
RegistryModelCost(
unit=str(cost.unit), # Convert enum to string
credit_cost=cost.creditCost,
credential_provider=cost.credentialProvider,
credential_id=cost.credentialId,
credential_type=cost.credentialType,
currency=cost.currency,
metadata=dict(cost.metadata or {}),
)
for cost in (record.Costs or [])
)
# Parse creator
creator = None
if record.Creator:
creator = RegistryModelCreator(
id=record.Creator.id,
name=record.Creator.name,
display_name=record.Creator.displayName,
description=record.Creator.description,
website_url=record.Creator.websiteUrl,
logo_url=record.Creator.logoUrl,
)
# Parse capabilities
capabilities = dict(record.capabilities or {})
# Build metadata from record
# Warn if Provider relation is missing (indicates data corruption)
if not record.Provider:
logger.warning(
f"LlmModel {record.slug} has no Provider despite NOT NULL FK - "
f"falling back to providerId {record.providerId}"
)
provider_name = (
record.Provider.name if record.Provider else record.providerId
)
provider_display = (
record.Provider.displayName
if record.Provider
else record.providerId
)
# Extract creator name (fallback to "Unknown" if no creator)
creator_name = (
record.Creator.displayName if record.Creator else "Unknown"
)
# Price tier defaults to 1 if not set
price_tier = record.priceTier if record.priceTier in (1, 2, 3) else 1
metadata = ModelMetadata(
provider=provider_name,
context_window=record.contextWindow,
max_output_tokens=(
record.maxOutputTokens
if record.maxOutputTokens is not None
else record.contextWindow
),
display_name=record.displayName,
provider_name=provider_display,
creator_name=creator_name,
price_tier=price_tier,
)
# Create model instance
model = RegistryModel(
slug=record.slug,
display_name=record.displayName,
description=record.description,
metadata=metadata,
capabilities=capabilities,
extra_metadata=dict(record.metadata or {}),
provider_display_name=provider_display,
is_enabled=record.isEnabled,
is_recommended=record.isRecommended,
costs=costs,
creator=creator,
)
new_models[record.slug] = model
# Atomic swap
global _dynamic_models, _schema_options
_dynamic_models = new_models
_schema_options = _build_schema_options()
logger.info(
f"LLM registry refreshed: {len(_dynamic_models)} models, "
f"{len(_schema_options)} schema options"
)
except Exception as e:
logger.error(f"Failed to refresh LLM registry: {e}", exc_info=True)
raise
def _build_schema_options() -> list[dict[str, str]]:
"""Build schema options for model selection dropdown. Only includes enabled models."""
return [
{
"label": model.display_name,
"value": model.slug,
"group": model.metadata.provider,
"description": model.description or "",
}
for model in sorted(
_dynamic_models.values(), key=lambda m: m.display_name.lower()
)
if model.is_enabled
]
def get_model(slug: str) -> RegistryModel | None:
"""Get a model by slug from the registry."""
return _dynamic_models.get(slug)
def get_all_models() -> list[RegistryModel]:
"""Get all models from the registry (including disabled)."""
return list(_dynamic_models.values())
def get_enabled_models() -> list[RegistryModel]:
"""Get only enabled models from the registry."""
return [model for model in _dynamic_models.values() if model.is_enabled]
def get_schema_options() -> list[dict[str, str]]:
"""Get schema options for model selection dropdown (enabled models only)."""
return _schema_options
def get_default_model_slug() -> str | None:
"""Get the default model slug (first recommended, or first enabled)."""
# Sort once and use next() to short-circuit on first match
models = sorted(_dynamic_models.values(), key=lambda m: m.display_name)
# Prefer recommended models
recommended = next(
(m.slug for m in models if m.is_recommended and m.is_enabled), None
)
if recommended:
return recommended
# Fallback to first enabled model
return next((m.slug for m in models if m.is_enabled), None)
def get_all_model_slugs_for_validation() -> list[str]:
"""
Get all model slugs for validation (enables migrate_llm_models to work).
Returns slugs for enabled models only.
"""
return [model.slug for model in _dynamic_models.values() if model.is_enabled]

View File

@@ -0,0 +1,5 @@
"""LLM registry public API."""
from .routes import router
__all__ = ["router"]

View File

@@ -0,0 +1,67 @@
"""Pydantic models for LLM registry public API."""
from __future__ import annotations
from typing import Any
import pydantic
class LlmModelCost(pydantic.BaseModel):
"""Cost configuration for an LLM model."""
unit: str # "RUN" or "TOKENS"
credit_cost: int = pydantic.Field(ge=0)
credential_provider: str
credential_id: str | None = None
credential_type: str | None = None
currency: str | None = None
metadata: dict[str, Any] = pydantic.Field(default_factory=dict)
class LlmModelCreator(pydantic.BaseModel):
"""Represents the organization that created/trained the model."""
id: str
name: str
display_name: str
description: str | None = None
website_url: str | None = None
logo_url: str | None = None
class LlmModel(pydantic.BaseModel):
"""Public-facing LLM model information."""
slug: str
display_name: str
description: str | None = None
provider_name: str
creator: LlmModelCreator | None = None
context_window: int
max_output_tokens: int | None = None
price_tier: int # 1=cheapest, 2=medium, 3=expensive
is_recommended: bool = False
capabilities: dict[str, Any] = pydantic.Field(default_factory=dict)
costs: list[LlmModelCost] = pydantic.Field(default_factory=list)
class LlmProvider(pydantic.BaseModel):
"""Provider with its enabled models."""
name: str
display_name: str
models: list[LlmModel] = pydantic.Field(default_factory=list)
class LlmModelsResponse(pydantic.BaseModel):
"""Response for GET /llm/models."""
models: list[LlmModel]
total: int
class LlmProvidersResponse(pydantic.BaseModel):
"""Response for GET /llm/providers."""
providers: list[LlmProvider]

View File

@@ -0,0 +1,141 @@
"""Public read-only API for LLM registry."""
import autogpt_libs.auth
import fastapi
from backend.data.llm_registry import (
RegistryModelCreator,
get_all_models,
get_enabled_models,
)
from backend.server.v2.llm import model as llm_model
router = fastapi.APIRouter(
prefix="/llm",
tags=["llm"],
dependencies=[fastapi.Security(autogpt_libs.auth.requires_user)],
)
def _map_creator(
creator: RegistryModelCreator | None,
) -> llm_model.LlmModelCreator | None:
"""Convert registry creator to API model."""
if not creator:
return None
return llm_model.LlmModelCreator(
id=creator.id,
name=creator.name,
display_name=creator.display_name,
description=creator.description,
website_url=creator.website_url,
logo_url=creator.logo_url,
)
@router.get("/models", response_model=llm_model.LlmModelsResponse)
async def list_models(
enabled_only: bool = fastapi.Query(
default=True, description="Only return enabled models"
),
):
"""
List all LLM models available to users.
Returns models from the in-memory registry cache.
Use enabled_only=true to filter to only enabled models (default).
"""
# Get models from in-memory registry
registry_models = get_enabled_models() if enabled_only else get_all_models()
# Map to API response models
models = [
llm_model.LlmModel(
slug=model.slug,
display_name=model.display_name,
description=model.description,
provider_name=model.provider_display_name,
creator=_map_creator(model.creator),
context_window=model.metadata.context_window,
max_output_tokens=model.metadata.max_output_tokens,
price_tier=model.metadata.price_tier,
is_recommended=model.is_recommended,
capabilities=model.capabilities,
costs=[
llm_model.LlmModelCost(
unit=cost.unit,
credit_cost=cost.credit_cost,
credential_provider=cost.credential_provider,
credential_id=cost.credential_id,
credential_type=cost.credential_type,
currency=cost.currency,
metadata=cost.metadata,
)
for cost in model.costs
],
)
for model in registry_models
]
return llm_model.LlmModelsResponse(models=models, total=len(models))
@router.get("/providers", response_model=llm_model.LlmProvidersResponse)
async def list_providers():
"""
List all LLM providers with their enabled models.
Groups enabled models by provider from the in-memory registry.
"""
# Get all enabled models and group by provider
registry_models = get_enabled_models()
# Group models by provider
provider_map: dict[str, list] = {}
for model in registry_models:
provider_key = model.metadata.provider
if provider_key not in provider_map:
provider_map[provider_key] = []
provider_map[provider_key].append(model)
# Build provider responses
providers = []
for provider_key, models in sorted(provider_map.items()):
# Use the first model's provider display name
display_name = models[0].provider_display_name if models else provider_key
providers.append(
llm_model.LlmProvider(
name=provider_key,
display_name=display_name,
models=[
llm_model.LlmModel(
slug=model.slug,
display_name=model.display_name,
description=model.description,
provider_name=model.provider_display_name,
creator=_map_creator(model.creator),
context_window=model.metadata.context_window,
max_output_tokens=model.metadata.max_output_tokens,
price_tier=model.metadata.price_tier,
is_recommended=model.is_recommended,
capabilities=model.capabilities,
costs=[
llm_model.LlmModelCost(
unit=cost.unit,
credit_cost=cost.credit_cost,
credential_provider=cost.credential_provider,
credential_id=cost.credential_id,
credential_type=cost.credential_type,
currency=cost.currency,
metadata=cost.metadata,
)
for cost in model.costs
],
)
for model in sorted(models, key=lambda m: m.display_name)
],
)
)
return llm_model.LlmProvidersResponse(providers=providers)

View File

@@ -1,246 +0,0 @@
#!/usr/bin/env python3
"""
AutoGPT Analytics — View Generator
====================================
Reads every .sql file in analytics/queries/ and registers it as a
CREATE OR REPLACE VIEW in the analytics schema.
Quick start (from autogpt_platform/backend/):
Step 1 — one-time setup (creates schema, role, grants):
poetry run analytics-setup
Step 2 — create / refresh all 14 analytics views:
poetry run analytics-views
Both commands auto-detect credentials from .env (DB_* vars).
Use --db-url to override.
Step 3 (optional) — enable login and set a password for the read-only
role so external tools (Supabase MCP, PostHog Data Warehouse) can connect.
The role is created as NOLOGIN, so you must grant LOGIN at the same time.
Run in Supabase SQL Editor:
ALTER ROLE analytics_readonly WITH LOGIN PASSWORD 'your-password';
Usage
-----
poetry run analytics-setup # apply setup to DB
poetry run analytics-setup --dry-run # print setup SQL only
poetry run analytics-views # apply all views to DB
poetry run analytics-views --dry-run # print all view SQL only
poetry run analytics-views --only graph_execution,retention_login_weekly
Environment variables
---------------------
DATABASE_URL Postgres connection string (checked before .env)
Notes
-----
- .env DB_* vars are read automatically as a fallback.
- Safe to re-run: uses CREATE OR REPLACE VIEW.
- Looker, PostHog Data Warehouse, and Supabase MCP all read from the
same analytics.* views — no raw tables exposed.
"""
import argparse
import os
import sys
from pathlib import Path
from urllib.parse import quote
QUERIES_DIR = Path(__file__).parent.parent / "analytics" / "queries"
ENV_FILE = Path(__file__).parent / ".env"
SCHEMA = "analytics"
SETUP_SQL = """\
-- =============================================================
-- AutoGPT Analytics Schema Setup
-- Run ONCE as the postgres superuser (e.g. via Supabase SQL Editor).
-- After this, run: poetry run analytics-views
-- =============================================================
-- 1. Create the analytics schema
CREATE SCHEMA IF NOT EXISTS analytics;
-- 2. Create the read-only role (skip if already exists)
DO $$
BEGIN
IF NOT EXISTS (SELECT FROM pg_roles WHERE rolname = 'analytics_readonly') THEN
CREATE ROLE analytics_readonly NOLOGIN;
END IF;
END
$$;
-- 3. Analytics schema grants only.
-- Views use security_invoker = false so they execute as their
-- owner (postgres). analytics_readonly never needs direct access
-- to the platform or auth schemas.
GRANT USAGE ON SCHEMA analytics TO analytics_readonly;
GRANT SELECT ON ALL TABLES IN SCHEMA analytics TO analytics_readonly;
ALTER DEFAULT PRIVILEGES IN SCHEMA analytics
GRANT SELECT ON TABLES TO analytics_readonly;
"""
def load_db_url_from_env() -> str | None:
"""Read DB_* vars from .env and build a psycopg2 connection string."""
if not ENV_FILE.exists():
return None
env: dict[str, str] = {}
for line in ENV_FILE.read_text().splitlines():
line = line.strip()
if not line or line.startswith("#") or "=" not in line:
continue
key, _, value = line.partition("=")
env[key.strip()] = value.strip().strip('"').strip("'")
host = env.get("DB_HOST", "localhost")
port = env.get("DB_PORT", "5432")
user = env.get("DB_USER", "postgres")
password = env.get("DB_PASS", "")
dbname = env.get("DB_NAME", "postgres")
if not password:
return None
return (
"postgresql://"
f"{quote(user, safe='')}:{quote(password, safe='')}"
f"@{host}:{port}/{quote(dbname, safe='')}"
)
def get_db_url(args: argparse.Namespace) -> str | None:
return args.db_url or os.environ.get("DATABASE_URL") or load_db_url_from_env()
def connect(db_url: str):
try:
import psycopg2
except ImportError:
print("psycopg2 not found. Run: poetry install", file=sys.stderr)
sys.exit(1)
return psycopg2.connect(db_url)
def run_sql(db_url: str, statements: list[tuple[str, str]]) -> None:
"""Execute a list of (label, sql) pairs in a single transaction."""
conn = connect(db_url)
conn.autocommit = False
cur = conn.cursor()
try:
for label, sql in statements:
print(f" {label} ...", end=" ")
cur.execute(sql)
print("OK")
conn.commit()
print(f"\n{len(statements)} statement(s) applied.")
except Exception as e:
conn.rollback()
print(f"\n✗ Error: {e}", file=sys.stderr)
sys.exit(1)
finally:
cur.close()
conn.close()
def build_view_sql(name: str, query_body: str) -> str:
body = query_body.strip().rstrip(";")
# security_invoker = false → view runs as its owner (postgres), not the
# caller, so analytics_readonly only needs analytics schema access.
return f"CREATE OR REPLACE VIEW {SCHEMA}.{name} WITH (security_invoker = false) AS\n{body};\n"
def load_views(only: list[str] | None = None) -> list[tuple[str, str]]:
"""Return [(label, sql)] for all views, in alphabetical order."""
files = sorted(QUERIES_DIR.glob("*.sql"))
if not files:
print(f"No .sql files found in {QUERIES_DIR}", file=sys.stderr)
sys.exit(1)
known = {f.stem for f in files}
if only:
unknown = [n for n in only if n not in known]
if unknown:
print(
f"Unknown view name(s): {', '.join(unknown)}\n"
f"Available: {', '.join(sorted(known))}",
file=sys.stderr,
)
sys.exit(1)
result = []
for f in files:
name = f.stem
if only and name not in only:
continue
result.append((f"view analytics.{name}", build_view_sql(name, f.read_text())))
return result
def no_db_url_error() -> None:
print(
"No database URL found.\n"
"Tried: --db-url, DATABASE_URL env var, and .env (DB_* vars).\n"
"Use --dry-run to just print the SQL.",
file=sys.stderr,
)
sys.exit(1)
def cmd_setup(args: argparse.Namespace) -> None:
if args.dry_run:
print(SETUP_SQL)
return
db_url = get_db_url(args)
if not db_url:
no_db_url_error()
assert db_url
print("Applying analytics setup...")
run_sql(db_url, [("schema / role / grants", SETUP_SQL)])
def cmd_views(args: argparse.Namespace) -> None:
only = [v.strip() for v in args.only.split(",")] if args.only else None
views = load_views(only=only)
if not views:
print("No matching views found.")
sys.exit(0)
if args.dry_run:
print(f"-- {len(views)} views\n")
for label, sql in views:
print(f"-- {label}")
print(sql)
return
db_url = get_db_url(args)
if not db_url:
no_db_url_error()
assert db_url
print(f"Applying {len(views)} view(s)...")
# Append grant refresh so the readonly role sees any new views
grant = f"GRANT SELECT ON ALL TABLES IN SCHEMA {SCHEMA} TO analytics_readonly;"
run_sql(db_url, views + [("grant analytics_readonly", grant)])
def main_setup() -> None:
parser = argparse.ArgumentParser(description="Apply analytics schema setup to DB")
parser.add_argument(
"--dry-run", action="store_true", help="Print SQL, don't execute"
)
parser.add_argument("--db-url", help="Postgres connection string")
cmd_setup(parser.parse_args())
def main_views() -> None:
parser = argparse.ArgumentParser(description="Apply analytics views to DB")
parser.add_argument(
"--dry-run", action="store_true", help="Print SQL, don't execute"
)
parser.add_argument("--db-url", help="Postgres connection string")
parser.add_argument("--only", help="Comma-separated view names to update")
cmd_views(parser.parse_args())
if __name__ == "__main__":
# Default: apply views (backwards-compatible with direct python invocation)
main_views()

View File

@@ -1,7 +0,0 @@
-- Remove GraphExecution foreign key from PendingHumanReview
-- The graphExecId column remains for querying, but we remove the FK constraint
-- to AgentGraphExecution since PendingHumanReview records can now be created
-- with synthetic graph_exec_ids (e.g., CoPilot direct block execution uses
-- "copilot-session-{session_id}" as graph_exec_id).
ALTER TABLE "PendingHumanReview" DROP CONSTRAINT IF EXISTS "PendingHumanReview_graphExecId_fkey";

View File

@@ -0,0 +1,148 @@
-- CreateEnum
CREATE TYPE "LlmCostUnit" AS ENUM ('RUN', 'TOKENS');
-- CreateTable
CREATE TABLE "LlmProvider" (
"id" TEXT NOT NULL,
"createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
"updatedAt" TIMESTAMP(3) NOT NULL,
"name" TEXT NOT NULL,
"displayName" TEXT NOT NULL,
"description" TEXT,
"defaultCredentialProvider" TEXT,
"defaultCredentialId" TEXT,
"defaultCredentialType" TEXT,
"metadata" JSONB NOT NULL DEFAULT '{}',
CONSTRAINT "LlmProvider_pkey" PRIMARY KEY ("id")
);
-- CreateTable
CREATE TABLE "LlmModelCreator" (
"id" TEXT NOT NULL,
"createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
"updatedAt" TIMESTAMP(3) NOT NULL,
"name" TEXT NOT NULL,
"displayName" TEXT NOT NULL,
"description" TEXT,
"websiteUrl" TEXT,
"logoUrl" TEXT,
"metadata" JSONB NOT NULL DEFAULT '{}',
CONSTRAINT "LlmModelCreator_pkey" PRIMARY KEY ("id")
);
-- CreateTable
CREATE TABLE "LlmModel" (
"id" TEXT NOT NULL,
"createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
"updatedAt" TIMESTAMP(3) NOT NULL,
"slug" TEXT NOT NULL,
"displayName" TEXT NOT NULL,
"description" TEXT,
"providerId" TEXT NOT NULL,
"creatorId" TEXT,
"contextWindow" INTEGER NOT NULL,
"maxOutputTokens" INTEGER,
"priceTier" INTEGER NOT NULL DEFAULT 1,
"isEnabled" BOOLEAN NOT NULL DEFAULT true,
"isRecommended" BOOLEAN NOT NULL DEFAULT false,
"supportsTools" BOOLEAN NOT NULL DEFAULT false,
"supportsJsonOutput" BOOLEAN NOT NULL DEFAULT false,
"supportsReasoning" BOOLEAN NOT NULL DEFAULT false,
"supportsParallelToolCalls" BOOLEAN NOT NULL DEFAULT false,
"capabilities" JSONB NOT NULL DEFAULT '{}',
"metadata" JSONB NOT NULL DEFAULT '{}',
CONSTRAINT "LlmModel_pkey" PRIMARY KEY ("id")
);
-- CreateTable
CREATE TABLE "LlmModelCost" (
"id" TEXT NOT NULL,
"createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
"updatedAt" TIMESTAMP(3) NOT NULL,
"unit" "LlmCostUnit" NOT NULL DEFAULT 'RUN',
"creditCost" INTEGER NOT NULL,
"credentialProvider" TEXT NOT NULL,
"credentialId" TEXT,
"credentialType" TEXT,
"currency" TEXT,
"metadata" JSONB NOT NULL DEFAULT '{}',
"llmModelId" TEXT NOT NULL,
CONSTRAINT "LlmModelCost_pkey" PRIMARY KEY ("id")
);
-- CreateTable
CREATE TABLE "LlmModelMigration" (
"id" TEXT NOT NULL,
"createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
"updatedAt" TIMESTAMP(3) NOT NULL,
"sourceModelSlug" TEXT NOT NULL,
"targetModelSlug" TEXT NOT NULL,
"reason" TEXT,
"migratedNodeIds" JSONB NOT NULL DEFAULT '[]',
"nodeCount" INTEGER NOT NULL,
"customCreditCost" INTEGER,
"isReverted" BOOLEAN NOT NULL DEFAULT false,
"revertedAt" TIMESTAMP(3),
CONSTRAINT "LlmModelMigration_pkey" PRIMARY KEY ("id")
);
-- CreateIndex
CREATE UNIQUE INDEX "LlmProvider_name_key" ON "LlmProvider"("name");
-- CreateIndex
CREATE UNIQUE INDEX "LlmModelCreator_name_key" ON "LlmModelCreator"("name");
-- CreateIndex
CREATE UNIQUE INDEX "LlmModel_slug_key" ON "LlmModel"("slug");
-- CreateIndex
CREATE INDEX "LlmModel_providerId_isEnabled_idx" ON "LlmModel"("providerId", "isEnabled");
-- CreateIndex
CREATE INDEX "LlmModel_creatorId_idx" ON "LlmModel"("creatorId");
-- CreateIndex (partial unique for default costs - no specific credential)
CREATE UNIQUE INDEX "LlmModelCost_default_cost_key" ON "LlmModelCost"("llmModelId", "credentialProvider", "unit") WHERE "credentialId" IS NULL;
-- CreateIndex (partial unique for credential-specific costs)
CREATE UNIQUE INDEX "LlmModelCost_credential_cost_key" ON "LlmModelCost"("llmModelId", "credentialProvider", "credentialId", "unit") WHERE "credentialId" IS NOT NULL;
-- CreateIndex
CREATE INDEX "LlmModelMigration_targetModelSlug_idx" ON "LlmModelMigration"("targetModelSlug");
-- CreateIndex
CREATE INDEX "LlmModelMigration_sourceModelSlug_isReverted_idx" ON "LlmModelMigration"("sourceModelSlug", "isReverted");
-- CreateIndex (partial unique to prevent multiple active migrations per source)
CREATE UNIQUE INDEX "LlmModelMigration_active_source_key" ON "LlmModelMigration"("sourceModelSlug") WHERE "isReverted" = false;
-- AddForeignKey
ALTER TABLE "LlmModel" ADD CONSTRAINT "LlmModel_providerId_fkey" FOREIGN KEY ("providerId") REFERENCES "LlmProvider"("id") ON DELETE RESTRICT ON UPDATE CASCADE;
-- AddForeignKey
ALTER TABLE "LlmModel" ADD CONSTRAINT "LlmModel_creatorId_fkey" FOREIGN KEY ("creatorId") REFERENCES "LlmModelCreator"("id") ON DELETE SET NULL ON UPDATE CASCADE;
-- AddForeignKey
ALTER TABLE "LlmModelCost" ADD CONSTRAINT "LlmModelCost_llmModelId_fkey" FOREIGN KEY ("llmModelId") REFERENCES "LlmModel"("id") ON DELETE CASCADE ON UPDATE CASCADE;
-- AddForeignKey
ALTER TABLE "LlmModelMigration" ADD CONSTRAINT "LlmModelMigration_sourceModelSlug_fkey" FOREIGN KEY ("sourceModelSlug") REFERENCES "LlmModel"("slug") ON DELETE RESTRICT ON UPDATE CASCADE;
-- AddForeignKey
ALTER TABLE "LlmModelMigration" ADD CONSTRAINT "LlmModelMigration_targetModelSlug_fkey" FOREIGN KEY ("targetModelSlug") REFERENCES "LlmModel"("slug") ON DELETE RESTRICT ON UPDATE CASCADE;
-- AddCheckConstraints (enforce data integrity)
ALTER TABLE "LlmModel"
ADD CONSTRAINT "LlmModel_priceTier_check" CHECK ("priceTier" BETWEEN 1 AND 3);
ALTER TABLE "LlmModelCost"
ADD CONSTRAINT "LlmModelCost_creditCost_check" CHECK ("creditCost" >= 0);
ALTER TABLE "LlmModelMigration"
ADD CONSTRAINT "LlmModelMigration_nodeCount_check" CHECK ("nodeCount" >= 0),
ADD CONSTRAINT "LlmModelMigration_customCreditCost_check" CHECK ("customCreditCost" IS NULL OR "customCreditCost" >= 0);

View File

@@ -120,8 +120,6 @@ ws = "backend.ws:main"
scheduler = "backend.scheduler:main"
notification = "backend.notification:main"
executor = "backend.exec:main"
analytics-setup = "generate_views:main_setup"
analytics-views = "generate_views:main_views"
copilot-executor = "backend.copilot.executor.__main__:main"
cli = "backend.cli:main"
format = "linter:format"

View File

@@ -566,6 +566,8 @@ model AgentGraphExecution {
shareToken String? @unique
sharedAt DateTime?
PendingHumanReviews PendingHumanReview[]
@@index([agentGraphId, agentGraphVersion])
@@index([userId, isDeleted, createdAt])
@@index([createdAt])
@@ -662,7 +664,8 @@ model PendingHumanReview {
updatedAt DateTime? @updatedAt
reviewedAt DateTime?
User User @relation(fields: [userId], references: [id], onDelete: Cascade)
User User @relation(fields: [userId], references: [id], onDelete: Cascade)
GraphExecution AgentGraphExecution @relation(fields: [graphExecId], references: [id], onDelete: Cascade)
@@unique([nodeExecId]) // One pending review per node execution
@@index([userId, status])
@@ -1301,3 +1304,164 @@ model OAuthRefreshToken {
@@index([userId, applicationId])
@@index([expiresAt]) // For cleanup
}
// ============================================================================
// LLM Registry Models
// ============================================================================
enum LlmCostUnit {
RUN
TOKENS
}
model LlmProvider {
id String @id @default(uuid())
createdAt DateTime @default(now())
updatedAt DateTime @updatedAt
name String @unique
displayName String
description String?
defaultCredentialProvider String?
defaultCredentialId String?
defaultCredentialType String?
metadata Json @default("{}")
Models LlmModel[]
}
model LlmModel {
id String @id @default(uuid())
createdAt DateTime @default(now())
updatedAt DateTime @updatedAt
slug String @unique
displayName String
description String?
providerId String
Provider LlmProvider @relation(fields: [providerId], references: [id], onDelete: Restrict)
// Creator is the organization that created/trained the model (e.g., OpenAI, Meta)
// This is distinct from the provider who hosts/serves the model (e.g., OpenRouter)
creatorId String?
Creator LlmModelCreator? @relation(fields: [creatorId], references: [id], onDelete: SetNull)
contextWindow Int
maxOutputTokens Int?
priceTier Int @default(1) // 1=cheapest, 2=medium, 3=expensive (DB constraint: 1-3)
isEnabled Boolean @default(true)
isRecommended Boolean @default(false)
// Model-specific capabilities
// These vary per model even within the same provider (e.g., Hugging Face)
// Default to false for safety - partially-seeded rows should not be assumed capable
supportsTools Boolean @default(false)
supportsJsonOutput Boolean @default(false)
supportsReasoning Boolean @default(false)
supportsParallelToolCalls Boolean @default(false)
capabilities Json @default("{}")
metadata Json @default("{}")
Costs LlmModelCost[]
SourceMigrations LlmModelMigration[] @relation("SourceMigrations")
TargetMigrations LlmModelMigration[] @relation("TargetMigrations")
@@index([providerId, isEnabled])
@@index([creatorId])
// Note: slug already has @unique which creates an implicit index
}
model LlmModelCost {
id String @id @default(uuid())
createdAt DateTime @default(now())
updatedAt DateTime @updatedAt
unit LlmCostUnit @default(RUN)
creditCost Int // DB constraint: >= 0
// Provider identifier (e.g., "openai", "anthropic", "openrouter")
// Used to determine which credential system provides the API key.
// Allows different pricing for:
// - Default provider costs (WHERE credentialId IS NULL)
// - User's own API key costs (WHERE credentialId IS NOT NULL)
credentialProvider String
credentialId String?
credentialType String?
currency String?
metadata Json @default("{}")
llmModelId String
Model LlmModel @relation(fields: [llmModelId], references: [id], onDelete: Cascade)
// Note: Unique constraints are implemented as partial indexes in migration SQL:
// - One for default costs (WHERE credentialId IS NULL)
// - One for credential-specific costs (WHERE credentialId IS NOT NULL)
// This allows both provider-level defaults and credential-specific overrides
}
model LlmModelCreator {
id String @id @default(uuid())
createdAt DateTime @default(now())
updatedAt DateTime @updatedAt
name String @unique // e.g., "openai", "anthropic", "meta"
displayName String // e.g., "OpenAI", "Anthropic", "Meta"
description String?
websiteUrl String? // Link to creator's website
logoUrl String? // URL to creator's logo
metadata Json @default("{}")
Models LlmModel[]
}
model LlmModelMigration {
id String @id @default(uuid())
createdAt DateTime @default(now())
updatedAt DateTime @updatedAt
sourceModelSlug String // The original model that was disabled
targetModelSlug String // The model workflows were migrated to
reason String? // Why the migration happened (e.g., "Provider outage")
// FK constraints ensure slugs reference valid models
SourceModel LlmModel @relation("SourceMigrations", fields: [sourceModelSlug], references: [slug], onDelete: Restrict)
TargetModel LlmModel @relation("TargetMigrations", fields: [targetModelSlug], references: [slug], onDelete: Restrict)
// Track affected nodes as JSON array of node IDs
// Format: ["node-uuid-1", "node-uuid-2", ...]
migratedNodeIds Json @default("[]")
nodeCount Int // Number of nodes migrated (DB constraint: >= 0)
// Custom pricing override for migrated workflows during the migration period.
// Use case: When migrating users from an expensive model (e.g., GPT-4) to a cheaper
// one (e.g., GPT-3.5), you may want to temporarily maintain the original pricing
// to avoid billing surprises, or offer a discount during the transition.
//
// IMPORTANT: This field is intended for integration with the billing system.
// When billing calculates costs for nodes affected by this migration, it should
// check if customCreditCost is set and use it instead of the target model's cost.
// If null, the target model's normal cost applies.
//
// TODO: Integrate with billing system to apply this override during cost calculation.
// LIMITATION: This is a simple Int and doesn't distinguish RUN vs TOKENS pricing.
// For token-priced models, this may be ambiguous. Consider migrating to a relation
// with LlmModelCost or a dedicated override model in a follow-up PR.
customCreditCost Int? // DB constraint: >= 0 when not null
// Revert tracking
isReverted Boolean @default(false)
revertedAt DateTime?
// Note: Partial unique index in migration SQL prevents multiple active migrations per source:
// UNIQUE (sourceModelSlug) WHERE isReverted = false
@@index([targetModelSlug])
@@index([sourceModelSlug, isReverted]) // Composite index for active migration queries
}

View File

@@ -1,4 +1,3 @@
import { useMemo } from "react";
import {
Conversation,
ConversationContent,
@@ -9,7 +8,6 @@ import { LoadingSpinner } from "@/components/atoms/LoadingSpinner/LoadingSpinner
import { FileUIPart, UIDataTypes, UIMessage, UITools } from "ai";
import { TOOL_PART_PREFIX } from "../JobStatsBar/constants";
import { TurnStatsBar } from "../JobStatsBar/TurnStatsBar";
import { CopilotPendingReviews } from "../CopilotPendingReviews/CopilotPendingReviews";
import {
buildRenderSegments,
getTurnMessages,
@@ -53,50 +51,6 @@ function renderSegments(
});
}
/**
* Extract graph_exec_id from tool outputs that need review.
* Handles both:
* - run_block ReviewRequiredResponse (has graph_exec_id directly)
* - run_agent ExecutionStartedResponse with status "REVIEW" (has execution_id)
*/
function extractGraphExecId(
messages: UIMessage<unknown, UIDataTypes, UITools>[],
): string | null {
// Scan backwards — the most recent review output has the ID
for (let i = messages.length - 1; i >= 0; i--) {
const msg = messages[i];
for (const part of msg.parts) {
if ("output" in part && part.output) {
const out =
typeof part.output === "string"
? (() => {
try {
return JSON.parse(part.output);
} catch {
return null;
}
})()
: part.output;
if (out && typeof out === "object") {
// run_block: ReviewRequiredResponse has graph_exec_id
if ("graph_exec_id" in out) {
return (out as { graph_exec_id: string }).graph_exec_id;
}
// run_agent: ExecutionStartedResponse with status "REVIEW"
if (
"execution_id" in out &&
"status" in out &&
(out as { status: string }).status === "REVIEW"
) {
return (out as { execution_id: string }).execution_id;
}
}
}
}
}
return null;
}
export function ChatMessagesContainer({
messages,
status,
@@ -106,7 +60,6 @@ export function ChatMessagesContainer({
sessionID,
}: Props) {
const lastMessage = messages[messages.length - 1];
const graphExecId = useMemo(() => extractGraphExecId(messages), [messages]);
const hasInflight = (() => {
if (lastMessage?.role !== "assistant") return false;
@@ -252,7 +205,6 @@ export function ChatMessagesContainer({
</MessageContent>
</Message>
)}
{graphExecId && <CopilotPendingReviews graphExecId={graphExecId} />}
{error && (
<details className="rounded-lg bg-red-50 p-4 text-sm text-red-700">
<summary className="cursor-pointer font-medium">

View File

@@ -130,7 +130,6 @@ export function MessagePartRenderer({ part, messageID, partIndex }: Props) {
case "tool-get_doc_page":
return <SearchDocsTool key={key} part={part as ToolUIPart} />;
case "tool-run_block":
case "tool-continue_run_block":
return <RunBlockTool key={key} part={part as ToolUIPart} />;
case "tool-run_mcp_tool":
return <RunMCPToolComponent key={key} part={part as ToolUIPart} />;

View File

@@ -19,7 +19,6 @@ const CUSTOM_TOOL_TYPES = new Set([
"tool-search_docs",
"tool-get_doc_page",
"tool-run_block",
"tool-continue_run_block",
"tool-run_mcp_tool",
"tool-run_agent",
"tool-schedule_agent",
@@ -34,7 +33,6 @@ const INTERACTIVE_RESPONSE_TYPES: ReadonlySet<string> = new Set([
ResponseType.setup_requirements,
ResponseType.agent_details,
ResponseType.block_details,
ResponseType.review_required,
ResponseType.need_login,
ResponseType.input_validation_error,
ResponseType.agent_builder_clarification_needed,

View File

@@ -1,61 +0,0 @@
"use client";
import { useCallback } from "react";
import { PendingReviewsList } from "@/components/organisms/PendingReviewsList/PendingReviewsList";
import { useCopilotChatActions } from "../CopilotChatActionsProvider/useCopilotChatActions";
import { usePendingReviewsForExecution } from "@/hooks/usePendingReviews";
import { okData } from "@/app/api/helpers";
interface Props {
graphExecId: string;
}
/**
* Renders a single consolidated PendingReviewsList for all pending copilot
* reviews in a session — mirrors the non-copilot review page behavior.
* Works for both run_block (synthetic copilot-session-*) and run_agent (real graph exec) reviews.
*/
export function CopilotPendingReviews({ graphExecId }: Props) {
const { onSend } = useCopilotChatActions();
const { pendingReviews, refetch } = usePendingReviewsForExecution(
graphExecId,
{ enabled: !!graphExecId, refetchInterval: 2000 },
);
// Graph executions auto-resume after approval; block reviews need continue_run_block.
const isGraphExecution = !graphExecId.startsWith("copilot-session-");
const handleReviewComplete = useCallback(async () => {
// Brief delay for the server to propagate the approval
await new Promise((resolve) => setTimeout(resolve, 500));
const result = await refetch();
const remaining = okData(result.data) || [];
if (remaining.length > 0) return;
if (isGraphExecution) {
onSend(
`All pending reviews have been processed. ` +
`The agent execution will resume automatically for approved reviews. ` +
`Use view_agent_output with execution_id="${graphExecId}" to check the result.`,
);
} else {
onSend(
`All pending reviews have been processed. ` +
`For any approved reviews, call continue_run_block with the corresponding review_id to execute them. ` +
`For rejected reviews, no further action is needed.`,
);
}
}, [refetch, onSend, isGraphExecution, graphExecId]);
if (pendingReviews.length === 0) return null;
return (
<div className="py-2">
<PendingReviewsList
reviews={pendingReviews}
onReviewComplete={handleReviewComplete}
/>
</div>
);
}

View File

@@ -15,7 +15,6 @@ import {
isRunBlockBlockOutput,
isRunBlockDetailsOutput,
isRunBlockErrorOutput,
isRunBlockReviewRequiredOutput,
isRunBlockSetupRequirementsOutput,
ToolIcon,
} from "./helpers";
@@ -55,15 +54,10 @@ export function RunBlockTool({ part }: Props) {
part.state === "output-available" &&
!!output &&
!setupRequirementsOutput &&
!isRunBlockReviewRequiredOutput(output) &&
(isRunBlockBlockOutput(output) ||
isRunBlockDetailsOutput(output) ||
isRunBlockErrorOutput(output));
// Review UI is rendered at the chat level by CopilotPendingReviews,
// not inside each tool card. This matches the non-copilot flow where
// a single PendingReviewsList shows all reviews grouped together.
return (
<div className="py-2">
<div className="flex items-center gap-2 text-sm text-muted-foreground">

View File

@@ -26,18 +26,6 @@ export interface BlockDetailsResponse {
user_authenticated: boolean;
}
/** Response when a block requires human review before execution. */
export interface ReviewRequiredResponse {
type: typeof ResponseType.review_required;
message: string;
session_id?: string | null;
block_id: string;
block_name: string;
review_id: string;
graph_exec_id: string;
input_data: Record<string, unknown>;
}
export interface RunBlockInput {
block_id?: string;
block_name?: string;
@@ -48,14 +36,12 @@ export type RunBlockToolOutput =
| SetupRequirementsResponse
| BlockDetailsResponse
| BlockOutputResponse
| ReviewRequiredResponse
| ErrorResponse;
const RUN_BLOCK_OUTPUT_TYPES = new Set<string>([
ResponseType.setup_requirements,
ResponseType.block_details,
ResponseType.block_output,
ResponseType.review_required,
ResponseType.error,
]);
@@ -80,19 +66,7 @@ export function isRunBlockDetailsOutput(
export function isRunBlockBlockOutput(
output: RunBlockToolOutput,
): output is BlockOutputResponse {
return (
output.type === ResponseType.block_output ||
("block_id" in output && !("review_id" in output))
);
}
export function isRunBlockReviewRequiredOutput(
output: RunBlockToolOutput,
): output is ReviewRequiredResponse {
return (
output.type === ResponseType.review_required ||
("review_id" in output && "block_name" in output && "input_data" in output)
);
return output.type === ResponseType.block_output || "block_id" in output;
}
export function isRunBlockErrorOutput(
@@ -117,7 +91,6 @@ function parseOutput(output: unknown): RunBlockToolOutput | null {
if (typeof type === "string" && RUN_BLOCK_OUTPUT_TYPES.has(type)) {
return output as RunBlockToolOutput;
}
if ("review_id" in output) return output as ReviewRequiredResponse;
if ("block_id" in output) return output as BlockOutputResponse;
if ("block" in output) return output as BlockDetailsResponse;
if ("setup_info" in output) return output as SetupRequirementsResponse;
@@ -162,9 +135,6 @@ export function getAnimationText(part: {
if (isRunBlockSetupRequirementsOutput(output)) {
return `Setup needed for "${output.setup_info.agent_name}"`;
}
if (isRunBlockReviewRequiredOutput(output)) {
return `Review needed for "${output.block_name}"`;
}
return "Error running block";
}
case "output-error":
@@ -257,14 +227,6 @@ export function getAccordionMeta(output: RunBlockToolOutput): {
};
}
if (isRunBlockReviewRequiredOutput(output)) {
return {
icon,
title: output.block_name,
description: "Sensitive action — awaiting review",
};
}
return {
icon: (
<WarningDiamondIcon size={32} weight="light" className="text-red-500" />

View File

@@ -9,52 +9,52 @@
/**
* Types of tool responses.
*/
export type ResponseType = (typeof ResponseType)[keyof typeof ResponseType];
export type ResponseType = typeof ResponseType[keyof typeof ResponseType];
// eslint-disable-next-line @typescript-eslint/no-redeclare
export const ResponseType = {
error: "error",
no_results: "no_results",
need_login: "need_login",
agents_found: "agents_found",
agent_details: "agent_details",
setup_requirements: "setup_requirements",
input_validation_error: "input_validation_error",
execution_started: "execution_started",
agent_output: "agent_output",
understanding_updated: "understanding_updated",
suggested_goal: "suggested_goal",
agent_builder_guide: "agent_builder_guide",
agent_builder_preview: "agent_builder_preview",
agent_builder_saved: "agent_builder_saved",
agent_builder_clarification_needed: "agent_builder_clarification_needed",
agent_builder_validation_result: "agent_builder_validation_result",
agent_builder_fix_result: "agent_builder_fix_result",
block_list: "block_list",
block_details: "block_details",
block_output: "block_output",
review_required: "review_required",
mcp_guide: "mcp_guide",
mcp_tools_discovered: "mcp_tools_discovered",
mcp_tool_output: "mcp_tool_output",
doc_search_results: "doc_search_results",
doc_page: "doc_page",
workspace_file_list: "workspace_file_list",
workspace_file_content: "workspace_file_content",
workspace_file_metadata: "workspace_file_metadata",
workspace_file_written: "workspace_file_written",
workspace_file_deleted: "workspace_file_deleted",
folder_created: "folder_created",
folder_list: "folder_list",
folder_updated: "folder_updated",
folder_moved: "folder_moved",
folder_deleted: "folder_deleted",
agents_moved_to_folder: "agents_moved_to_folder",
browser_navigate: "browser_navigate",
browser_act: "browser_act",
browser_screenshot: "browser_screenshot",
bash_exec: "bash_exec",
web_fetch: "web_fetch",
feature_request_search: "feature_request_search",
feature_request_created: "feature_request_created",
error: 'error',
no_results: 'no_results',
need_login: 'need_login',
agents_found: 'agents_found',
agent_details: 'agent_details',
setup_requirements: 'setup_requirements',
input_validation_error: 'input_validation_error',
execution_started: 'execution_started',
agent_output: 'agent_output',
understanding_updated: 'understanding_updated',
suggested_goal: 'suggested_goal',
agent_builder_guide: 'agent_builder_guide',
agent_builder_preview: 'agent_builder_preview',
agent_builder_saved: 'agent_builder_saved',
agent_builder_clarification_needed: 'agent_builder_clarification_needed',
agent_builder_validation_result: 'agent_builder_validation_result',
agent_builder_fix_result: 'agent_builder_fix_result',
block_list: 'block_list',
block_details: 'block_details',
block_output: 'block_output',
mcp_guide: 'mcp_guide',
mcp_tools_discovered: 'mcp_tools_discovered',
mcp_tool_output: 'mcp_tool_output',
doc_search_results: 'doc_search_results',
doc_page: 'doc_page',
workspace_file_list: 'workspace_file_list',
workspace_file_content: 'workspace_file_content',
workspace_file_metadata: 'workspace_file_metadata',
workspace_file_written: 'workspace_file_written',
workspace_file_deleted: 'workspace_file_deleted',
folder_created: 'folder_created',
folder_list: 'folder_list',
folder_updated: 'folder_updated',
folder_moved: 'folder_moved',
folder_deleted: 'folder_deleted',
agents_moved_to_folder: 'agents_moved_to_folder',
browser_navigate: 'browser_navigate',
browser_act: 'browser_act',
browser_screenshot: 'browser_screenshot',
bash_exec: 'bash_exec',
web_fetch: 'web_fetch',
feature_request_search: 'feature_request_search',
feature_request_created: 'feature_request_created',
} as const;

File diff suppressed because it is too large Load Diff

View File

@@ -57,25 +57,27 @@ async function handleWorkspaceDownload(
);
}
// Fully buffer the response before forwarding. Passing response.body as a
// ReadableStream causes silent truncation in Next.js / Vercel — the last
// ~10 KB of larger files are dropped, corrupting PNGs and truncating CSVs.
const buffer = await response.arrayBuffer();
// Get the content type from the backend response
const contentType =
response.headers.get("Content-Type") || "application/octet-stream";
const contentDisposition = response.headers.get("Content-Disposition");
// Stream the response body
const responseHeaders: Record<string, string> = {
"Content-Type": contentType,
"Content-Length": String(buffer.byteLength),
};
if (contentDisposition) {
responseHeaders["Content-Disposition"] = contentDisposition;
}
return new NextResponse(buffer, {
const contentLength = response.headers.get("Content-Length");
if (contentLength) {
responseHeaders["Content-Length"] = contentLength;
}
// Stream the response body directly instead of buffering in memory
return new NextResponse(response.body, {
status: 200,
headers: responseHeaders,
});