Compare commits

..

5 Commits

Author SHA1 Message Date
Zamil Majdy
4f652cb978 Merge branch 'dev' into feat/execution-data 2025-08-29 06:44:13 +04:00
Zamil Majdy
279552a2a3 fix(backend): resolve foreign key constraints and connection errors in execution tests
## Problem
ExecutionDataClient integration tests were failing with foreign key constraint
violations and "connection refused" errors that caused tests to hang and fail
after service shutdown.

## Root Cause
1. Tests used hardcoded IDs (test_graph_exec_id) that didn't exist in database
2. @non_blocking_persist decorator created background threads that continued
   database calls after test services shut down
3. Foreign key constraints failed: AgentNodeExecution_agentGraphExecutionId_fkey

## Solution
1. **Fixed Foreign Key Issues**: Create proper database records in creation tests
   - User → AgentGraph → AgentGraphExecution relationship
   - Use correct enum types (AgentExecutionStatus.RUNNING vs "RUNNING")

2. **Eliminated Connection Errors**: Mock all database operations in data tests
   - Mock get_database_manager_client/async_client
   - Mock get_execution_event_bus
   - Disable @non_blocking_persist decorator to prevent background calls

3. **Clean Test Isolation**: Ensure tests don't leak database connections

## Test Results
-  1005 passed, 88 skipped - 100% GREEN
-  No connection refused errors
-  Fast execution (~53s vs hanging)
-  All ExecutionDataClient and ExecutionCreation tests pass

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-08-29 08:01:54 +07:00
Zamil Majdy
fb6ac1d6ca refactor(backend/executor): Clean up debug prints and unnecessary comments
## Summary
- Removed all debug print statements from execution_cache.py
- Cleaned up redundant and obvious comments across all executor files
- Simplified verbose docstrings to be more concise
- Removed implementation detail comments that don't add value

## Changes Made

### ExecutionCache
- Removed 4 debug print statements
- Simplified update_graph_start_time docstring
- Removed unnecessary comment about graph status caching

### ExecutionData
- Removed redundant inline comments
- Simplified method docstrings
- Removed obvious comments about error handling

### Test Files
- Simplified module-level docstrings
- Removed fixture implementation comments
- Cleaned up test setup comments
- Removed obvious section dividers

## Result
Cleaner, more professional code without clutter while maintaining functionality.
All tests still pass: 18 passed (execution tests), 1005 passed (full suite).

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-08-29 05:42:38 +07:00
Zamil Majdy
9db15bff02 fix(backend/executor): Fix race conditions and achieve 100% GREEN test suite
## Summary
- Fixed critical race conditions in ExecutionDataClient execution reuse logic
- Implemented per-key locking mechanism to prevent deadlocks
- Fixed sync/async mixing issues that caused timeouts
- Fixed test mocking issues that caused pydantic validation errors

## Changes Made

### ExecutionCache
- Added proper debug logging for execution finding
- Fixed update_graph_start_time documentation to clarify cache vs DB responsibilities
- Maintained OrderedDict for proper execution ordering

### ExecutionData
- Implemented per-key locking to prevent deadlocks between different operations
- Fixed sync/async mixing in upsert_execution_input
- Converted mock objects to strings to prevent pydantic validation errors
- Redesigned upsert logic to properly handle execution reuse without RuntimeError

### Tests
- Created comprehensive execution_creation_test with 3 test methods
- Fixed execution_data_test graph stats operations test
- Simplified tests to focus on cache behavior rather than background DB persistence
- Fixed mock setup to properly track created executions

## Test Results
 **1005 passed, 88 skipped, 0 failed**
- execution_creation_test: All 3 tests pass
- execution_data_test: All 15 tests pass
- Full test suite: 100% GREEN

## Impact
- Eliminates race conditions in node execution creation
- Prevents duplicate executions for same inputs
- Ensures proper execution reuse logic
- No more foreign key constraint violations
- Stable and reliable test suite

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-08-29 05:24:08 +07:00
Zamil Majdy
db4b94e0dc feat: Make local-first db-eventual-consistent on execution manager code 2025-08-28 18:34:40 +07:00
186 changed files with 4071 additions and 9589 deletions

View File

@@ -1,97 +0,0 @@
name: Auto Fix CI Failures
on:
workflow_run:
workflows: ["CI"]
types:
- completed
permissions:
contents: write
pull-requests: write
actions: read
issues: write
id-token: write # Required for OIDC token exchange
jobs:
auto-fix:
if: |
github.event.workflow_run.conclusion == 'failure' &&
github.event.workflow_run.pull_requests[0] &&
!startsWith(github.event.workflow_run.head_branch, 'claude-auto-fix-ci-')
runs-on: ubuntu-latest
steps:
- name: Checkout code
uses: actions/checkout@v4
with:
ref: ${{ github.event.workflow_run.head_branch }}
fetch-depth: 0
token: ${{ secrets.GITHUB_TOKEN }}
- name: Setup git identity
run: |
git config --global user.email "claude[bot]@users.noreply.github.com"
git config --global user.name "claude[bot]"
- name: Create fix branch
id: branch
run: |
BRANCH_NAME="claude-auto-fix-ci-${{ github.event.workflow_run.head_branch }}-${{ github.run_id }}"
git checkout -b "$BRANCH_NAME"
echo "branch_name=$BRANCH_NAME" >> $GITHUB_OUTPUT
- name: Get CI failure details
id: failure_details
uses: actions/github-script@v7
with:
script: |
const run = await github.rest.actions.getWorkflowRun({
owner: context.repo.owner,
repo: context.repo.repo,
run_id: ${{ github.event.workflow_run.id }}
});
const jobs = await github.rest.actions.listJobsForWorkflowRun({
owner: context.repo.owner,
repo: context.repo.repo,
run_id: ${{ github.event.workflow_run.id }}
});
const failedJobs = jobs.data.jobs.filter(job => job.conclusion === 'failure');
let errorLogs = [];
for (const job of failedJobs) {
const logs = await github.rest.actions.downloadJobLogsForWorkflowRun({
owner: context.repo.owner,
repo: context.repo.repo,
job_id: job.id
});
errorLogs.push({
jobName: job.name,
logs: logs.data
});
}
return {
runUrl: run.data.html_url,
failedJobs: failedJobs.map(j => j.name),
errorLogs: errorLogs
};
- name: Fix CI failures with Claude
id: claude
uses: anthropics/claude-code-action@v1
with:
prompt: |
/fix-ci
Failed CI Run: ${{ fromJSON(steps.failure_details.outputs.result).runUrl }}
Failed Jobs: ${{ join(fromJSON(steps.failure_details.outputs.result).failedJobs, ', ') }}
PR Number: ${{ github.event.workflow_run.pull_requests[0].number }}
Branch Name: ${{ steps.branch.outputs.branch_name }}
Base Branch: ${{ github.event.workflow_run.head_branch }}
Repository: ${{ github.repository }}
Error logs:
${{ toJSON(fromJSON(steps.failure_details.outputs.result).errorLogs) }}
anthropic_api_key: ${{ secrets.ANTHROPIC_API_KEY }}
claude_args: "--allowedTools 'Edit,MultiEdit,Write,Read,Glob,Grep,LS,Bash(git:*),Bash(bun:*),Bash(npm:*),Bash(npx:*),Bash(gh:*)'"

View File

@@ -1,379 +0,0 @@
# Claude Dependabot PR Review Workflow
#
# This workflow automatically runs Claude analysis on Dependabot PRs to:
# - Identify dependency changes and their versions
# - Look up changelogs for updated packages
# - Assess breaking changes and security impacts
# - Provide actionable recommendations for the development team
#
# Triggered on: Dependabot PRs (opened, synchronize)
# Requirements: ANTHROPIC_API_KEY secret must be configured
name: Claude Dependabot PR Review
on:
pull_request:
types: [opened, synchronize]
jobs:
dependabot-review:
# Only run on Dependabot PRs
if: github.actor == 'dependabot[bot]'
runs-on: ubuntu-latest
timeout-minutes: 30
permissions:
contents: write
pull-requests: read
issues: read
id-token: write
actions: read # Required for CI access
steps:
- name: Checkout code
uses: actions/checkout@v4
with:
fetch-depth: 1
# Backend Python/Poetry setup (mirrors platform-backend-ci.yml)
- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: "3.11" # Use standard version matching CI
- name: Set up Python dependency cache
uses: actions/cache@v4
with:
path: ~/.cache/pypoetry
key: poetry-${{ runner.os }}-${{ hashFiles('autogpt_platform/backend/poetry.lock') }}
- name: Install Poetry
run: |
# Extract Poetry version from backend/poetry.lock (matches CI)
cd autogpt_platform/backend
HEAD_POETRY_VERSION=$(python3 ../../.github/workflows/scripts/get_package_version_from_lockfile.py poetry)
echo "Found Poetry version ${HEAD_POETRY_VERSION} in backend/poetry.lock"
# Install Poetry
curl -sSL https://install.python-poetry.org | POETRY_VERSION=$HEAD_POETRY_VERSION python3 -
# Add Poetry to PATH
echo "$HOME/.local/bin" >> $GITHUB_PATH
- name: Check poetry.lock
working-directory: autogpt_platform/backend
run: |
poetry lock
if ! git diff --quiet --ignore-matching-lines="^# " poetry.lock; then
echo "Warning: poetry.lock not up to date, but continuing for setup"
git checkout poetry.lock # Reset for clean setup
fi
- name: Install Python dependencies
working-directory: autogpt_platform/backend
run: poetry install
- name: Generate Prisma Client
working-directory: autogpt_platform/backend
run: poetry run prisma generate
# Frontend Node.js/pnpm setup (mirrors platform-frontend-ci.yml)
- name: Set up Node.js
uses: actions/setup-node@v4
with:
node-version: "21"
- name: Enable corepack
run: corepack enable
- name: Set pnpm store directory
run: |
pnpm config set store-dir ~/.pnpm-store
echo "PNPM_HOME=$HOME/.pnpm-store" >> $GITHUB_ENV
- name: Cache frontend dependencies
uses: actions/cache@v4
with:
path: ~/.pnpm-store
key: ${{ runner.os }}-pnpm-${{ hashFiles('autogpt_platform/frontend/pnpm-lock.yaml', 'autogpt_platform/frontend/package.json') }}
restore-keys: |
${{ runner.os }}-pnpm-${{ hashFiles('autogpt_platform/frontend/pnpm-lock.yaml') }}
${{ runner.os }}-pnpm-
- name: Install JavaScript dependencies
working-directory: autogpt_platform/frontend
run: pnpm install --frozen-lockfile
# Install Playwright browsers for frontend testing
# NOTE: Disabled to save ~1 minute of setup time. Re-enable if Copilot needs browser automation (e.g., for MCP)
# - name: Install Playwright browsers
# working-directory: autogpt_platform/frontend
# run: pnpm playwright install --with-deps chromium
# Docker setup for development environment
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v3
- name: Copy default environment files
working-directory: autogpt_platform
run: |
# Copy default environment files for development
cp .env.default .env
cp backend/.env.default backend/.env
cp frontend/.env.default frontend/.env
# Phase 1: Cache and load Docker images for faster setup
- name: Set up Docker image cache
id: docker-cache
uses: actions/cache@v4
with:
path: ~/docker-cache
# Use a versioned key for cache invalidation when image list changes
key: docker-images-v2-${{ runner.os }}-${{ hashFiles('.github/workflows/copilot-setup-steps.yml') }}
restore-keys: |
docker-images-v2-${{ runner.os }}-
docker-images-v1-${{ runner.os }}-
- name: Load or pull Docker images
working-directory: autogpt_platform
run: |
mkdir -p ~/docker-cache
# Define image list for easy maintenance
IMAGES=(
"redis:latest"
"rabbitmq:management"
"clamav/clamav-debian:latest"
"busybox:latest"
"kong:2.8.1"
"supabase/gotrue:v2.170.0"
"supabase/postgres:15.8.1.049"
"supabase/postgres-meta:v0.86.1"
"supabase/studio:20250224-d10db0f"
)
# Check if any cached tar files exist (more reliable than cache-hit)
if ls ~/docker-cache/*.tar 1> /dev/null 2>&1; then
echo "Docker cache found, loading images in parallel..."
for image in "${IMAGES[@]}"; do
# Convert image name to filename (replace : and / with -)
filename=$(echo "$image" | tr ':/' '--')
if [ -f ~/docker-cache/${filename}.tar ]; then
echo "Loading $image..."
docker load -i ~/docker-cache/${filename}.tar || echo "Warning: Failed to load $image from cache" &
fi
done
wait
echo "All cached images loaded"
else
echo "No Docker cache found, pulling images in parallel..."
# Pull all images in parallel
for image in "${IMAGES[@]}"; do
docker pull "$image" &
done
wait
# Only save cache on main branches (not PRs) to avoid cache pollution
if [[ "${{ github.ref }}" == "refs/heads/master" ]] || [[ "${{ github.ref }}" == "refs/heads/dev" ]]; then
echo "Saving Docker images to cache in parallel..."
for image in "${IMAGES[@]}"; do
# Convert image name to filename (replace : and / with -)
filename=$(echo "$image" | tr ':/' '--')
echo "Saving $image..."
docker save -o ~/docker-cache/${filename}.tar "$image" || echo "Warning: Failed to save $image" &
done
wait
echo "Docker image cache saved"
else
echo "Skipping cache save for PR/feature branch"
fi
fi
echo "Docker images ready for use"
# Phase 2: Build migrate service with GitHub Actions cache
- name: Build migrate Docker image with cache
working-directory: autogpt_platform
run: |
# Build the migrate image with buildx for GHA caching
docker buildx build \
--cache-from type=gha \
--cache-to type=gha,mode=max \
--target migrate \
--tag autogpt_platform-migrate:latest \
--load \
-f backend/Dockerfile \
..
# Start services using pre-built images
- name: Start Docker services for development
working-directory: autogpt_platform
run: |
# Start essential services (migrate image already built with correct tag)
docker compose --profile local up deps --no-build --detach
echo "Waiting for services to be ready..."
# Wait for database to be ready
echo "Checking database readiness..."
timeout 30 sh -c 'until docker compose exec -T db pg_isready -U postgres 2>/dev/null; do
echo " Waiting for database..."
sleep 2
done' && echo "✅ Database is ready" || echo "⚠️ Database ready check timeout after 30s, continuing..."
# Check migrate service status
echo "Checking migration status..."
docker compose ps migrate || echo " Migrate service not visible in ps output"
# Wait for migrate service to complete
echo "Waiting for migrations to complete..."
timeout 30 bash -c '
ATTEMPTS=0
while [ $ATTEMPTS -lt 15 ]; do
ATTEMPTS=$((ATTEMPTS + 1))
# Check using docker directly (more reliable than docker compose ps)
CONTAINER_STATUS=$(docker ps -a --filter "label=com.docker.compose.service=migrate" --format "{{.Status}}" | head -1)
if [ -z "$CONTAINER_STATUS" ]; then
echo " Attempt $ATTEMPTS: Migrate container not found yet..."
elif echo "$CONTAINER_STATUS" | grep -q "Exited (0)"; then
echo "✅ Migrations completed successfully"
docker compose logs migrate --tail=5 2>/dev/null || true
exit 0
elif echo "$CONTAINER_STATUS" | grep -q "Exited ([1-9]"; then
EXIT_CODE=$(echo "$CONTAINER_STATUS" | grep -oE "Exited \([0-9]+\)" | grep -oE "[0-9]+")
echo "❌ Migrations failed with exit code: $EXIT_CODE"
echo "Migration logs:"
docker compose logs migrate --tail=20 2>/dev/null || true
exit 1
elif echo "$CONTAINER_STATUS" | grep -q "Up"; then
echo " Attempt $ATTEMPTS: Migrate container is running... ($CONTAINER_STATUS)"
else
echo " Attempt $ATTEMPTS: Migrate container status: $CONTAINER_STATUS"
fi
sleep 2
done
echo "⚠️ Timeout: Could not determine migration status after 30 seconds"
echo "Final container check:"
docker ps -a --filter "label=com.docker.compose.service=migrate" || true
echo "Migration logs (if available):"
docker compose logs migrate --tail=10 2>/dev/null || echo " No logs available"
' || echo "⚠️ Migration check completed with warnings, continuing..."
# Brief wait for other services to stabilize
echo "Waiting 5 seconds for other services to stabilize..."
sleep 5
# Verify installations and provide environment info
- name: Verify setup and show environment info
run: |
echo "=== Python Setup ==="
python --version
poetry --version
echo "=== Node.js Setup ==="
node --version
pnpm --version
echo "=== Additional Tools ==="
docker --version
docker compose version
gh --version || true
echo "=== Services Status ==="
cd autogpt_platform
docker compose ps || true
echo "=== Backend Dependencies ==="
cd backend
poetry show | head -10 || true
echo "=== Frontend Dependencies ==="
cd ../frontend
pnpm list --depth=0 | head -10 || true
echo "=== Environment Files ==="
ls -la ../.env* || true
ls -la .env* || true
ls -la ../backend/.env* || true
echo "✅ AutoGPT Platform development environment setup complete!"
echo "🚀 Ready for development with Docker services running"
echo "📝 Backend server: poetry run serve (port 8000)"
echo "🌐 Frontend server: pnpm dev (port 3000)"
- name: Run Claude Dependabot Analysis
id: claude_review
uses: anthropics/claude-code-action@v1
with:
anthropic_api_key: ${{ secrets.ANTHROPIC_API_KEY }}
claude_args: |
--allowedTools "Bash(npm:*),Bash(pnpm:*),Bash(poetry:*),Bash(git:*),Edit,Replace,NotebookEditCell,mcp__github_inline_comment__create_inline_comment,Bash(gh pr comment:*), Bash(gh pr diff:*), Bash(gh pr view:*)"
custom_system_prompt: |
You are Claude, an AI assistant specialized in reviewing Dependabot dependency update PRs.
Your primary tasks are:
1. **Analyze the dependency changes** in this Dependabot PR
2. **Look up changelogs** for all updated dependencies to understand what changed
3. **Identify breaking changes** and assess potential impact on the AutoGPT codebase
4. **Provide actionable recommendations** for the development team
## Analysis Process:
1. **Identify Changed Dependencies**:
- Use git diff to see what dependencies were updated
- Parse package.json, poetry.lock, requirements files, etc.
- List all package versions: old → new
2. **Changelog Research**:
- For each updated dependency, look up its changelog/release notes
- Use WebFetch to access GitHub releases, NPM package pages, PyPI project pages. The pr should also have some details
- Focus on versions between the old and new versions
- Identify: breaking changes, deprecations, security fixes, new features
3. **Breaking Change Assessment**:
- Categorize changes: BREAKING, MAJOR, MINOR, PATCH, SECURITY
- Assess impact on AutoGPT's usage patterns
- Check if AutoGPT uses affected APIs/features
- Look for migration guides or upgrade instructions
4. **Codebase Impact Analysis**:
- Search the AutoGPT codebase for usage of changed APIs
- Identify files that might be affected by breaking changes
- Check test files for deprecated usage patterns
- Look for configuration changes needed
## Output Format:
Provide a comprehensive review comment with:
### 🔍 Dependency Analysis Summary
- List of updated packages with version changes
- Overall risk assessment (LOW/MEDIUM/HIGH)
### 📋 Detailed Changelog Review
For each updated dependency:
- **Package**: name (old_version → new_version)
- **Changes**: Summary of key changes
- **Breaking Changes**: List any breaking changes
- **Security Fixes**: Note security improvements
- **Migration Notes**: Any upgrade steps needed
### ⚠️ Impact Assessment
- **Breaking Changes Found**: Yes/No with details
- **Affected Files**: List AutoGPT files that may need updates
- **Test Impact**: Any tests that may need updating
- **Configuration Changes**: Required config updates
### 🛠️ Recommendations
- **Action Required**: What the team should do
- **Testing Focus**: Areas to test thoroughly
- **Follow-up Tasks**: Any additional work needed
- **Merge Recommendation**: APPROVE/REVIEW_NEEDED/HOLD
### 📚 Useful Links
- Links to relevant changelogs, migration guides, documentation
Be thorough but concise. Focus on actionable insights that help the development team make informed decisions about the dependency updates.

View File

@@ -30,295 +30,18 @@ jobs:
github.event.issue.author_association == 'COLLABORATOR'
)
runs-on: ubuntu-latest
timeout-minutes: 45
permissions:
contents: write
contents: read
pull-requests: read
issues: read
id-token: write
actions: read # Required for CI access
steps:
- name: Checkout code
- name: Checkout repository
uses: actions/checkout@v4
with:
fetch-depth: 1
# Backend Python/Poetry setup (mirrors platform-backend-ci.yml)
- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: "3.11" # Use standard version matching CI
- name: Set up Python dependency cache
uses: actions/cache@v4
with:
path: ~/.cache/pypoetry
key: poetry-${{ runner.os }}-${{ hashFiles('autogpt_platform/backend/poetry.lock') }}
- name: Install Poetry
run: |
# Extract Poetry version from backend/poetry.lock (matches CI)
cd autogpt_platform/backend
HEAD_POETRY_VERSION=$(python3 ../../.github/workflows/scripts/get_package_version_from_lockfile.py poetry)
echo "Found Poetry version ${HEAD_POETRY_VERSION} in backend/poetry.lock"
# Install Poetry
curl -sSL https://install.python-poetry.org | POETRY_VERSION=$HEAD_POETRY_VERSION python3 -
# Add Poetry to PATH
echo "$HOME/.local/bin" >> $GITHUB_PATH
- name: Check poetry.lock
working-directory: autogpt_platform/backend
run: |
poetry lock
if ! git diff --quiet --ignore-matching-lines="^# " poetry.lock; then
echo "Warning: poetry.lock not up to date, but continuing for setup"
git checkout poetry.lock # Reset for clean setup
fi
- name: Install Python dependencies
working-directory: autogpt_platform/backend
run: poetry install
- name: Generate Prisma Client
working-directory: autogpt_platform/backend
run: poetry run prisma generate
# Frontend Node.js/pnpm setup (mirrors platform-frontend-ci.yml)
- name: Set up Node.js
uses: actions/setup-node@v4
with:
node-version: "21"
- name: Enable corepack
run: corepack enable
- name: Set pnpm store directory
run: |
pnpm config set store-dir ~/.pnpm-store
echo "PNPM_HOME=$HOME/.pnpm-store" >> $GITHUB_ENV
- name: Cache frontend dependencies
uses: actions/cache@v4
with:
path: ~/.pnpm-store
key: ${{ runner.os }}-pnpm-${{ hashFiles('autogpt_platform/frontend/pnpm-lock.yaml', 'autogpt_platform/frontend/package.json') }}
restore-keys: |
${{ runner.os }}-pnpm-${{ hashFiles('autogpt_platform/frontend/pnpm-lock.yaml') }}
${{ runner.os }}-pnpm-
- name: Install JavaScript dependencies
working-directory: autogpt_platform/frontend
run: pnpm install --frozen-lockfile
# Install Playwright browsers for frontend testing
# NOTE: Disabled to save ~1 minute of setup time. Re-enable if Copilot needs browser automation (e.g., for MCP)
# - name: Install Playwright browsers
# working-directory: autogpt_platform/frontend
# run: pnpm playwright install --with-deps chromium
# Docker setup for development environment
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v3
- name: Copy default environment files
working-directory: autogpt_platform
run: |
# Copy default environment files for development
cp .env.default .env
cp backend/.env.default backend/.env
cp frontend/.env.default frontend/.env
# Phase 1: Cache and load Docker images for faster setup
- name: Set up Docker image cache
id: docker-cache
uses: actions/cache@v4
with:
path: ~/docker-cache
# Use a versioned key for cache invalidation when image list changes
key: docker-images-v2-${{ runner.os }}-${{ hashFiles('.github/workflows/copilot-setup-steps.yml') }}
restore-keys: |
docker-images-v2-${{ runner.os }}-
docker-images-v1-${{ runner.os }}-
- name: Load or pull Docker images
working-directory: autogpt_platform
run: |
mkdir -p ~/docker-cache
# Define image list for easy maintenance
IMAGES=(
"redis:latest"
"rabbitmq:management"
"clamav/clamav-debian:latest"
"busybox:latest"
"kong:2.8.1"
"supabase/gotrue:v2.170.0"
"supabase/postgres:15.8.1.049"
"supabase/postgres-meta:v0.86.1"
"supabase/studio:20250224-d10db0f"
)
# Check if any cached tar files exist (more reliable than cache-hit)
if ls ~/docker-cache/*.tar 1> /dev/null 2>&1; then
echo "Docker cache found, loading images in parallel..."
for image in "${IMAGES[@]}"; do
# Convert image name to filename (replace : and / with -)
filename=$(echo "$image" | tr ':/' '--')
if [ -f ~/docker-cache/${filename}.tar ]; then
echo "Loading $image..."
docker load -i ~/docker-cache/${filename}.tar || echo "Warning: Failed to load $image from cache" &
fi
done
wait
echo "All cached images loaded"
else
echo "No Docker cache found, pulling images in parallel..."
# Pull all images in parallel
for image in "${IMAGES[@]}"; do
docker pull "$image" &
done
wait
# Only save cache on main branches (not PRs) to avoid cache pollution
if [[ "${{ github.ref }}" == "refs/heads/master" ]] || [[ "${{ github.ref }}" == "refs/heads/dev" ]]; then
echo "Saving Docker images to cache in parallel..."
for image in "${IMAGES[@]}"; do
# Convert image name to filename (replace : and / with -)
filename=$(echo "$image" | tr ':/' '--')
echo "Saving $image..."
docker save -o ~/docker-cache/${filename}.tar "$image" || echo "Warning: Failed to save $image" &
done
wait
echo "Docker image cache saved"
else
echo "Skipping cache save for PR/feature branch"
fi
fi
echo "Docker images ready for use"
# Phase 2: Build migrate service with GitHub Actions cache
- name: Build migrate Docker image with cache
working-directory: autogpt_platform
run: |
# Build the migrate image with buildx for GHA caching
docker buildx build \
--cache-from type=gha \
--cache-to type=gha,mode=max \
--target migrate \
--tag autogpt_platform-migrate:latest \
--load \
-f backend/Dockerfile \
..
# Start services using pre-built images
- name: Start Docker services for development
working-directory: autogpt_platform
run: |
# Start essential services (migrate image already built with correct tag)
docker compose --profile local up deps --no-build --detach
echo "Waiting for services to be ready..."
# Wait for database to be ready
echo "Checking database readiness..."
timeout 30 sh -c 'until docker compose exec -T db pg_isready -U postgres 2>/dev/null; do
echo " Waiting for database..."
sleep 2
done' && echo "✅ Database is ready" || echo "⚠️ Database ready check timeout after 30s, continuing..."
# Check migrate service status
echo "Checking migration status..."
docker compose ps migrate || echo " Migrate service not visible in ps output"
# Wait for migrate service to complete
echo "Waiting for migrations to complete..."
timeout 30 bash -c '
ATTEMPTS=0
while [ $ATTEMPTS -lt 15 ]; do
ATTEMPTS=$((ATTEMPTS + 1))
# Check using docker directly (more reliable than docker compose ps)
CONTAINER_STATUS=$(docker ps -a --filter "label=com.docker.compose.service=migrate" --format "{{.Status}}" | head -1)
if [ -z "$CONTAINER_STATUS" ]; then
echo " Attempt $ATTEMPTS: Migrate container not found yet..."
elif echo "$CONTAINER_STATUS" | grep -q "Exited (0)"; then
echo "✅ Migrations completed successfully"
docker compose logs migrate --tail=5 2>/dev/null || true
exit 0
elif echo "$CONTAINER_STATUS" | grep -q "Exited ([1-9]"; then
EXIT_CODE=$(echo "$CONTAINER_STATUS" | grep -oE "Exited \([0-9]+\)" | grep -oE "[0-9]+")
echo "❌ Migrations failed with exit code: $EXIT_CODE"
echo "Migration logs:"
docker compose logs migrate --tail=20 2>/dev/null || true
exit 1
elif echo "$CONTAINER_STATUS" | grep -q "Up"; then
echo " Attempt $ATTEMPTS: Migrate container is running... ($CONTAINER_STATUS)"
else
echo " Attempt $ATTEMPTS: Migrate container status: $CONTAINER_STATUS"
fi
sleep 2
done
echo "⚠️ Timeout: Could not determine migration status after 30 seconds"
echo "Final container check:"
docker ps -a --filter "label=com.docker.compose.service=migrate" || true
echo "Migration logs (if available):"
docker compose logs migrate --tail=10 2>/dev/null || echo " No logs available"
' || echo "⚠️ Migration check completed with warnings, continuing..."
# Brief wait for other services to stabilize
echo "Waiting 5 seconds for other services to stabilize..."
sleep 5
# Verify installations and provide environment info
- name: Verify setup and show environment info
run: |
echo "=== Python Setup ==="
python --version
poetry --version
echo "=== Node.js Setup ==="
node --version
pnpm --version
echo "=== Additional Tools ==="
docker --version
docker compose version
gh --version || true
echo "=== Services Status ==="
cd autogpt_platform
docker compose ps || true
echo "=== Backend Dependencies ==="
cd backend
poetry show | head -10 || true
echo "=== Frontend Dependencies ==="
cd ../frontend
pnpm list --depth=0 | head -10 || true
echo "=== Environment Files ==="
ls -la ../.env* || true
ls -la .env* || true
ls -la ../backend/.env* || true
echo "✅ AutoGPT Platform development environment setup complete!"
echo "🚀 Ready for development with Docker services running"
echo "📝 Backend server: poetry run serve (port 8000)"
echo "🌐 Frontend server: pnpm dev (port 3000)"
- name: Run Claude Code
id: claude
uses: anthropics/claude-code-action@v1
uses: anthropics/claude-code-action@beta
with:
anthropic_api_key: ${{ secrets.ANTHROPIC_API_KEY }}
claude_args: |
--allowedTools "Bash(npm:*),Bash(pnpm:*),Bash(poetry:*),Bash(git:*),Edit,Replace,NotebookEditCell,mcp__github_inline_comment__create_inline_comment,Bash(gh pr comment:*), Bash(gh pr diff:*), Bash(gh pr view:*)"
additional_permissions: |
actions: read

View File

@@ -160,7 +160,7 @@ jobs:
- name: Run docker compose
run: |
NEXT_PUBLIC_PW_TEST=true docker compose -f ../docker-compose.yml up -d
docker compose -f ../docker-compose.yml up -d
env:
DOCKER_BUILDKIT: 1
BUILDX_CACHE_FROM: type=local,src=/tmp/.buildx-cache

View File

@@ -61,27 +61,24 @@ poetry run pytest path/to/test.py --snapshot-update
```bash
# Install dependencies
cd frontend && pnpm i
cd frontend && npm install
# Start development server
pnpm dev
npm run dev
# Run E2E tests
pnpm test
npm run test
# Run Storybook for component development
pnpm storybook
npm run storybook
# Build production
pnpm build
npm run build
# Type checking
pnpm types
npm run types
```
We have a components library in autogpt_platform/frontend/src/components/atoms that should be used when adding new pages and components.
## Architecture Overview
### Backend Architecture

View File

@@ -1534,31 +1534,31 @@ pyasn1 = ">=0.1.3"
[[package]]
name = "ruff"
version = "0.12.11"
version = "0.12.9"
description = "An extremely fast Python linter and code formatter, written in Rust."
optional = false
python-versions = ">=3.7"
groups = ["dev"]
files = [
{file = "ruff-0.12.11-py3-none-linux_armv6l.whl", hash = "sha256:93fce71e1cac3a8bf9200e63a38ac5c078f3b6baebffb74ba5274fb2ab276065"},
{file = "ruff-0.12.11-py3-none-macosx_10_12_x86_64.whl", hash = "sha256:b8e33ac7b28c772440afa80cebb972ffd823621ded90404f29e5ab6d1e2d4b93"},
{file = "ruff-0.12.11-py3-none-macosx_11_0_arm64.whl", hash = "sha256:d69fb9d4937aa19adb2e9f058bc4fbfe986c2040acb1a4a9747734834eaa0bfd"},
{file = "ruff-0.12.11-py3-none-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:411954eca8464595077a93e580e2918d0a01a19317af0a72132283e28ae21bee"},
{file = "ruff-0.12.11-py3-none-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:6a2c0a2e1a450f387bf2c6237c727dd22191ae8c00e448e0672d624b2bbd7fb0"},
{file = "ruff-0.12.11-py3-none-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:8ca4c3a7f937725fd2413c0e884b5248a19369ab9bdd850b5781348ba283f644"},
{file = "ruff-0.12.11-py3-none-manylinux_2_17_ppc64.manylinux2014_ppc64.whl", hash = "sha256:4d1df0098124006f6a66ecf3581a7f7e754c4df7644b2e6704cd7ca80ff95211"},
{file = "ruff-0.12.11-py3-none-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:5a8dd5f230efc99a24ace3b77e3555d3fbc0343aeed3fc84c8d89e75ab2ff793"},
{file = "ruff-0.12.11-py3-none-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:4dc75533039d0ed04cd33fb8ca9ac9620b99672fe7ff1533b6402206901c34ee"},
{file = "ruff-0.12.11-py3-none-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:4fc58f9266d62c6eccc75261a665f26b4ef64840887fc6cbc552ce5b29f96cc8"},
{file = "ruff-0.12.11-py3-none-manylinux_2_31_riscv64.whl", hash = "sha256:5a0113bd6eafd545146440225fe60b4e9489f59eb5f5f107acd715ba5f0b3d2f"},
{file = "ruff-0.12.11-py3-none-musllinux_1_2_aarch64.whl", hash = "sha256:0d737b4059d66295c3ea5720e6efc152623bb83fde5444209b69cd33a53e2000"},
{file = "ruff-0.12.11-py3-none-musllinux_1_2_armv7l.whl", hash = "sha256:916fc5defee32dbc1fc1650b576a8fed68f5e8256e2180d4d9855aea43d6aab2"},
{file = "ruff-0.12.11-py3-none-musllinux_1_2_i686.whl", hash = "sha256:c984f07d7adb42d3ded5be894fb4007f30f82c87559438b4879fe7aa08c62b39"},
{file = "ruff-0.12.11-py3-none-musllinux_1_2_x86_64.whl", hash = "sha256:e07fbb89f2e9249f219d88331c833860489b49cdf4b032b8e4432e9b13e8a4b9"},
{file = "ruff-0.12.11-py3-none-win32.whl", hash = "sha256:c792e8f597c9c756e9bcd4d87cf407a00b60af77078c96f7b6366ea2ce9ba9d3"},
{file = "ruff-0.12.11-py3-none-win_amd64.whl", hash = "sha256:a3283325960307915b6deb3576b96919ee89432ebd9c48771ca12ee8afe4a0fd"},
{file = "ruff-0.12.11-py3-none-win_arm64.whl", hash = "sha256:bae4d6e6a2676f8fb0f98b74594a048bae1b944aab17e9f5d504062303c6dbea"},
{file = "ruff-0.12.11.tar.gz", hash = "sha256:c6b09ae8426a65bbee5425b9d0b82796dbb07cb1af045743c79bfb163001165d"},
{file = "ruff-0.12.9-py3-none-linux_armv6l.whl", hash = "sha256:fcebc6c79fcae3f220d05585229463621f5dbf24d79fdc4936d9302e177cfa3e"},
{file = "ruff-0.12.9-py3-none-macosx_10_12_x86_64.whl", hash = "sha256:aed9d15f8c5755c0e74467731a007fcad41f19bcce41cd75f768bbd687f8535f"},
{file = "ruff-0.12.9-py3-none-macosx_11_0_arm64.whl", hash = "sha256:5b15ea354c6ff0d7423814ba6d44be2807644d0c05e9ed60caca87e963e93f70"},
{file = "ruff-0.12.9-py3-none-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:d596c2d0393c2502eaabfef723bd74ca35348a8dac4267d18a94910087807c53"},
{file = "ruff-0.12.9-py3-none-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:1b15599931a1a7a03c388b9c5df1bfa62be7ede6eb7ef753b272381f39c3d0ff"},
{file = "ruff-0.12.9-py3-none-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:3d02faa2977fb6f3f32ddb7828e212b7dd499c59eb896ae6c03ea5c303575756"},
{file = "ruff-0.12.9-py3-none-manylinux_2_17_ppc64.manylinux2014_ppc64.whl", hash = "sha256:17d5b6b0b3a25259b69ebcba87908496e6830e03acfb929ef9fd4c58675fa2ea"},
{file = "ruff-0.12.9-py3-none-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:72db7521860e246adbb43f6ef464dd2a532ef2ef1f5dd0d470455b8d9f1773e0"},
{file = "ruff-0.12.9-py3-none-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:a03242c1522b4e0885af63320ad754d53983c9599157ee33e77d748363c561ce"},
{file = "ruff-0.12.9-py3-none-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:9fc83e4e9751e6c13b5046d7162f205d0a7bac5840183c5beebf824b08a27340"},
{file = "ruff-0.12.9-py3-none-manylinux_2_31_riscv64.whl", hash = "sha256:881465ed56ba4dd26a691954650de6ad389a2d1fdb130fe51ff18a25639fe4bb"},
{file = "ruff-0.12.9-py3-none-musllinux_1_2_aarch64.whl", hash = "sha256:43f07a3ccfc62cdb4d3a3348bf0588358a66da756aa113e071b8ca8c3b9826af"},
{file = "ruff-0.12.9-py3-none-musllinux_1_2_armv7l.whl", hash = "sha256:07adb221c54b6bba24387911e5734357f042e5669fa5718920ee728aba3cbadc"},
{file = "ruff-0.12.9-py3-none-musllinux_1_2_i686.whl", hash = "sha256:f5cd34fabfdea3933ab85d72359f118035882a01bff15bd1d2b15261d85d5f66"},
{file = "ruff-0.12.9-py3-none-musllinux_1_2_x86_64.whl", hash = "sha256:f6be1d2ca0686c54564da8e7ee9e25f93bdd6868263805f8c0b8fc6a449db6d7"},
{file = "ruff-0.12.9-py3-none-win32.whl", hash = "sha256:cc7a37bd2509974379d0115cc5608a1a4a6c4bff1b452ea69db83c8855d53f93"},
{file = "ruff-0.12.9-py3-none-win_amd64.whl", hash = "sha256:6fb15b1977309741d7d098c8a3cb7a30bc112760a00fb6efb7abc85f00ba5908"},
{file = "ruff-0.12.9-py3-none-win_arm64.whl", hash = "sha256:63c8c819739d86b96d500cce885956a1a48ab056bbcbc61b747ad494b2485089"},
{file = "ruff-0.12.9.tar.gz", hash = "sha256:fbd94b2e3c623f659962934e52c2bea6fc6da11f667a427a368adaf3af2c866a"},
]
[[package]]
@@ -1897,4 +1897,4 @@ type = ["pytest-mypy"]
[metadata]
lock-version = "2.1"
python-versions = ">=3.10,<4.0"
content-hash = "d841f62f95180f6ad63ce82ed8e62aa201b9bf89242cc9299ae0f26ff1f72136"
content-hash = "ef7818fba061cea2841c6d7ca4852acde83e4f73b32fca1315e58660002bb0d0"

View File

@@ -21,7 +21,7 @@ supabase = "^2.16.0"
uvicorn = "^0.35.0"
[tool.poetry.group.dev.dependencies]
ruff = "^0.12.11"
ruff = "^0.12.9"
pytest = "^8.4.1"
pytest-asyncio = "^1.1.0"
pytest-mock = "^3.14.1"

View File

@@ -1,6 +1,8 @@
import logging
from typing import Any, Optional
from pydantic import JsonValue
from backend.data.block import (
Block,
BlockCategory,
@@ -10,7 +12,7 @@ from backend.data.block import (
BlockType,
get_block,
)
from backend.data.execution import ExecutionStatus, NodesInputMasks
from backend.data.execution import ExecutionStatus
from backend.data.model import NodeExecutionStats, SchemaField
from backend.util.json import validate_with_jsonschema
from backend.util.retry import func_retry
@@ -31,7 +33,7 @@ class AgentExecutorBlock(Block):
input_schema: dict = SchemaField(description="Input schema for the graph")
output_schema: dict = SchemaField(description="Output schema for the graph")
nodes_input_masks: Optional[NodesInputMasks] = SchemaField(
nodes_input_masks: Optional[dict[str, dict[str, JsonValue]]] = SchemaField(
default=None, hidden=True
)

View File

@@ -1,154 +0,0 @@
from enum import Enum
from typing import Literal
from pydantic import SecretStr
from replicate.client import Client as ReplicateClient
from replicate.helpers import FileOutput
from backend.data.block import Block, BlockCategory, BlockOutput, BlockSchema
from backend.data.model import (
APIKeyCredentials,
CredentialsField,
CredentialsMetaInput,
SchemaField,
)
from backend.integrations.providers import ProviderName
from backend.util.file import MediaFileType
class GeminiImageModel(str, Enum):
NANO_BANANA = "google/nano-banana"
class OutputFormat(str, Enum):
JPG = "jpg"
PNG = "png"
TEST_CREDENTIALS = APIKeyCredentials(
id="01234567-89ab-cdef-0123-456789abcdef",
provider="replicate",
api_key=SecretStr("mock-replicate-api-key"),
title="Mock Replicate API key",
expires_at=None,
)
TEST_CREDENTIALS_INPUT = {
"provider": TEST_CREDENTIALS.provider,
"id": TEST_CREDENTIALS.id,
"type": TEST_CREDENTIALS.type,
"title": TEST_CREDENTIALS.title,
}
class AIImageCustomizerBlock(Block):
class Input(BlockSchema):
credentials: CredentialsMetaInput[
Literal[ProviderName.REPLICATE], Literal["api_key"]
] = CredentialsField(
description="Replicate API key with permissions for Google Gemini image models",
)
prompt: str = SchemaField(
description="A text description of the image you want to generate",
title="Prompt",
)
model: GeminiImageModel = SchemaField(
description="The AI model to use for image generation and editing",
default=GeminiImageModel.NANO_BANANA,
title="Model",
)
images: list[MediaFileType] = SchemaField(
description="Optional list of input images to reference or modify",
default=[],
title="Input Images",
)
output_format: OutputFormat = SchemaField(
description="Format of the output image",
default=OutputFormat.PNG,
title="Output Format",
)
class Output(BlockSchema):
image_url: MediaFileType = SchemaField(description="URL of the generated image")
error: str = SchemaField(description="Error message if generation failed")
def __init__(self):
super().__init__(
id="d76bbe4c-930e-4894-8469-b66775511f71",
description=(
"Generate and edit custom images using Google's Nano-Banana model from Gemini 2.5. "
"Provide a prompt and optional reference images to create or modify images."
),
categories={BlockCategory.AI, BlockCategory.MULTIMEDIA},
input_schema=AIImageCustomizerBlock.Input,
output_schema=AIImageCustomizerBlock.Output,
test_input={
"prompt": "Make the scene more vibrant and colorful",
"model": GeminiImageModel.NANO_BANANA,
"images": [],
"output_format": OutputFormat.JPG,
"credentials": TEST_CREDENTIALS_INPUT,
},
test_output=[
("image_url", "https://replicate.delivery/generated-image.jpg"),
],
test_mock={
"run_model": lambda *args, **kwargs: MediaFileType(
"https://replicate.delivery/generated-image.jpg"
),
},
test_credentials=TEST_CREDENTIALS,
)
async def run(
self,
input_data: Input,
*,
credentials: APIKeyCredentials,
graph_exec_id: str,
user_id: str,
**kwargs,
) -> BlockOutput:
try:
result = await self.run_model(
api_key=credentials.api_key,
model_name=input_data.model.value,
prompt=input_data.prompt,
images=input_data.images,
output_format=input_data.output_format.value,
)
yield "image_url", result
except Exception as e:
yield "error", str(e)
async def run_model(
self,
api_key: SecretStr,
model_name: str,
prompt: str,
images: list[MediaFileType],
output_format: str,
) -> MediaFileType:
client = ReplicateClient(api_token=api_key.get_secret_value())
input_params: dict = {
"prompt": prompt,
"output_format": output_format,
}
# Add images to input if provided (API expects "image_input" parameter)
if images:
input_params["image_input"] = [str(img) for img in images]
output: FileOutput | str = await client.async_run( # type: ignore
model_name,
input=input_params,
wait=False,
)
if isinstance(output, FileOutput):
return MediaFileType(output.url)
if isinstance(output, str):
return MediaFileType(output)
raise ValueError("No output received from the model")

View File

@@ -1,3 +0,0 @@
from .text_overlay import BannerbearTextOverlayBlock
__all__ = ["BannerbearTextOverlayBlock"]

View File

@@ -1,8 +0,0 @@
from backend.sdk import BlockCostType, ProviderBuilder
bannerbear = (
ProviderBuilder("bannerbear")
.with_api_key("BANNERBEAR_API_KEY", "Bannerbear API Key")
.with_base_cost(1, BlockCostType.RUN)
.build()
)

View File

@@ -1,239 +0,0 @@
import uuid
from typing import TYPE_CHECKING, Any, Dict, List
if TYPE_CHECKING:
pass
from pydantic import SecretStr
from backend.sdk import (
APIKeyCredentials,
Block,
BlockCategory,
BlockOutput,
BlockSchema,
CredentialsMetaInput,
Requests,
SchemaField,
)
from ._config import bannerbear
TEST_CREDENTIALS = APIKeyCredentials(
id="01234567-89ab-cdef-0123-456789abcdef",
provider="bannerbear",
api_key=SecretStr("mock-bannerbear-api-key"),
title="Mock Bannerbear API Key",
)
class TextModification(BlockSchema):
name: str = SchemaField(
description="The name of the layer to modify in the template"
)
text: str = SchemaField(description="The text content to add to this layer")
color: str = SchemaField(
description="Hex color code for the text (e.g., '#FF0000')",
default="",
advanced=True,
)
font_family: str = SchemaField(
description="Font family to use for the text",
default="",
advanced=True,
)
font_size: int = SchemaField(
description="Font size in pixels",
default=0,
advanced=True,
)
font_weight: str = SchemaField(
description="Font weight (e.g., 'bold', 'normal')",
default="",
advanced=True,
)
text_align: str = SchemaField(
description="Text alignment (left, center, right)",
default="",
advanced=True,
)
class BannerbearTextOverlayBlock(Block):
class Input(BlockSchema):
credentials: CredentialsMetaInput = bannerbear.credentials_field(
description="API credentials for Bannerbear"
)
template_id: str = SchemaField(
description="The unique ID of your Bannerbear template"
)
project_id: str = SchemaField(
description="Optional: Project ID (required when using Master API Key)",
default="",
advanced=True,
)
text_modifications: List[TextModification] = SchemaField(
description="List of text layers to modify in the template"
)
image_url: str = SchemaField(
description="Optional: URL of an image to use in the template",
default="",
advanced=True,
)
image_layer_name: str = SchemaField(
description="Optional: Name of the image layer in the template",
default="photo",
advanced=True,
)
webhook_url: str = SchemaField(
description="Optional: URL to receive webhook notification when image is ready",
default="",
advanced=True,
)
metadata: str = SchemaField(
description="Optional: Custom metadata to attach to the image",
default="",
advanced=True,
)
class Output(BlockSchema):
success: bool = SchemaField(
description="Whether the image generation was successfully initiated"
)
image_url: str = SchemaField(
description="URL of the generated image (if synchronous) or placeholder"
)
uid: str = SchemaField(description="Unique identifier for the generated image")
status: str = SchemaField(description="Status of the image generation")
error: str = SchemaField(description="Error message if the operation failed")
def __init__(self):
super().__init__(
id="c7d3a5c2-05fc-450e-8dce-3b0e04626009",
description="Add text overlay to images using Bannerbear templates. Perfect for creating social media graphics, marketing materials, and dynamic image content.",
categories={BlockCategory.PRODUCTIVITY, BlockCategory.AI},
input_schema=self.Input,
output_schema=self.Output,
test_input={
"template_id": "jJWBKNELpQPvbX5R93Gk",
"text_modifications": [
{
"name": "headline",
"text": "Amazing Product Launch!",
"color": "#FF0000",
},
{
"name": "subtitle",
"text": "50% OFF Today Only",
},
],
"credentials": {
"provider": "bannerbear",
"id": str(uuid.uuid4()),
"type": "api_key",
},
},
test_output=[
("success", True),
("image_url", "https://cdn.bannerbear.com/test-image.jpg"),
("uid", "test-uid-123"),
("status", "completed"),
],
test_mock={
"_make_api_request": lambda *args, **kwargs: {
"uid": "test-uid-123",
"status": "completed",
"image_url": "https://cdn.bannerbear.com/test-image.jpg",
}
},
test_credentials=TEST_CREDENTIALS,
)
async def _make_api_request(self, payload: dict, api_key: str) -> dict:
"""Make the actual API request to Bannerbear. This is separated for easy mocking in tests."""
headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json",
}
response = await Requests().post(
"https://sync.api.bannerbear.com/v2/images",
headers=headers,
json=payload,
)
if response.status in [200, 201, 202]:
return response.json()
else:
error_msg = f"API request failed with status {response.status}"
if response.text:
try:
error_data = response.json()
error_msg = (
f"{error_msg}: {error_data.get('message', response.text)}"
)
except Exception:
error_msg = f"{error_msg}: {response.text}"
raise Exception(error_msg)
async def run(
self, input_data: Input, *, credentials: APIKeyCredentials, **kwargs
) -> BlockOutput:
# Build the modifications array
modifications = []
# Add text modifications
for text_mod in input_data.text_modifications:
mod_data: Dict[str, Any] = {
"name": text_mod.name,
"text": text_mod.text,
}
# Add optional text styling parameters only if they have values
if text_mod.color and text_mod.color.strip():
mod_data["color"] = text_mod.color
if text_mod.font_family and text_mod.font_family.strip():
mod_data["font_family"] = text_mod.font_family
if text_mod.font_size and text_mod.font_size > 0:
mod_data["font_size"] = text_mod.font_size
if text_mod.font_weight and text_mod.font_weight.strip():
mod_data["font_weight"] = text_mod.font_weight
if text_mod.text_align and text_mod.text_align.strip():
mod_data["text_align"] = text_mod.text_align
modifications.append(mod_data)
# Add image modification if provided and not empty
if input_data.image_url and input_data.image_url.strip():
modifications.append(
{
"name": input_data.image_layer_name,
"image_url": input_data.image_url,
}
)
# Build the request payload - only include non-empty optional fields
payload = {
"template": input_data.template_id,
"modifications": modifications,
}
# Add project_id if provided (required for Master API keys)
if input_data.project_id and input_data.project_id.strip():
payload["project_id"] = input_data.project_id
if input_data.webhook_url and input_data.webhook_url.strip():
payload["webhook_url"] = input_data.webhook_url
if input_data.metadata and input_data.metadata.strip():
payload["metadata"] = input_data.metadata
# Make the API request using the private method
data = await self._make_api_request(
payload, credentials.api_key.get_secret_value()
)
# Synchronous request - image should be ready
yield "success", True
yield "image_url", data.get("image_url", "")
yield "uid", data.get("uid", "")
yield "status", data.get("status", "completed")

View File

@@ -553,12 +553,11 @@ class GithubListPRReviewersBlock(Block):
def prepare_pr_api_url(pr_url: str, path: str) -> str:
pattern = r"^(?:https?://)?github\.com/([^/]+/[^/]+)/pull/(\d+)"
# Pattern to capture the base repository URL and the pull request number
pattern = r"^(?:https?://)?([^/]+/[^/]+/[^/]+)/pull/(\d+)"
match = re.match(pattern, pr_url)
if not match:
raise ValueError(
f"Invalid GitHub PR URL: {pr_url}. URL must be a valid pull request URL, e.g., https://github.com/owner/repo/pull/123"
)
return pr_url
repo_path, pr_number = match.groups()
return f"{repo_path}/pulls/{pr_number}/{path}"
base_url, pr_number = match.groups()
return f"{base_url}/pulls/{pr_number}/{path}"

View File

@@ -35,19 +35,20 @@ async def execute_graph(
logger.info("Input data: %s", input_data)
# --- Test adding new executions --- #
graph_exec = await agent_server.test_execute_graph(
response = await agent_server.test_execute_graph(
user_id=test_user.id,
graph_id=test_graph.id,
graph_version=test_graph.version,
node_input=input_data,
)
logger.info("Created execution with ID: %s", graph_exec.id)
graph_exec_id = response.graph_exec_id
logger.info("Created execution with ID: %s", graph_exec_id)
# Execution queue should be empty
logger.info("Waiting for execution to complete...")
result = await wait_execution(test_user.id, graph_exec.id, 30)
result = await wait_execution(test_user.id, graph_exec_id, 30)
logger.info("Execution completed with %d results", len(result))
return graph_exec.id
return graph_exec_id
@pytest.mark.asyncio(loop_scope="session")

View File

@@ -1,8 +1,5 @@
from backend.server.v2.library.model import LibraryAgentPreset
from .graph import NodeModel
from .integrations import Webhook # noqa: F401
# Resolve Webhook forward references
# Resolve Webhook <- NodeModel forward reference
NodeModel.model_rebuild()
LibraryAgentPreset.model_rebuild()

View File

@@ -8,7 +8,6 @@ from enum import Enum
from typing import (
TYPE_CHECKING,
Any,
Callable,
ClassVar,
Generic,
Optional,
@@ -45,10 +44,9 @@ if TYPE_CHECKING:
app_config = Config()
BlockData = tuple[str, Any] # Input & Output data should be a tuple of (name, data).
BlockInput = dict[str, Any] # Input: 1 input pin consumes 1 data.
BlockOutputEntry = tuple[str, Any] # Output data should be a tuple of (name, value).
BlockOutput = AsyncGen[BlockOutputEntry, None] # Output: 1 output pin produces n data.
BlockTestOutput = BlockOutputEntry | tuple[str, Callable[[Any], bool]]
BlockOutput = AsyncGen[BlockData, None] # Output: 1 output pin produces n data.
CompletedBlockOutput = dict[str, list[Any]] # Completed stream, collected as a dict.
@@ -308,7 +306,7 @@ class Block(ABC, Generic[BlockSchemaInputType, BlockSchemaOutputType]):
input_schema: Type[BlockSchemaInputType] = EmptySchema,
output_schema: Type[BlockSchemaOutputType] = EmptySchema,
test_input: BlockInput | list[BlockInput] | None = None,
test_output: BlockTestOutput | list[BlockTestOutput] | None = None,
test_output: BlockData | list[BlockData] | None = None,
test_mock: dict[str, Any] | None = None,
test_credentials: Optional[Credentials | dict[str, Credentials]] = None,
disabled: bool = False,

View File

@@ -11,14 +11,11 @@ from typing import (
Generator,
Generic,
Literal,
Mapping,
Optional,
TypeVar,
cast,
overload,
)
from prisma import Json
from prisma.enums import AgentExecutionStatus
from prisma.models import (
AgentGraphExecution,
@@ -27,6 +24,7 @@ from prisma.models import (
AgentNodeExecutionKeyValueData,
)
from prisma.types import (
AgentGraphExecutionCreateInput,
AgentGraphExecutionUpdateManyMutationInput,
AgentGraphExecutionWhereInput,
AgentNodeExecutionCreateInput,
@@ -62,7 +60,7 @@ from .includes import (
GRAPH_EXECUTION_INCLUDE_WITH_NODES,
graph_execution_include,
)
from .model import CredentialsMetaInput, GraphExecutionStats, NodeExecutionStats
from .model import GraphExecutionStats, NodeExecutionStats
T = TypeVar("T")
@@ -89,8 +87,6 @@ class BlockErrorStats(BaseModel):
ExecutionStatus = AgentExecutionStatus
NodeInputMask = Mapping[str, JsonValue]
NodesInputMasks = Mapping[str, NodeInputMask]
class GraphExecutionMeta(BaseDbModel):
@@ -98,10 +94,7 @@ class GraphExecutionMeta(BaseDbModel):
user_id: str
graph_id: str
graph_version: int
inputs: Optional[BlockInput] # no default -> required in the OpenAPI spec
credential_inputs: Optional[dict[str, CredentialsMetaInput]]
nodes_input_masks: Optional[dict[str, BlockInput]]
preset_id: Optional[str]
preset_id: Optional[str] = None
status: ExecutionStatus
started_at: datetime
ended_at: datetime
@@ -186,18 +179,6 @@ class GraphExecutionMeta(BaseDbModel):
user_id=_graph_exec.userId,
graph_id=_graph_exec.agentGraphId,
graph_version=_graph_exec.agentGraphVersion,
inputs=cast(BlockInput | None, _graph_exec.inputs),
credential_inputs=(
{
name: CredentialsMetaInput.model_validate(cmi)
for name, cmi in cast(dict, _graph_exec.credentialInputs).items()
}
if _graph_exec.credentialInputs
else None
),
nodes_input_masks=cast(
dict[str, BlockInput] | None, _graph_exec.nodesInputMasks
),
preset_id=_graph_exec.agentPresetId,
status=ExecutionStatus(_graph_exec.executionStatus),
started_at=start_time,
@@ -225,7 +206,7 @@ class GraphExecutionMeta(BaseDbModel):
class GraphExecution(GraphExecutionMeta):
inputs: BlockInput # type: ignore - incompatible override is intentional
inputs: BlockInput
outputs: CompletedBlockOutput
@staticmethod
@@ -245,18 +226,15 @@ class GraphExecution(GraphExecutionMeta):
)
inputs = {
**(
graph_exec.inputs
or {
# fallback: extract inputs from Agent Input Blocks
exec.input_data["name"]: exec.input_data.get("value")
for exec in complete_node_executions
if (
(block := get_block(exec.block_id))
and block.block_type == BlockType.INPUT
)
}
),
**{
# inputs from Agent Input Blocks
exec.input_data["name"]: exec.input_data.get("value")
for exec in complete_node_executions
if (
(block := get_block(exec.block_id))
and block.block_type == BlockType.INPUT
)
},
**{
# input from webhook-triggered block
"payload": exec.input_data["payload"]
@@ -274,13 +252,14 @@ class GraphExecution(GraphExecutionMeta):
if (
block := get_block(exec.block_id)
) and block.block_type == BlockType.OUTPUT:
outputs[exec.input_data["name"]].append(exec.input_data.get("value"))
outputs[exec.input_data["name"]].append(
exec.input_data.get("value", None)
)
return GraphExecution(
**{
field_name: getattr(graph_exec, field_name)
for field_name in GraphExecutionMeta.model_fields
if field_name != "inputs"
},
inputs=inputs,
outputs=outputs,
@@ -313,17 +292,13 @@ class GraphExecutionWithNodes(GraphExecution):
node_executions=node_executions,
)
def to_graph_execution_entry(
self,
user_context: "UserContext",
compiled_nodes_input_masks: Optional[NodesInputMasks] = None,
):
def to_graph_execution_entry(self, user_context: "UserContext"):
return GraphExecutionEntry(
user_id=self.user_id,
graph_id=self.graph_id,
graph_version=self.graph_version or 0,
graph_exec_id=self.id,
nodes_input_masks=compiled_nodes_input_masks,
nodes_input_masks={}, # FIXME: store credentials on AgentGraphExecution
user_context=user_context,
)
@@ -340,9 +315,10 @@ class NodeExecutionResult(BaseModel):
input_data: BlockInput
output_data: CompletedBlockOutput
add_time: datetime
queue_time: datetime | None
start_time: datetime | None
end_time: datetime | None
queue_time: datetime | None = None
start_time: datetime | None = None
end_time: datetime | None = None
stats: NodeExecutionStats | None = None
@staticmethod
def from_db(_node_exec: AgentNodeExecution, user_id: Optional[str] = None):
@@ -360,7 +336,7 @@ class NodeExecutionResult(BaseModel):
else:
input_data: BlockInput = defaultdict()
for data in _node_exec.Input or []:
input_data[data.name] = type_utils.convert(data.data, JsonValue)
input_data[data.name] = type_utils.convert(data.data, type[Any])
output_data: CompletedBlockOutput = defaultdict(list)
@@ -369,7 +345,7 @@ class NodeExecutionResult(BaseModel):
output_data[name].extend(messages)
else:
for data in _node_exec.Output or []:
output_data[data.name].append(type_utils.convert(data.data, JsonValue))
output_data[data.name].append(type_utils.convert(data.data, type[Any]))
graph_execution: AgentGraphExecution | None = _node_exec.GraphExecution
if graph_execution:
@@ -394,6 +370,7 @@ class NodeExecutionResult(BaseModel):
queue_time=_node_exec.queuedTime,
start_time=_node_exec.startedTime,
end_time=_node_exec.endedTime,
stats=stats,
)
def to_node_execution_entry(
@@ -564,12 +541,9 @@ async def get_graph_execution(
async def create_graph_execution(
graph_id: str,
graph_version: int,
starting_nodes_input: list[tuple[str, BlockInput]], # list[(node_id, BlockInput)]
inputs: Mapping[str, JsonValue],
starting_nodes_input: list[tuple[str, BlockInput]],
user_id: str,
preset_id: Optional[str] = None,
credential_inputs: Optional[Mapping[str, CredentialsMetaInput]] = None,
nodes_input_masks: Optional[NodesInputMasks] = None,
preset_id: str | None = None,
) -> GraphExecutionWithNodes:
"""
Create a new AgentGraphExecution record.
@@ -577,18 +551,11 @@ async def create_graph_execution(
The id of the AgentGraphExecution and the list of ExecutionResult for each node.
"""
result = await AgentGraphExecution.prisma().create(
data={
"agentGraphId": graph_id,
"agentGraphVersion": graph_version,
"executionStatus": ExecutionStatus.QUEUED,
"inputs": SafeJson(inputs),
"credentialInputs": (
SafeJson(credential_inputs) if credential_inputs else Json({})
),
"nodesInputMasks": (
SafeJson(nodes_input_masks) if nodes_input_masks else Json({})
),
"NodeExecutions": {
data=AgentGraphExecutionCreateInput(
agentGraphId=graph_id,
agentGraphVersion=graph_version,
executionStatus=ExecutionStatus.QUEUED,
NodeExecutions={
"create": [
AgentNodeExecutionCreateInput(
agentNodeId=node_id,
@@ -604,9 +571,9 @@ async def create_graph_execution(
for node_id, node_input in starting_nodes_input
]
},
"userId": user_id,
"agentPresetId": preset_id,
},
userId=user_id,
agentPresetId=preset_id,
),
include=GRAPH_EXECUTION_INCLUDE_WITH_NODES,
)
@@ -617,7 +584,7 @@ async def upsert_execution_input(
node_id: str,
graph_exec_id: str,
input_name: str,
input_data: JsonValue,
input_data: Any,
node_exec_id: str | None = None,
) -> tuple[str, BlockInput]:
"""
@@ -666,7 +633,7 @@ async def upsert_execution_input(
)
return existing_execution.id, {
**{
input_data.name: type_utils.convert(input_data.data, JsonValue)
input_data.name: type_utils.convert(input_data.data, type[Any])
for input_data in existing_execution.Input or []
},
input_name: input_data,
@@ -689,6 +656,42 @@ async def upsert_execution_input(
)
async def create_node_execution(
node_exec_id: str,
node_id: str,
graph_exec_id: str,
input_name: str,
input_data: Any,
) -> None:
"""Create a new node execution with the first input."""
json_input_data = SafeJson(input_data)
await AgentNodeExecution.prisma().create(
data=AgentNodeExecutionCreateInput(
id=node_exec_id,
agentNodeId=node_id,
agentGraphExecutionId=graph_exec_id,
executionStatus=ExecutionStatus.INCOMPLETE,
Input={"create": {"name": input_name, "data": json_input_data}},
)
)
async def add_input_to_node_execution(
node_exec_id: str,
input_name: str,
input_data: Any,
) -> None:
"""Add an input to an existing node execution."""
json_input_data = SafeJson(input_data)
await AgentNodeExecutionInputOutput.prisma().create(
data=AgentNodeExecutionInputOutputCreateInput(
name=input_name,
data=json_input_data,
referencedByInputExecId=node_exec_id,
)
)
async def upsert_execution_output(
node_exec_id: str,
output_name: str,
@@ -923,7 +926,7 @@ class GraphExecutionEntry(BaseModel):
graph_exec_id: str
graph_id: str
graph_version: int
nodes_input_masks: Optional[NodesInputMasks] = None
nodes_input_masks: Optional[dict[str, dict[str, JsonValue]]] = None
user_context: UserContext

View File

@@ -12,7 +12,7 @@ from prisma.types import (
AgentNodeLinkCreateInput,
StoreListingVersionWhereInput,
)
from pydantic import BaseModel, Field, create_model
from pydantic import Field, JsonValue, create_model
from pydantic.fields import computed_field
from backend.blocks.agent import AgentExecutorBlock
@@ -34,7 +34,6 @@ from .db import BaseDbModel, query_raw_with_schema, transaction
from .includes import AGENT_GRAPH_INCLUDE, AGENT_NODE_INCLUDE
if TYPE_CHECKING:
from .execution import NodesInputMasks
from .integrations import Webhook
logger = logging.getLogger(__name__)
@@ -206,35 +205,6 @@ class BaseGraph(BaseDbModel):
None,
)
@computed_field
@property
def trigger_setup_info(self) -> "GraphTriggerInfo | None":
if not (
self.webhook_input_node
and (trigger_block := self.webhook_input_node.block).webhook_config
):
return None
return GraphTriggerInfo(
provider=trigger_block.webhook_config.provider,
config_schema={
**(json_schema := trigger_block.input_schema.jsonschema()),
"properties": {
pn: sub_schema
for pn, sub_schema in json_schema["properties"].items()
if not is_credentials_field_name(pn)
},
"required": [
pn
for pn in json_schema.get("required", [])
if not is_credentials_field_name(pn)
],
},
credentials_input_name=next(
iter(trigger_block.input_schema.get_credentials_fields()), None
),
)
@staticmethod
def _generate_schema(
*props: tuple[type[AgentInputBlock.Input] | type[AgentOutputBlock.Input], dict],
@@ -268,14 +238,6 @@ class BaseGraph(BaseDbModel):
}
class GraphTriggerInfo(BaseModel):
provider: ProviderName
config_schema: dict[str, Any] = Field(
description="Input schema for the trigger block"
)
credentials_input_name: Optional[str]
class Graph(BaseGraph):
sub_graphs: list[BaseGraph] = [] # Flattened sub-graphs
@@ -452,7 +414,7 @@ class GraphModel(Graph):
def validate_graph(
self,
for_run: bool = False,
nodes_input_masks: Optional["NodesInputMasks"] = None,
nodes_input_masks: Optional[dict[str, dict[str, JsonValue]]] = None,
):
"""
Validate graph structure and raise `ValueError` on issues.
@@ -466,7 +428,7 @@ class GraphModel(Graph):
def _validate_graph(
graph: BaseGraph,
for_run: bool = False,
nodes_input_masks: Optional["NodesInputMasks"] = None,
nodes_input_masks: Optional[dict[str, dict[str, JsonValue]]] = None,
) -> None:
errors = GraphModel._validate_graph_get_errors(
graph, for_run, nodes_input_masks
@@ -480,7 +442,7 @@ class GraphModel(Graph):
def validate_graph_get_errors(
self,
for_run: bool = False,
nodes_input_masks: Optional["NodesInputMasks"] = None,
nodes_input_masks: Optional[dict[str, dict[str, JsonValue]]] = None,
) -> dict[str, dict[str, str]]:
"""
Validate graph and return structured errors per node.
@@ -502,7 +464,7 @@ class GraphModel(Graph):
def _validate_graph_get_errors(
graph: BaseGraph,
for_run: bool = False,
nodes_input_masks: Optional["NodesInputMasks"] = None,
nodes_input_masks: Optional[dict[str, dict[str, JsonValue]]] = None,
) -> dict[str, dict[str, str]]:
"""
Validate graph and return structured errors per node.

View File

@@ -59,15 +59,9 @@ def graph_execution_include(
}
AGENT_PRESET_INCLUDE: prisma.types.AgentPresetInclude = {
"InputPresets": True,
"Webhook": True,
}
INTEGRATION_WEBHOOK_INCLUDE: prisma.types.IntegrationWebhookInclude = {
"AgentNodes": {"include": AGENT_NODE_INCLUDE},
"AgentPresets": {"include": AGENT_PRESET_INCLUDE},
"AgentPresets": {"include": {"InputPresets": True}},
}

View File

@@ -115,7 +115,7 @@ async def generate_activity_status_for_execution(
# Get all node executions for this graph execution
node_executions = await db_client.get_node_executions(
graph_exec_id, include_exec_data=True
graph_exec_id=graph_exec_id, include_exec_data=True
)
# Get graph metadata and full graph structure for name, description, and links

View File

@@ -4,12 +4,13 @@ from typing import Callable, Concatenate, ParamSpec, TypeVar, cast
from backend.data import db
from backend.data.credit import UsageTransactionMetadata, get_user_credit_model
from backend.data.execution import (
add_input_to_node_execution,
create_graph_execution,
create_node_execution,
get_block_error_stats,
get_execution_kv_data,
get_graph_execution_meta,
get_graph_executions,
get_latest_node_execution,
get_node_execution,
get_node_executions,
set_execution_kv_data,
@@ -17,7 +18,6 @@ from backend.data.execution import (
update_graph_execution_stats,
update_node_execution_status,
update_node_execution_status_batch,
upsert_execution_input,
upsert_execution_output,
)
from backend.data.generate_data import get_user_execution_summary_data
@@ -105,13 +105,13 @@ class DatabaseManager(AppService):
create_graph_execution = _(create_graph_execution)
get_node_execution = _(get_node_execution)
get_node_executions = _(get_node_executions)
get_latest_node_execution = _(get_latest_node_execution)
update_node_execution_status = _(update_node_execution_status)
update_node_execution_status_batch = _(update_node_execution_status_batch)
update_graph_execution_start_time = _(update_graph_execution_start_time)
update_graph_execution_stats = _(update_graph_execution_stats)
upsert_execution_input = _(upsert_execution_input)
upsert_execution_output = _(upsert_execution_output)
create_node_execution = _(create_node_execution)
add_input_to_node_execution = _(add_input_to_node_execution)
get_execution_kv_data = _(get_execution_kv_data)
set_execution_kv_data = _(set_execution_kv_data)
get_block_error_stats = _(get_block_error_stats)
@@ -171,10 +171,12 @@ class DatabaseManagerClient(AppServiceClient):
get_graph_executions = _(d.get_graph_executions)
get_graph_execution_meta = _(d.get_graph_execution_meta)
get_node_executions = _(d.get_node_executions)
create_node_execution = _(d.create_node_execution)
update_node_execution_status = _(d.update_node_execution_status)
update_graph_execution_start_time = _(d.update_graph_execution_start_time)
update_graph_execution_stats = _(d.update_graph_execution_stats)
upsert_execution_output = _(d.upsert_execution_output)
add_input_to_node_execution = _(d.add_input_to_node_execution)
# Graphs
get_graph_metadata = _(d.get_graph_metadata)
@@ -189,14 +191,6 @@ class DatabaseManagerClient(AppServiceClient):
# User Emails
get_user_email_by_id = _(d.get_user_email_by_id)
# Library
list_library_agents = _(d.list_library_agents)
add_store_agent_to_library = _(d.add_store_agent_to_library)
# Store
get_store_agents = _(d.get_store_agents)
get_store_agent_details = _(d.get_store_agent_details)
class DatabaseManagerAsyncClient(AppServiceClient):
d = DatabaseManager
@@ -207,16 +201,12 @@ class DatabaseManagerAsyncClient(AppServiceClient):
create_graph_execution = d.create_graph_execution
get_connected_output_nodes = d.get_connected_output_nodes
get_latest_node_execution = d.get_latest_node_execution
get_graph = d.get_graph
get_graph_metadata = d.get_graph_metadata
get_graph_execution_meta = d.get_graph_execution_meta
get_node = d.get_node
get_node_execution = d.get_node_execution
get_node_executions = d.get_node_executions
get_user_integrations = d.get_user_integrations
upsert_execution_input = d.upsert_execution_input
upsert_execution_output = d.upsert_execution_output
update_graph_execution_stats = d.update_graph_execution_stats
update_node_execution_status = d.update_node_execution_status
update_node_execution_status_batch = d.update_node_execution_status_batch

View File

@@ -0,0 +1,154 @@
import logging
import threading
from collections import OrderedDict
from functools import wraps
from typing import TYPE_CHECKING, Any, Optional
from backend.data.execution import ExecutionStatus, NodeExecutionResult
from backend.data.model import GraphExecutionStats, NodeExecutionStats
if TYPE_CHECKING:
from backend.executor import DatabaseManagerClient
logger = logging.getLogger(__name__)
def with_lock(func):
@wraps(func)
def wrapper(self, *args, **kwargs):
with self._lock:
return func(self, *args, **kwargs)
return wrapper
class ExecutionCache:
def __init__(self, graph_exec_id: str, db_client: "DatabaseManagerClient"):
self._lock = threading.RLock()
self._graph_exec_id = graph_exec_id
self._graph_stats: GraphExecutionStats = GraphExecutionStats()
self._node_executions: OrderedDict[str, NodeExecutionResult] = OrderedDict()
for execution in db_client.get_node_executions(self._graph_exec_id):
self._node_executions[execution.node_exec_id] = execution
@with_lock
def get_node_execution(self, node_exec_id: str) -> Optional[NodeExecutionResult]:
execution = self._node_executions.get(node_exec_id)
return execution.model_copy(deep=True) if execution else None
@with_lock
def get_latest_node_execution(self, node_id: str) -> Optional[NodeExecutionResult]:
for execution in reversed(self._node_executions.values()):
if (
execution.node_id == node_id
and execution.status != ExecutionStatus.INCOMPLETE
):
return execution.model_copy(deep=True)
return None
@with_lock
def get_node_executions(
self,
*,
statuses: Optional[list] = None,
block_ids: Optional[list] = None,
node_id: Optional[str] = None,
):
results = []
for execution in self._node_executions.values():
if statuses and execution.status not in statuses:
continue
if block_ids and execution.block_id not in block_ids:
continue
if node_id and execution.node_id != node_id:
continue
results.append(execution.model_copy(deep=True))
return results
@with_lock
def update_node_execution_status(
self,
exec_id: str,
status: ExecutionStatus,
execution_data: Optional[dict] = None,
stats: Optional[dict] = None,
):
if exec_id not in self._node_executions:
raise RuntimeError(f"Execution {exec_id} not found in cache")
execution = self._node_executions[exec_id]
execution.status = status
if execution_data:
execution.input_data.update(execution_data)
if stats:
execution.stats = execution.stats or NodeExecutionStats()
current_stats = execution.stats.model_dump()
current_stats.update(stats)
execution.stats = NodeExecutionStats.model_validate(current_stats)
@with_lock
def upsert_execution_output(
self, node_exec_id: str, output_name: str, output_data: Any
) -> NodeExecutionResult:
if node_exec_id not in self._node_executions:
raise RuntimeError(f"Execution {node_exec_id} not found in cache")
execution = self._node_executions[node_exec_id]
if output_name not in execution.output_data:
execution.output_data[output_name] = []
execution.output_data[output_name].append(output_data)
return execution
@with_lock
def update_graph_stats(
self, status: Optional[ExecutionStatus] = None, stats: Optional[dict] = None
):
if status is not None:
pass
if stats is not None:
current_stats = self._graph_stats.model_dump()
current_stats.update(stats)
self._graph_stats = GraphExecutionStats.model_validate(current_stats)
@with_lock
def update_graph_start_time(self):
"""Update graph start time (handled by database persistence)."""
pass
@with_lock
def find_incomplete_execution_for_input(
self, node_id: str, input_name: str
) -> tuple[str, NodeExecutionResult] | None:
for exec_id, execution in self._node_executions.items():
if (
execution.node_id == node_id
and execution.status == ExecutionStatus.INCOMPLETE
and input_name not in execution.input_data
):
return exec_id, execution
return None
@with_lock
def add_node_execution(
self, node_exec_id: str, execution: NodeExecutionResult
) -> None:
self._node_executions[node_exec_id] = execution
@with_lock
def update_execution_input(
self, exec_id: str, input_name: str, input_data: Any
) -> dict:
if exec_id not in self._node_executions:
raise RuntimeError(f"Execution {exec_id} not found in cache")
execution = self._node_executions[exec_id]
execution.input_data[input_name] = input_data
return execution.input_data.copy()
def finalize(self) -> None:
with self._lock:
self._node_executions.clear()
self._graph_stats = GraphExecutionStats()

View File

@@ -0,0 +1,355 @@
"""Test execution creation with proper ID generation and persistence."""
import asyncio
import threading
import uuid
from datetime import datetime
import pytest
from backend.data.execution import ExecutionStatus
from backend.executor.execution_data import ExecutionDataClient
@pytest.fixture
def event_loop():
loop = asyncio.new_event_loop()
yield loop
loop.close()
@pytest.fixture
async def execution_client_with_mock_db(event_loop):
"""Create an ExecutionDataClient with proper database records."""
from datetime import datetime, timezone
from unittest.mock import AsyncMock, MagicMock, patch
from prisma.models import AgentGraph, AgentGraphExecution, User
from backend.data.execution import ExecutionStatus, GraphExecutionMeta
# Create test database records to satisfy foreign key constraints
try:
await User.prisma().create(
data={
"id": "test_user_123",
"email": "test@example.com",
"name": "Test User",
}
)
await AgentGraph.prisma().create(
data={
"id": "test_graph_456",
"version": 1,
"userId": "test_user_123",
"name": "Test Graph",
"description": "Test graph for execution tests",
}
)
from prisma.enums import AgentExecutionStatus
await AgentGraphExecution.prisma().create(
data={
"id": "test_graph_exec_id",
"userId": "test_user_123",
"agentGraphId": "test_graph_456",
"agentGraphVersion": 1,
"executionStatus": AgentExecutionStatus.RUNNING,
}
)
except Exception:
# Records might already exist, that's fine
pass
# Mock the graph execution metadata - align with assertions below
mock_graph_meta = GraphExecutionMeta(
id="test_graph_exec_id",
user_id="test_user_123",
graph_id="test_graph_456",
graph_version=1,
status=ExecutionStatus.RUNNING,
started_at=datetime.now(timezone.utc),
ended_at=datetime.now(timezone.utc),
stats=None,
)
# Create client with ThreadPoolExecutor and graph metadata (constructed inside patch)
from concurrent.futures import ThreadPoolExecutor
executor = ThreadPoolExecutor(max_workers=1)
# Storage for tracking created executions
created_executions = []
async def mock_create_node_execution(
node_exec_id, node_id, graph_exec_id, input_name, input_data
):
"""Mock execution creation that records what was created."""
created_executions.append(
{
"node_exec_id": node_exec_id,
"node_id": node_id,
"graph_exec_id": graph_exec_id,
"input_name": input_name,
"input_data": input_data,
}
)
return node_exec_id
def sync_mock_create_node_execution(
node_exec_id, node_id, graph_exec_id, input_name, input_data
):
"""Mock sync execution creation that records what was created."""
created_executions.append(
{
"node_exec_id": node_exec_id,
"node_id": node_id,
"graph_exec_id": graph_exec_id,
"input_name": input_name,
"input_data": input_data,
}
)
return node_exec_id
# Prepare mock async and sync DB clients
async_mock_client = AsyncMock()
async_mock_client.create_node_execution = mock_create_node_execution
sync_mock_client = MagicMock()
sync_mock_client.create_node_execution = sync_mock_create_node_execution
# Mock graph execution for return values
from backend.data.execution import GraphExecutionMeta
mock_graph_update = GraphExecutionMeta(
id="test_graph_exec_id",
user_id="test_user_123",
graph_id="test_graph_456",
graph_version=1,
status=ExecutionStatus.RUNNING,
started_at=datetime.now(timezone.utc),
ended_at=datetime.now(timezone.utc),
stats=None,
)
# No-ops for other sync methods used by the client during tests
sync_mock_client.add_input_to_node_execution.side_effect = lambda **kwargs: None
sync_mock_client.update_node_execution_status.side_effect = (
lambda *args, **kwargs: None
)
sync_mock_client.upsert_execution_output.side_effect = lambda **kwargs: None
sync_mock_client.update_graph_execution_stats.side_effect = (
lambda *args, **kwargs: mock_graph_update
)
sync_mock_client.update_graph_execution_start_time.side_effect = (
lambda *args, **kwargs: mock_graph_update
)
thread = threading.Thread(target=event_loop.run_forever, daemon=True)
thread.start()
with patch(
"backend.executor.execution_data.get_database_manager_async_client",
return_value=async_mock_client,
), patch(
"backend.executor.execution_data.get_database_manager_client",
return_value=sync_mock_client,
), patch(
"backend.executor.execution_data.get_execution_event_bus"
), patch(
"backend.executor.execution_data.non_blocking_persist", lambda func: func
):
# Now construct the client under the patch so it captures the mocked clients
client = ExecutionDataClient(executor, "test_graph_exec_id", mock_graph_meta)
# Store the mocks for the test to access if needed
setattr(client, "_test_async_client", async_mock_client)
setattr(client, "_test_sync_client", sync_mock_client)
setattr(client, "_created_executions", created_executions)
yield client
# Cleanup test database records
try:
await AgentGraphExecution.prisma().delete_many(
where={"id": "test_graph_exec_id"}
)
await AgentGraph.prisma().delete_many(where={"id": "test_graph_456"})
await User.prisma().delete_many(where={"id": "test_user_123"})
except Exception:
# Cleanup may fail if records don't exist
pass
# Cleanup
event_loop.call_soon_threadsafe(event_loop.stop)
thread.join(timeout=1)
class TestExecutionCreation:
"""Test execution creation with proper ID generation and persistence."""
async def test_execution_creation_with_valid_ids(
self, execution_client_with_mock_db
):
"""Test that execution creation generates and persists valid IDs."""
client = execution_client_with_mock_db
node_id = "test_node_789"
input_name = "test_input"
input_data = "test_value"
block_id = "test_block_abc"
# This should trigger execution creation since cache is empty
exec_id, input_dict = client.upsert_execution_input(
node_id=node_id,
input_name=input_name,
input_data=input_data,
block_id=block_id,
)
# Verify execution ID is valid UUID
try:
uuid.UUID(exec_id)
except ValueError:
pytest.fail(f"Generated execution ID '{exec_id}' is not a valid UUID")
# Verify execution was created in cache with complete data
assert exec_id in client._cache._node_executions
cached_execution = client._cache._node_executions[exec_id]
# Check all required fields have valid values
assert cached_execution.user_id == "test_user_123"
assert cached_execution.graph_id == "test_graph_456"
assert cached_execution.graph_version == 1
assert cached_execution.graph_exec_id == "test_graph_exec_id"
assert cached_execution.node_exec_id == exec_id
assert cached_execution.node_id == node_id
assert cached_execution.block_id == block_id
assert cached_execution.status == ExecutionStatus.INCOMPLETE
assert cached_execution.input_data == {input_name: input_data}
assert isinstance(cached_execution.add_time, datetime)
# Verify execution was persisted to database with our generated ID
created_executions = getattr(client, "_created_executions", [])
assert len(created_executions) == 1
created = created_executions[0]
assert created["node_exec_id"] == exec_id # Our generated ID was used
assert created["node_id"] == node_id
assert created["graph_exec_id"] == "test_graph_exec_id"
assert created["input_name"] == input_name
assert created["input_data"] == input_data
# Verify input dict returned correctly
assert input_dict == {input_name: input_data}
async def test_execution_reuse_vs_creation(self, execution_client_with_mock_db):
"""Test that execution reuse works and creation only happens when needed."""
client = execution_client_with_mock_db
node_id = "reuse_test_node"
block_id = "reuse_test_block"
# Create first execution
exec_id_1, input_dict_1 = client.upsert_execution_input(
node_id=node_id,
input_name="input_1",
input_data="value_1",
block_id=block_id,
)
# This should reuse the existing INCOMPLETE execution
exec_id_2, input_dict_2 = client.upsert_execution_input(
node_id=node_id,
input_name="input_2",
input_data="value_2",
block_id=block_id,
)
# Should reuse the same execution
assert exec_id_1 == exec_id_2
assert input_dict_2 == {"input_1": "value_1", "input_2": "value_2"}
# Only one execution should be created in database
created_executions = getattr(client, "_created_executions", [])
assert len(created_executions) == 1
# Verify cache has the merged inputs
cached_execution = client._cache._node_executions[exec_id_1]
assert cached_execution.input_data == {
"input_1": "value_1",
"input_2": "value_2",
}
# Now complete the execution and try to add another input
client.update_node_status_and_publish(
exec_id=exec_id_1, status=ExecutionStatus.COMPLETED
)
# Verify the execution status was actually updated in the cache
updated_execution = client._cache._node_executions[exec_id_1]
assert (
updated_execution.status == ExecutionStatus.COMPLETED
), f"Expected COMPLETED but got {updated_execution.status}"
# This should create a NEW execution since the first is no longer INCOMPLETE
exec_id_3, input_dict_3 = client.upsert_execution_input(
node_id=node_id,
input_name="input_3",
input_data="value_3",
block_id=block_id,
)
# Should be a different execution
assert exec_id_3 != exec_id_1
assert input_dict_3 == {"input_3": "value_3"}
# Verify cache behavior: should have two different executions in cache now
cached_executions = client._cache._node_executions
assert len(cached_executions) == 2
assert exec_id_1 in cached_executions
assert exec_id_3 in cached_executions
# First execution should be COMPLETED
assert cached_executions[exec_id_1].status == ExecutionStatus.COMPLETED
# Third execution should be INCOMPLETE (newly created)
assert cached_executions[exec_id_3].status == ExecutionStatus.INCOMPLETE
async def test_multiple_nodes_get_different_execution_ids(
self, execution_client_with_mock_db
):
"""Test that different nodes get different execution IDs."""
client = execution_client_with_mock_db
# Create executions for different nodes
exec_id_a, _ = client.upsert_execution_input(
node_id="node_a",
input_name="test_input",
input_data="test_value",
block_id="block_a",
)
exec_id_b, _ = client.upsert_execution_input(
node_id="node_b",
input_name="test_input",
input_data="test_value",
block_id="block_b",
)
# Should be different executions with different IDs
assert exec_id_a != exec_id_b
# Both should be valid UUIDs
uuid.UUID(exec_id_a)
uuid.UUID(exec_id_b)
# Both should be in cache
cached_executions = client._cache._node_executions
assert len(cached_executions) == 2
assert exec_id_a in cached_executions
assert exec_id_b in cached_executions
# Both should have correct node IDs
assert cached_executions[exec_id_a].node_id == "node_a"
assert cached_executions[exec_id_b].node_id == "node_b"
if __name__ == "__main__":
pytest.main([__file__, "-v"])

View File

@@ -0,0 +1,338 @@
import logging
import threading
import uuid
from concurrent.futures import Executor
from datetime import datetime, timezone
from typing import TYPE_CHECKING, Any, Callable, ParamSpec, TypeVar, cast
from backend.data import redis_client as redis
from backend.data.credit import UsageTransactionMetadata
from backend.data.execution import (
ExecutionStatus,
GraphExecutionMeta,
NodeExecutionResult,
)
from backend.data.graph import Node
from backend.data.model import GraphExecutionStats
from backend.executor.execution_cache import ExecutionCache
from backend.util.clients import (
get_database_manager_async_client,
get_database_manager_client,
get_execution_event_bus,
)
from backend.util.settings import Settings
if TYPE_CHECKING:
from backend.executor import DatabaseManagerAsyncClient, DatabaseManagerClient
settings = Settings()
logger = logging.getLogger(__name__)
P = ParamSpec("P")
T = TypeVar("T")
def non_blocking_persist(func: Callable[P, T]) -> Callable[P, None]:
from functools import wraps
@wraps(func)
def wrapper(*args: P.args, **kwargs: P.kwargs) -> None:
# First argument is always self for methods - access through cast for typing
self = cast("ExecutionDataClient", args[0])
future = self._executor.submit(func, *args, **kwargs)
self._pending_tasks.add(future)
return wrapper
class ExecutionDataClient:
def __init__(
self, executor: Executor, graph_exec_id: str, graph_metadata: GraphExecutionMeta
):
self._executor = executor
self._graph_exec_id = graph_exec_id
self._cache = ExecutionCache(graph_exec_id, self.db_client_sync)
self._pending_tasks = set()
self._graph_metadata = graph_metadata
self.graph_lock = threading.RLock()
def finalize_execution(self, timeout: float = 30.0):
logger.info(f"Flushing db writes for execution {self._graph_exec_id}")
exceptions = []
# Wait for all pending database operations to complete
logger.debug(
f"Waiting for {len(self._pending_tasks)} pending database operations"
)
for future in list(self._pending_tasks):
try:
future.result(timeout=timeout)
except Exception as e:
logger.error(f"Background database operation failed: {e}")
exceptions.append(e)
finally:
self._pending_tasks.discard(future)
self._cache.finalize()
if exceptions:
logger.error(f"Background persistence failed with {len(exceptions)} errors")
raise RuntimeError(
f"Background persistence failed with {len(exceptions)} errors: {exceptions}"
)
@property
def db_client_async(self) -> "DatabaseManagerAsyncClient":
return get_database_manager_async_client()
@property
def db_client_sync(self) -> "DatabaseManagerClient":
return get_database_manager_client()
@property
def event_bus(self):
return get_execution_event_bus()
async def get_node(self, node_id: str) -> Node:
return await self.db_client_async.get_node(node_id)
def spend_credits(
self,
user_id: str,
cost: int,
metadata: UsageTransactionMetadata,
) -> int:
return self.db_client_sync.spend_credits(
user_id=user_id, cost=cost, metadata=metadata
)
def get_graph_execution_meta(
self, user_id: str, execution_id: str
) -> GraphExecutionMeta | None:
return self.db_client_sync.get_graph_execution_meta(
user_id=user_id, execution_id=execution_id
)
def get_graph_metadata(
self, graph_id: str, graph_version: int | None = None
) -> Any:
return self.db_client_sync.get_graph_metadata(graph_id, graph_version)
def get_credits(self, user_id: str) -> int:
return self.db_client_sync.get_credits(user_id)
def get_user_email_by_id(self, user_id: str) -> str | None:
return self.db_client_sync.get_user_email_by_id(user_id)
def get_latest_node_execution(self, node_id: str) -> NodeExecutionResult | None:
return self._cache.get_latest_node_execution(node_id)
def get_node_execution(self, node_exec_id: str) -> NodeExecutionResult | None:
return self._cache.get_node_execution(node_exec_id)
def get_node_executions(
self,
*,
node_id: str | None = None,
statuses: list[ExecutionStatus] | None = None,
block_ids: list[str] | None = None,
) -> list[NodeExecutionResult]:
return self._cache.get_node_executions(
statuses=statuses, block_ids=block_ids, node_id=node_id
)
def update_node_status_and_publish(
self,
exec_id: str,
status: ExecutionStatus,
execution_data: dict | None = None,
stats: dict[str, Any] | None = None,
):
self._cache.update_node_execution_status(exec_id, status, execution_data, stats)
self._persist_node_status_to_db(exec_id, status, execution_data, stats)
def upsert_execution_input(
self, node_id: str, input_name: str, input_data: Any, block_id: str
) -> tuple[str, dict]:
# Validate input parameters to prevent foreign key constraint errors
if not node_id or not isinstance(node_id, str):
raise ValueError(f"Invalid node_id: {node_id}")
if not self._graph_exec_id or not isinstance(self._graph_exec_id, str):
raise ValueError(f"Invalid graph_exec_id: {self._graph_exec_id}")
if not block_id or not isinstance(block_id, str):
raise ValueError(f"Invalid block_id: {block_id}")
# UPDATE: Try to find an existing incomplete execution for this node and input
if result := self._cache.find_incomplete_execution_for_input(
node_id, input_name
):
exec_id, _ = result
updated_input_data = self._cache.update_execution_input(
exec_id, input_name, input_data
)
self._persist_add_input_to_db(exec_id, input_name, input_data)
return exec_id, updated_input_data
# CREATE: No suitable execution found, create new one
node_exec_id = str(uuid.uuid4())
logger.debug(
f"Creating new execution {node_exec_id} for node {node_id} "
f"in graph execution {self._graph_exec_id}"
)
new_execution = NodeExecutionResult(
user_id=self._graph_metadata.user_id,
graph_id=self._graph_metadata.graph_id,
graph_version=self._graph_metadata.graph_version,
graph_exec_id=self._graph_exec_id,
node_exec_id=node_exec_id,
node_id=node_id,
block_id=block_id,
status=ExecutionStatus.INCOMPLETE,
input_data={input_name: input_data},
output_data={},
add_time=datetime.now(timezone.utc),
)
self._cache.add_node_execution(node_exec_id, new_execution)
self._persist_new_node_execution_to_db(
node_exec_id, node_id, input_name, input_data
)
return node_exec_id, {input_name: input_data}
def upsert_execution_output(
self, node_exec_id: str, output_name: str, output_data: Any
):
self._cache.upsert_execution_output(node_exec_id, output_name, output_data)
self._persist_execution_output_to_db(node_exec_id, output_name, output_data)
def update_graph_stats_and_publish(
self,
status: ExecutionStatus | None = None,
stats: GraphExecutionStats | None = None,
) -> None:
stats_dict = stats.model_dump() if stats else None
self._cache.update_graph_stats(status=status, stats=stats_dict)
self._persist_graph_stats_to_db(status=status, stats=stats)
def update_graph_start_time_and_publish(self) -> None:
self._cache.update_graph_start_time()
self._persist_graph_start_time_to_db()
@non_blocking_persist
def _persist_node_status_to_db(
self,
exec_id: str,
status: ExecutionStatus,
execution_data: dict | None = None,
stats: dict[str, Any] | None = None,
):
exec_update = self.db_client_sync.update_node_execution_status(
exec_id, status, execution_data, stats
)
self.event_bus.publish(exec_update)
@non_blocking_persist
def _persist_add_input_to_db(
self, node_exec_id: str, input_name: str, input_data: Any
):
self.db_client_sync.add_input_to_node_execution(
node_exec_id=node_exec_id,
input_name=input_name,
input_data=input_data,
)
@non_blocking_persist
def _persist_execution_output_to_db(
self, node_exec_id: str, output_name: str, output_data: Any
):
self.db_client_sync.upsert_execution_output(
node_exec_id=node_exec_id,
output_name=output_name,
output_data=output_data,
)
if exec_update := self.get_node_execution(node_exec_id):
self.event_bus.publish(exec_update)
@non_blocking_persist
def _persist_graph_stats_to_db(
self,
status: ExecutionStatus | None = None,
stats: GraphExecutionStats | None = None,
):
graph_update = self.db_client_sync.update_graph_execution_stats(
self._graph_exec_id, status, stats
)
if not graph_update:
raise RuntimeError(
f"Failed to update graph execution stats for {self._graph_exec_id}"
)
self.event_bus.publish(graph_update)
@non_blocking_persist
def _persist_graph_start_time_to_db(self):
graph_update = self.db_client_sync.update_graph_execution_start_time(
self._graph_exec_id
)
if not graph_update:
raise RuntimeError(
f"Failed to update graph execution start time for {self._graph_exec_id}"
)
self.event_bus.publish(graph_update)
async def generate_activity_status(
self,
graph_id: str,
graph_version: int,
execution_stats: GraphExecutionStats,
user_id: str,
execution_status: ExecutionStatus,
) -> str | None:
from backend.executor.activity_status_generator import (
generate_activity_status_for_execution,
)
return await generate_activity_status_for_execution(
graph_exec_id=self._graph_exec_id,
graph_id=graph_id,
graph_version=graph_version,
execution_stats=execution_stats,
db_client=self.db_client_async,
user_id=user_id,
execution_status=execution_status,
)
@non_blocking_persist
def _send_execution_update(self, execution: NodeExecutionResult):
"""Send execution update to event bus."""
try:
self.event_bus.publish(execution)
except Exception as e:
logger.warning(f"Failed to send execution update: {e}")
@non_blocking_persist
def _persist_new_node_execution_to_db(
self, node_exec_id: str, node_id: str, input_name: str, input_data: Any
):
try:
self.db_client_sync.create_node_execution(
node_exec_id=node_exec_id,
node_id=node_id,
graph_exec_id=self._graph_exec_id,
input_name=input_name,
input_data=input_data,
)
except Exception as e:
logger.error(
f"Failed to create node execution {node_exec_id} for node {node_id} "
f"in graph execution {self._graph_exec_id}: {e}"
)
raise
def increment_execution_count(self, user_id: str) -> int:
r = redis.get_redis()
k = f"uec:{user_id}"
counter = cast(int, r.incr(k))
if counter == 1:
r.expire(k, settings.config.execution_counter_expiration_time)
return counter

View File

@@ -0,0 +1,668 @@
"""Test suite for ExecutionDataClient."""
import asyncio
import threading
import pytest
from backend.data.execution import ExecutionStatus
from backend.executor.execution_data import ExecutionDataClient
@pytest.fixture
def event_loop():
loop = asyncio.new_event_loop()
yield loop
loop.close()
@pytest.fixture
def execution_client(event_loop):
from datetime import datetime, timezone
from unittest.mock import AsyncMock, MagicMock, patch
from backend.data.execution import ExecutionStatus, GraphExecutionMeta
mock_graph_meta = GraphExecutionMeta(
id="test_graph_exec_id",
user_id="test_user_id",
graph_id="test_graph_id",
graph_version=1,
status=ExecutionStatus.RUNNING,
started_at=datetime.now(timezone.utc),
ended_at=datetime.now(timezone.utc),
stats=None,
)
from concurrent.futures import ThreadPoolExecutor
executor = ThreadPoolExecutor(max_workers=1)
# Mock all database operations to prevent connection attempts
async_mock_client = AsyncMock()
sync_mock_client = MagicMock()
# Mock all database methods to return None or empty results
sync_mock_client.get_node_executions.return_value = []
sync_mock_client.create_node_execution.return_value = None
sync_mock_client.add_input_to_node_execution.return_value = None
sync_mock_client.update_node_execution_status.return_value = None
sync_mock_client.upsert_execution_output.return_value = None
sync_mock_client.update_graph_execution_stats.return_value = mock_graph_meta
sync_mock_client.update_graph_execution_start_time.return_value = mock_graph_meta
# Mock event bus to prevent connection attempts
mock_event_bus = MagicMock()
mock_event_bus.publish.return_value = None
thread = threading.Thread(target=event_loop.run_forever, daemon=True)
thread.start()
with patch(
"backend.executor.execution_data.get_database_manager_async_client",
return_value=async_mock_client,
), patch(
"backend.executor.execution_data.get_database_manager_client",
return_value=sync_mock_client,
), patch(
"backend.executor.execution_data.get_execution_event_bus",
return_value=mock_event_bus,
), patch(
"backend.executor.execution_data.non_blocking_persist", lambda func: func
):
client = ExecutionDataClient(executor, "test_graph_exec_id", mock_graph_meta)
yield client
event_loop.call_soon_threadsafe(event_loop.stop)
thread.join(timeout=1)
class TestExecutionDataClient:
async def test_update_node_status_writes_to_cache_immediately(
self, execution_client
):
"""Test that node status updates are immediately visible in cache."""
# First create an execution to update
node_exec_id, _ = execution_client.upsert_execution_input(
node_id="test-node",
input_name="test_input",
input_data="test_value",
block_id="test-block",
)
status = ExecutionStatus.RUNNING
execution_data = {"step": "processing"}
stats = {"duration": 5.2}
# Update status of existing execution
execution_client.update_node_status_and_publish(
exec_id=node_exec_id,
status=status,
execution_data=execution_data,
stats=stats,
)
# Verify immediate visibility in cache
cached_exec = execution_client.get_node_execution(node_exec_id)
assert cached_exec is not None
assert cached_exec.status == status
# execution_data should be merged with existing input_data, not replace it
expected_input_data = {"test_input": "test_value", "step": "processing"}
assert cached_exec.input_data == expected_input_data
def test_update_node_status_execution_not_found_raises_error(
self, execution_client
):
"""Test that updating non-existent execution raises error instead of creating it."""
non_existent_id = "does-not-exist"
with pytest.raises(
RuntimeError, match="Execution does-not-exist not found in cache"
):
execution_client.update_node_status_and_publish(
exec_id=non_existent_id, status=ExecutionStatus.COMPLETED
)
async def test_upsert_execution_output_writes_to_cache_immediately(
self, execution_client
):
"""Test that output updates are immediately visible in cache."""
# First create an execution
node_exec_id, _ = execution_client.upsert_execution_input(
node_id="test-node",
input_name="test_input",
input_data="test_value",
block_id="test-block",
)
output_name = "result"
output_data = {"answer": 42, "confidence": 0.95}
# Update to RUNNING status first
execution_client.update_node_status_and_publish(
exec_id=node_exec_id,
status=ExecutionStatus.RUNNING,
execution_data={"input": "test"},
)
execution_client.upsert_execution_output(
node_exec_id=node_exec_id, output_name=output_name, output_data=output_data
)
# Check output through the node execution
cached_exec = execution_client.get_node_execution(node_exec_id)
assert cached_exec is not None
assert output_name in cached_exec.output_data
assert cached_exec.output_data[output_name] == [output_data]
async def test_get_node_execution_reads_from_cache(self, execution_client):
"""Test that get_node_execution returns cached data immediately."""
# First create an execution to work with
node_exec_id, _ = execution_client.upsert_execution_input(
node_id="test-node",
input_name="test_input",
input_data="test_value",
block_id="test-block",
)
# Then update its status
execution_client.update_node_status_and_publish(
exec_id=node_exec_id,
status=ExecutionStatus.COMPLETED,
execution_data={"result": "success"},
)
result = execution_client.get_node_execution(node_exec_id)
assert result is not None
assert result.status == ExecutionStatus.COMPLETED
# execution_data gets merged with existing input_data
expected_input_data = {"test_input": "test_value", "result": "success"}
assert result.input_data == expected_input_data
async def test_get_latest_node_execution_reads_from_cache(self, execution_client):
"""Test that get_latest_node_execution returns cached data."""
node_id = "node-1"
# First create an execution for this node
node_exec_id, _ = execution_client.upsert_execution_input(
node_id=node_id,
input_name="test_input",
input_data="test_value",
block_id="test-block",
)
# Then update its status to make it non-INCOMPLETE (so it's returned by get_latest)
execution_client.update_node_status_and_publish(
exec_id=node_exec_id,
status=ExecutionStatus.RUNNING,
execution_data={"from": "cache"},
)
result = execution_client.get_latest_node_execution(node_id)
assert result is not None
assert result.status == ExecutionStatus.RUNNING
# execution_data gets merged with existing input_data
expected_input_data = {"test_input": "test_value", "from": "cache"}
assert result.input_data == expected_input_data
async def test_get_node_executions_sync_filters_correctly(self, execution_client):
# Create executions with different statuses
executions = [
(ExecutionStatus.RUNNING, "block-a"),
(ExecutionStatus.COMPLETED, "block-a"),
(ExecutionStatus.FAILED, "block-b"),
(ExecutionStatus.RUNNING, "block-b"),
]
exec_ids = []
for i, (status, block_id) in enumerate(executions):
# First create the execution
exec_id, _ = execution_client.upsert_execution_input(
node_id=f"node-{i}",
input_name="test_input",
input_data="test_value",
block_id=block_id,
)
exec_ids.append(exec_id)
# Then update its status and metadata
execution_client.update_node_status_and_publish(
exec_id=exec_id, status=status, execution_data={"block": block_id}
)
# Update cached execution with graph_exec_id and block_id for filtering
# Note: In real implementation, these would be set during creation
# For test purposes, we'll skip this manual update since the filtering
# logic should work with the data as created
# Test status filtering
running_execs = execution_client.get_node_executions(
statuses=[ExecutionStatus.RUNNING]
)
assert len(running_execs) == 2
assert all(e.status == ExecutionStatus.RUNNING for e in running_execs)
# Test block_id filtering
block_a_execs = execution_client.get_node_executions(block_ids=["block-a"])
assert len(block_a_execs) == 2
assert all(e.block_id == "block-a" for e in block_a_execs)
# Test combined filtering
running_block_b = execution_client.get_node_executions(
statuses=[ExecutionStatus.RUNNING], block_ids=["block-b"]
)
assert len(running_block_b) == 1
assert running_block_b[0].status == ExecutionStatus.RUNNING
assert running_block_b[0].block_id == "block-b"
async def test_write_then_read_consistency(self, execution_client):
"""Test critical race condition scenario: immediate read after write."""
# First create an execution to work with
node_exec_id, _ = execution_client.upsert_execution_input(
node_id="consistency-test-node",
input_name="test_input",
input_data="test_value",
block_id="test-block",
)
# Write status
execution_client.update_node_status_and_publish(
exec_id=node_exec_id,
status=ExecutionStatus.RUNNING,
execution_data={"step": 1},
)
# Write output
execution_client.upsert_execution_output(
node_exec_id=node_exec_id,
output_name="intermediate",
output_data={"progress": 50},
)
# Update status again
execution_client.update_node_status_and_publish(
exec_id=node_exec_id,
status=ExecutionStatus.COMPLETED,
execution_data={"step": 2},
)
# All changes should be immediately visible
cached_exec = execution_client.get_node_execution(node_exec_id)
assert cached_exec is not None
assert cached_exec.status == ExecutionStatus.COMPLETED
# execution_data gets merged with existing input_data - step 2 overwrites step 1
expected_input_data = {"test_input": "test_value", "step": 2}
assert cached_exec.input_data == expected_input_data
# Output should be visible in execution record
assert cached_exec.output_data["intermediate"] == [{"progress": 50}]
async def test_concurrent_operations_are_thread_safe(self, execution_client):
"""Test that concurrent operations don't corrupt cache."""
num_threads = 3 # Reduced for simpler test
operations_per_thread = 5 # Reduced for simpler test
# Create all executions upfront
created_exec_ids = []
for thread_id in range(num_threads):
for i in range(operations_per_thread):
exec_id, _ = execution_client.upsert_execution_input(
node_id=f"node-{thread_id}-{i}",
input_name="test_input",
input_data="test_value",
block_id=f"block-{thread_id}-{i}",
)
created_exec_ids.append((exec_id, thread_id, i))
def worker(thread_data):
"""Perform multiple operations from a thread."""
thread_id, ops = thread_data
for i, (exec_id, _, _) in enumerate(ops):
# Status updates
execution_client.update_node_status_and_publish(
exec_id=exec_id,
status=ExecutionStatus.RUNNING,
execution_data={"thread": thread_id, "op": i},
)
# Output updates (use just one exec_id per thread for outputs)
if i == 0: # Only add outputs to first execution of each thread
execution_client.upsert_execution_output(
node_exec_id=exec_id,
output_name=f"output_{i}",
output_data={"thread": thread_id, "value": i},
)
# Organize executions by thread
thread_data = []
for tid in range(num_threads):
thread_ops = [
exec_data for exec_data in created_exec_ids if exec_data[1] == tid
]
thread_data.append((tid, thread_ops))
# Start multiple threads
threads = []
for data in thread_data:
thread = threading.Thread(target=worker, args=(data,))
threads.append(thread)
thread.start()
# Wait for completion
for thread in threads:
thread.join()
# Verify data integrity
expected_executions = num_threads * operations_per_thread
all_executions = execution_client.get_node_executions()
assert len(all_executions) == expected_executions
# Verify outputs - only first execution of each thread should have outputs
output_count = 0
for execution in all_executions:
if execution.output_data:
output_count += 1
assert output_count == num_threads # One output per thread
async def test_sync_and_async_versions_consistent(self, execution_client):
"""Test that sync and async versions of output operations behave the same."""
# First create the execution
node_exec_id, _ = execution_client.upsert_execution_input(
node_id="sync-async-test-node",
input_name="test_input",
input_data="test_value",
block_id="test-block",
)
execution_client.update_node_status_and_publish(
exec_id=node_exec_id,
status=ExecutionStatus.RUNNING,
execution_data={"input": "test"},
)
execution_client.upsert_execution_output(
node_exec_id=node_exec_id,
output_name="sync_result",
output_data={"method": "sync"},
)
execution_client.upsert_execution_output(
node_exec_id=node_exec_id,
output_name="async_result",
output_data={"method": "async"},
)
cached_exec = execution_client.get_node_execution(node_exec_id)
assert cached_exec is not None
assert "sync_result" in cached_exec.output_data
assert "async_result" in cached_exec.output_data
assert cached_exec.output_data["sync_result"] == [{"method": "sync"}]
assert cached_exec.output_data["async_result"] == [{"method": "async"}]
async def test_finalize_execution_completes_and_clears_cache(
self, execution_client
):
"""Test that finalize_execution waits for background tasks and clears cache."""
# First create the execution
node_exec_id, _ = execution_client.upsert_execution_input(
node_id="pending-test-node",
input_name="test_input",
input_data="test_value",
block_id="test-block",
)
# Trigger some background operations
execution_client.update_node_status_and_publish(
exec_id=node_exec_id, status=ExecutionStatus.RUNNING
)
execution_client.upsert_execution_output(
node_exec_id=node_exec_id, output_name="test", output_data={"value": 1}
)
# Wait for background tasks - may fail in test environment due to DB issues
try:
execution_client.finalize_execution(timeout=5.0)
except RuntimeError as e:
# In test environment, background DB operations may fail, but cache should still be cleared
assert "Background persistence failed" in str(e)
# Cache should be cleared regardless of background task failures
all_executions = execution_client.get_node_executions()
assert len(all_executions) == 0 # Cache should be cleared
async def test_manager_usage_pattern(self, execution_client):
# Create executions first
node_exec_id_1, _ = execution_client.upsert_execution_input(
node_id="node-1",
input_name="input1",
input_data="data1",
block_id="block-1",
)
node_exec_id_2, _ = execution_client.upsert_execution_input(
node_id="node-2",
input_name="input_from_node1",
input_data="value1",
block_id="block-2",
)
# Simulate manager.py workflow
# 1. Start execution
execution_client.update_node_status_and_publish(
exec_id=node_exec_id_1,
status=ExecutionStatus.RUNNING,
execution_data={"input": "data1"},
)
# 2. Node produces output
execution_client.upsert_execution_output(
node_exec_id=node_exec_id_1,
output_name="result",
output_data={"output": "value1"},
)
# 3. Complete first node
execution_client.update_node_status_and_publish(
exec_id=node_exec_id_1, status=ExecutionStatus.COMPLETED
)
# 4. Start second node (would read output from first)
execution_client.update_node_status_and_publish(
exec_id=node_exec_id_2,
status=ExecutionStatus.RUNNING,
execution_data={"input_from_node1": "value1"},
)
# 5. Manager queries for executions
all_executions = execution_client.get_node_executions()
running_executions = execution_client.get_node_executions(
statuses=[ExecutionStatus.RUNNING]
)
completed_executions = execution_client.get_node_executions(
statuses=[ExecutionStatus.COMPLETED]
)
# Verify manager can see all data immediately
assert len(all_executions) == 2
assert len(running_executions) == 1
assert len(completed_executions) == 1
# Verify output is accessible
exec_1 = execution_client.get_node_execution(node_exec_id_1)
assert exec_1 is not None
assert exec_1.output_data["result"] == [{"output": "value1"}]
def test_stats_handling_in_update_node_status(self, execution_client):
"""Test that stats parameter is properly handled in update_node_status_and_publish."""
# Create a fake execution directly in cache to avoid database issues
from datetime import datetime, timezone
from backend.data.execution import NodeExecutionResult
node_exec_id = "test-stats-exec-id"
fake_execution = NodeExecutionResult(
user_id="test-user",
graph_id="test-graph",
graph_version=1,
graph_exec_id="test-graph-exec",
node_exec_id=node_exec_id,
node_id="stats-test-node",
block_id="test-block",
status=ExecutionStatus.INCOMPLETE,
input_data={"test_input": "test_value"},
output_data={},
add_time=datetime.now(timezone.utc),
queue_time=None,
start_time=None,
end_time=None,
stats=None,
)
# Add directly to cache
execution_client._cache.add_node_execution(node_exec_id, fake_execution)
stats = {"token_count": 150, "processing_time": 2.5}
# Update status with stats
execution_client.update_node_status_and_publish(
exec_id=node_exec_id,
status=ExecutionStatus.RUNNING,
execution_data={"input": "test"},
stats=stats,
)
# Verify execution was updated and stats are stored
execution = execution_client.get_node_execution(node_exec_id)
assert execution is not None
assert execution.status == ExecutionStatus.RUNNING
# Stats should be stored in proper stats field
assert execution.stats is not None
stats_dict = execution.stats.model_dump()
# Only check the fields we set, ignore defaults
assert stats_dict["token_count"] == 150
assert stats_dict["processing_time"] == 2.5
# Update with additional stats
additional_stats = {"error_count": 0}
execution_client.update_node_status_and_publish(
exec_id=node_exec_id,
status=ExecutionStatus.COMPLETED,
stats=additional_stats,
)
# Stats should be merged
execution = execution_client.get_node_execution(node_exec_id)
assert execution is not None
assert execution.status == ExecutionStatus.COMPLETED
stats_dict = execution.stats.model_dump()
# Check the merged stats
assert stats_dict["token_count"] == 150
assert stats_dict["processing_time"] == 2.5
assert stats_dict["error_count"] == 0
async def test_upsert_execution_input_scenarios(self, execution_client):
"""Test different scenarios of upsert_execution_input - create vs update."""
node_id = "test-node"
graph_exec_id = (
"test_graph_exec_id" # Must match the ExecutionDataClient's scope
)
# Scenario 1: Create new execution when none exists
exec_id_1, input_data_1 = execution_client.upsert_execution_input(
node_id=node_id,
input_name="first_input",
input_data="value1",
block_id="test-block",
)
# Should create new execution
execution = execution_client.get_node_execution(exec_id_1)
assert execution is not None
assert execution.status == ExecutionStatus.INCOMPLETE
assert execution.node_id == node_id
assert execution.graph_exec_id == graph_exec_id
assert input_data_1 == {"first_input": "value1"}
# Scenario 2: Add input to existing INCOMPLETE execution
exec_id_2, input_data_2 = execution_client.upsert_execution_input(
node_id=node_id,
input_name="second_input",
input_data="value2",
block_id="test-block",
)
# Should use same execution
assert exec_id_2 == exec_id_1
assert input_data_2 == {"first_input": "value1", "second_input": "value2"}
# Verify execution has both inputs
execution = execution_client.get_node_execution(exec_id_1)
assert execution is not None
assert execution.input_data == {
"first_input": "value1",
"second_input": "value2",
}
# Scenario 3: Create new execution when existing is not INCOMPLETE
execution_client.update_node_status_and_publish(
exec_id=exec_id_1, status=ExecutionStatus.RUNNING
)
exec_id_3, input_data_3 = execution_client.upsert_execution_input(
node_id=node_id,
input_name="third_input",
input_data="value3",
block_id="test-block",
)
# Should create new execution
assert exec_id_3 != exec_id_1
execution_3 = execution_client.get_node_execution(exec_id_3)
assert execution_3 is not None
assert input_data_3 == {"third_input": "value3"}
# Verify we now have 2 executions
all_executions = execution_client.get_node_executions()
assert len(all_executions) == 2
def test_graph_stats_operations(self, execution_client):
"""Test graph-level stats and start time operations."""
# Test update_graph_stats_and_publish
from backend.data.model import GraphExecutionStats
stats = GraphExecutionStats(
walltime=10.5, cputime=8.2, node_count=5, node_error_count=1
)
execution_client.update_graph_stats_and_publish(
status=ExecutionStatus.RUNNING, stats=stats
)
# Verify stats are stored in cache
cached_stats = execution_client._cache._graph_stats
assert cached_stats.walltime == 10.5
execution_client.update_graph_start_time_and_publish()
cached_stats = execution_client._cache._graph_stats
assert cached_stats.walltime == 10.5
def test_public_methods_accessible(self, execution_client):
"""Test that public methods are accessible."""
assert hasattr(execution_client._cache, "update_node_execution_status")
assert hasattr(execution_client._cache, "upsert_execution_output")
assert hasattr(execution_client._cache, "add_node_execution")
assert hasattr(execution_client._cache, "find_incomplete_execution_for_input")
assert hasattr(execution_client._cache, "update_execution_input")
assert hasattr(execution_client, "upsert_execution_input")
assert hasattr(execution_client, "update_node_status_and_publish")
if __name__ == "__main__":
pytest.main([__file__, "-v"])

View File

@@ -5,14 +5,31 @@ import threading
import time
from collections import defaultdict
from concurrent.futures import Future, ThreadPoolExecutor
from contextlib import asynccontextmanager
from typing import TYPE_CHECKING, Any, Optional, TypeVar, cast
from typing import Any, Optional, TypeVar, cast
from pika.adapters.blocking_connection import BlockingChannel
from pika.spec import Basic, BasicProperties
from redis.asyncio.lock import Lock as RedisLock
from prometheus_client import Gauge, start_http_server
from pydantic import JsonValue
from backend.blocks.agent import AgentExecutorBlock
from backend.blocks.io import AgentOutputBlock
from backend.data.block import (
BlockData,
BlockInput,
BlockOutput,
BlockSchema,
get_block,
)
from backend.data.credit import UsageTransactionMetadata
from backend.data.execution import (
ExecutionQueue,
ExecutionStatus,
GraphExecutionEntry,
NodeExecutionEntry,
UserContext,
)
from backend.data.graph import Link, Node
from backend.data.model import GraphExecutionStats, NodeExecutionStats
from backend.data.notifications import (
AgentRunData,
@@ -22,45 +39,14 @@ from backend.data.notifications import (
ZeroBalanceData,
)
from backend.data.rabbitmq import SyncRabbitMQ
from backend.executor.activity_status_generator import (
generate_activity_status_for_execution,
)
from backend.executor.utils import LogMetadata
from backend.notifications.notifications import queue_notification
from backend.util.exceptions import InsufficientBalanceError, ModerationError
if TYPE_CHECKING:
from backend.executor import DatabaseManagerClient, DatabaseManagerAsyncClient
from prometheus_client import Gauge, start_http_server
from backend.blocks.agent import AgentExecutorBlock
from backend.data import redis_client as redis
from backend.data.block import (
BlockInput,
BlockOutput,
BlockOutputEntry,
BlockSchema,
get_block,
)
from backend.data.credit import UsageTransactionMetadata
from backend.data.execution import (
ExecutionQueue,
ExecutionStatus,
GraphExecution,
GraphExecutionEntry,
NodeExecutionEntry,
NodeExecutionResult,
NodesInputMasks,
UserContext,
)
from backend.data.graph import Link, Node
from backend.executor.execution_data import ExecutionDataClient
from backend.executor.utils import (
GRACEFUL_SHUTDOWN_TIMEOUT_SECONDS,
GRAPH_EXECUTION_CANCEL_QUEUE_NAME,
GRAPH_EXECUTION_QUEUE_NAME,
CancelExecutionEvent,
ExecutionOutputEntry,
LogMetadata,
NodeExecutionProgress,
block_usage_cost,
create_execution_queue_config,
@@ -69,21 +55,17 @@ from backend.executor.utils import (
validate_exec,
)
from backend.integrations.creds_manager import IntegrationCredentialsManager
from backend.notifications.notifications import queue_notification
from backend.server.v2.AutoMod.manager import automod_manager
from backend.util import json
from backend.util.clients import (
get_async_execution_event_bus,
get_database_manager_async_client,
get_database_manager_client,
get_execution_event_bus,
get_notification_manager_client,
)
from backend.util.clients import get_notification_manager_client
from backend.util.decorator import (
async_error_logged,
async_time_measured,
error_logged,
time_measured,
)
from backend.util.exceptions import InsufficientBalanceError, ModerationError
from backend.util.file import clean_exec_files
from backend.util.logging import TruncatedLogger, configure_logging
from backend.util.metrics import DiscordChannel
@@ -131,14 +113,13 @@ async def execute_node(
creds_manager: IntegrationCredentialsManager,
data: NodeExecutionEntry,
execution_stats: NodeExecutionStats | None = None,
nodes_input_masks: Optional[NodesInputMasks] = None,
nodes_input_masks: Optional[dict[str, dict[str, JsonValue]]] = None,
) -> BlockOutput:
"""
Execute a node in the graph. This will trigger a block execution on a node,
persist the execution result, and return the subsequent node to be executed.
Args:
db_client: The client to send execution updates to the server.
creds_manager: The manager to acquire and release credentials.
data: The execution data for executing the current node.
execution_stats: The execution statistics to be updated.
@@ -235,21 +216,20 @@ async def execute_node(
async def _enqueue_next_nodes(
db_client: "DatabaseManagerAsyncClient",
execution_data_client: ExecutionDataClient,
node: Node,
output: BlockOutputEntry,
output: BlockData,
user_id: str,
graph_exec_id: str,
graph_id: str,
log_metadata: LogMetadata,
nodes_input_masks: Optional[NodesInputMasks],
nodes_input_masks: Optional[dict[str, dict[str, JsonValue]]],
user_context: UserContext,
) -> list[NodeExecutionEntry]:
async def add_enqueued_execution(
node_exec_id: str, node_id: str, block_id: str, data: BlockInput
) -> NodeExecutionEntry:
await async_update_node_execution_status(
db_client=db_client,
execution_data_client.update_node_status_and_publish(
exec_id=node_exec_id,
status=ExecutionStatus.QUEUED,
execution_data=data,
@@ -282,21 +262,22 @@ async def _enqueue_next_nodes(
next_data = parse_execution_output(output, next_output_name)
if next_data is None and output_name != next_output_name:
return enqueued_executions
next_node = await db_client.get_node(next_node_id)
next_node = await execution_data_client.get_node(next_node_id)
# Multiple node can register the same next node, we need this to be atomic
# To avoid same execution to be enqueued multiple times,
# Or the same input to be consumed multiple times.
async with synchronized(f"upsert_input-{next_node_id}-{graph_exec_id}"):
with execution_data_client.graph_lock:
# Add output data to the earliest incomplete execution, or create a new one.
next_node_exec_id, next_node_input = await db_client.upsert_execution_input(
node_id=next_node_id,
graph_exec_id=graph_exec_id,
input_name=next_input_name,
input_data=next_data,
next_node_exec_id, next_node_input = (
execution_data_client.upsert_execution_input(
node_id=next_node_id,
input_name=next_input_name,
input_data=next_data,
block_id=next_node.block_id,
)
)
await async_update_node_execution_status(
db_client=db_client,
execution_data_client.update_node_status_and_publish(
exec_id=next_node_exec_id,
status=ExecutionStatus.INCOMPLETE,
)
@@ -308,8 +289,8 @@ async def _enqueue_next_nodes(
if link.is_static and link.sink_name not in next_node_input
}
if static_link_names and (
latest_execution := await db_client.get_latest_node_execution(
next_node_id, graph_exec_id
latest_execution := execution_data_client.get_latest_node_execution(
next_node_id
)
):
for name in static_link_names:
@@ -348,9 +329,8 @@ async def _enqueue_next_nodes(
# If link is static, there could be some incomplete executions waiting for it.
# Load and complete the input missing input data, and try to re-enqueue them.
for iexec in await db_client.get_node_executions(
for iexec in execution_data_client.get_node_executions(
node_id=next_node_id,
graph_exec_id=graph_exec_id,
statuses=[ExecutionStatus.INCOMPLETE],
):
idata = iexec.input_data
@@ -414,12 +394,15 @@ class ExecutionProcessor:
9. Node executor enqueues the next executed nodes to the node execution queue.
"""
# Current execution data client (scoped to current graph execution)
execution_data: ExecutionDataClient
@async_error_logged(swallow=True)
async def on_node_execution(
self,
node_exec: NodeExecutionEntry,
node_exec_progress: NodeExecutionProgress,
nodes_input_masks: Optional[NodesInputMasks],
nodes_input_masks: Optional[dict[str, dict[str, JsonValue]]],
graph_stats_pair: tuple[GraphExecutionStats, threading.Lock],
) -> NodeExecutionStats:
log_metadata = LogMetadata(
@@ -431,8 +414,7 @@ class ExecutionProcessor:
node_id=node_exec.node_id,
block_name="-",
)
db_client = get_db_async_client()
node = await db_client.get_node(node_exec.node_id)
node = await self.execution_data.get_node(node_exec.node_id)
execution_stats = NodeExecutionStats()
timing_info, status = await self._on_node_execution(
@@ -440,7 +422,6 @@ class ExecutionProcessor:
node_exec=node_exec,
node_exec_progress=node_exec_progress,
stats=execution_stats,
db_client=db_client,
log_metadata=log_metadata,
nodes_input_masks=nodes_input_masks,
)
@@ -464,15 +445,12 @@ class ExecutionProcessor:
if node_error and not isinstance(node_error, str):
node_stats["error"] = str(node_error) or node_stats.__class__.__name__
await async_update_node_execution_status(
db_client=db_client,
self.execution_data.update_node_status_and_publish(
exec_id=node_exec.node_exec_id,
status=status,
stats=node_stats,
)
await async_update_graph_execution_state(
db_client=db_client,
graph_exec_id=node_exec.graph_exec_id,
self.execution_data.update_graph_stats_and_publish(
stats=graph_stats,
)
@@ -485,22 +463,17 @@ class ExecutionProcessor:
node_exec: NodeExecutionEntry,
node_exec_progress: NodeExecutionProgress,
stats: NodeExecutionStats,
db_client: "DatabaseManagerAsyncClient",
log_metadata: LogMetadata,
nodes_input_masks: Optional[NodesInputMasks] = None,
nodes_input_masks: Optional[dict[str, dict[str, JsonValue]]] = None,
) -> ExecutionStatus:
status = ExecutionStatus.RUNNING
async def persist_output(output_name: str, output_data: Any) -> None:
await db_client.upsert_execution_output(
self.execution_data.upsert_execution_output(
node_exec_id=node_exec.node_exec_id,
output_name=output_name,
output_data=output_data,
)
if exec_update := await db_client.get_node_execution(
node_exec.node_exec_id
):
await send_async_execution_update(exec_update)
node_exec_progress.add_output(
ExecutionOutputEntry(
@@ -512,8 +485,7 @@ class ExecutionProcessor:
try:
log_metadata.info(f"Start node execution {node_exec.node_exec_id}")
await async_update_node_execution_status(
db_client=db_client,
self.execution_data.update_node_status_and_publish(
exec_id=node_exec.node_exec_id,
status=ExecutionStatus.RUNNING,
)
@@ -574,6 +546,8 @@ class ExecutionProcessor:
self.node_evaluation_thread = threading.Thread(
target=self.node_evaluation_loop.run_forever, daemon=True
)
# single thread executor
self.execution_data_executor = ThreadPoolExecutor(max_workers=1)
self.node_execution_thread.start()
self.node_evaluation_thread.start()
logger.info(f"[GraphExecutor] {self.tid} started")
@@ -593,9 +567,13 @@ class ExecutionProcessor:
node_eid="*",
block_name="-",
)
db_client = get_db_client()
exec_meta = db_client.get_graph_execution_meta(
# Get graph execution metadata first via sync client
from backend.util.clients import get_database_manager_client
db_client_sync = get_database_manager_client()
exec_meta = db_client_sync.get_graph_execution_meta(
user_id=graph_exec.user_id,
execution_id=graph_exec.graph_exec_id,
)
@@ -605,12 +583,15 @@ class ExecutionProcessor:
)
return
# Create scoped ExecutionDataClient for this graph execution with metadata
self.execution_data = ExecutionDataClient(
self.execution_data_executor, graph_exec.graph_exec_id, exec_meta
)
if exec_meta.status == ExecutionStatus.QUEUED:
log_metadata.info(f"⚙️ Starting graph execution #{graph_exec.graph_exec_id}")
exec_meta.status = ExecutionStatus.RUNNING
send_execution_update(
db_client.update_graph_execution_start_time(graph_exec.graph_exec_id)
)
self.execution_data.update_graph_start_time_and_publish()
elif exec_meta.status == ExecutionStatus.RUNNING:
log_metadata.info(
f"⚙️ Graph execution #{graph_exec.graph_exec_id} is already running, continuing where it left off."
@@ -620,9 +601,7 @@ class ExecutionProcessor:
log_metadata.info(
f"⚙️ Graph execution #{graph_exec.graph_exec_id} was disturbed, continuing where it left off."
)
update_graph_execution_state(
db_client=db_client,
graph_exec_id=graph_exec.graph_exec_id,
self.execution_data.update_graph_stats_and_publish(
status=ExecutionStatus.RUNNING,
)
else:
@@ -653,12 +632,10 @@ class ExecutionProcessor:
# Activity status handling
activity_status = asyncio.run_coroutine_threadsafe(
generate_activity_status_for_execution(
graph_exec_id=graph_exec.graph_exec_id,
self.execution_data.generate_activity_status(
graph_id=graph_exec.graph_id,
graph_version=graph_exec.graph_version,
execution_stats=exec_stats,
db_client=get_db_async_client(),
user_id=graph_exec.user_id,
execution_status=status,
),
@@ -673,15 +650,14 @@ class ExecutionProcessor:
)
# Communication handling
self._handle_agent_run_notif(db_client, graph_exec, exec_stats)
self._handle_agent_run_notif(graph_exec, exec_stats)
finally:
update_graph_execution_state(
db_client=db_client,
graph_exec_id=graph_exec.graph_exec_id,
self.execution_data.update_graph_stats_and_publish(
status=exec_meta.status,
stats=exec_stats,
)
self.execution_data.finalize_execution()
def _charge_usage(
self,
@@ -690,7 +666,6 @@ class ExecutionProcessor:
) -> tuple[int, int]:
total_cost = 0
remaining_balance = 0
db_client = get_db_client()
block = get_block(node_exec.block_id)
if not block:
logger.error(f"Block {node_exec.block_id} not found.")
@@ -700,7 +675,7 @@ class ExecutionProcessor:
block=block, input_data=node_exec.inputs
)
if cost > 0:
remaining_balance = db_client.spend_credits(
remaining_balance = self.execution_data.spend_credits(
user_id=node_exec.user_id,
cost=cost,
metadata=UsageTransactionMetadata(
@@ -718,7 +693,7 @@ class ExecutionProcessor:
cost, usage_count = execution_usage_cost(execution_count)
if cost > 0:
remaining_balance = db_client.spend_credits(
remaining_balance = self.execution_data.spend_credits(
user_id=node_exec.user_id,
cost=cost,
metadata=UsageTransactionMetadata(
@@ -751,7 +726,6 @@ class ExecutionProcessor:
"""
execution_status: ExecutionStatus = ExecutionStatus.RUNNING
error: Exception | None = None
db_client = get_db_client()
execution_stats_lock = threading.Lock()
# State holders ----------------------------------------------------
@@ -762,7 +736,7 @@ class ExecutionProcessor:
execution_queue = ExecutionQueue[NodeExecutionEntry]()
try:
if db_client.get_credits(graph_exec.user_id) <= 0:
if self.execution_data.get_credits(graph_exec.user_id) <= 0:
raise InsufficientBalanceError(
user_id=graph_exec.user_id,
message="You have no credits left to run an agent.",
@@ -774,7 +748,7 @@ class ExecutionProcessor:
try:
if moderation_error := asyncio.run_coroutine_threadsafe(
automod_manager.moderate_graph_execution_inputs(
db_client=get_db_async_client(),
db_client=self.execution_data.db_client_async,
graph_exec=graph_exec,
),
self.node_evaluation_loop,
@@ -789,16 +763,34 @@ class ExecutionProcessor:
# ------------------------------------------------------------
# Prepopulate queue ---------------------------------------
# ------------------------------------------------------------
for node_exec in db_client.get_node_executions(
graph_exec.graph_exec_id,
queued_executions = self.execution_data.get_node_executions(
statuses=[
ExecutionStatus.RUNNING,
ExecutionStatus.QUEUED,
ExecutionStatus.TERMINATED,
],
):
node_entry = node_exec.to_node_execution_entry(graph_exec.user_context)
execution_queue.add(node_entry)
)
log_metadata.info(
f"Pre-populating queue with {len(queued_executions)} executions from cache"
)
for i, node_exec in enumerate(queued_executions):
log_metadata.info(
f" [{i}] {node_exec.node_exec_id}: status={node_exec.status}, node={node_exec.node_id}"
)
try:
node_entry = node_exec.to_node_execution_entry(
graph_exec.user_context
)
execution_queue.add(node_entry)
log_metadata.info(" Added to execution queue successfully")
except Exception as e:
log_metadata.error(f" Failed to add to execution queue: {e}")
log_metadata.info(
f"Execution queue populated with {len(queued_executions)} executions"
)
# ------------------------------------------------------------
# Main dispatch / polling loop -----------------------------
@@ -818,13 +810,14 @@ class ExecutionProcessor:
try:
cost, remaining_balance = self._charge_usage(
node_exec=queued_node_exec,
execution_count=increment_execution_count(graph_exec.user_id),
execution_count=self.execution_data.increment_execution_count(
graph_exec.user_id
),
)
with execution_stats_lock:
execution_stats.cost += cost
# Check if we crossed the low balance threshold
self._handle_low_balance(
db_client=db_client,
user_id=graph_exec.user_id,
current_balance=remaining_balance,
transaction_cost=cost,
@@ -832,19 +825,17 @@ class ExecutionProcessor:
except InsufficientBalanceError as balance_error:
error = balance_error # Set error to trigger FAILED status
node_exec_id = queued_node_exec.node_exec_id
db_client.upsert_execution_output(
self.execution_data.upsert_execution_output(
node_exec_id=node_exec_id,
output_name="error",
output_data=str(error),
)
update_node_execution_status(
db_client=db_client,
self.execution_data.update_node_status_and_publish(
exec_id=node_exec_id,
status=ExecutionStatus.FAILED,
)
self._handle_insufficient_funds_notif(
db_client,
graph_exec.user_id,
graph_exec.graph_id,
error,
@@ -931,12 +922,13 @@ class ExecutionProcessor:
time.sleep(0.1)
# loop done --------------------------------------------------
# Background task finalization moved to finally block
# Output moderation
try:
if moderation_error := asyncio.run_coroutine_threadsafe(
automod_manager.moderate_graph_execution_outputs(
db_client=get_db_async_client(),
db_client=self.execution_data.db_client_async,
graph_exec_id=graph_exec.graph_exec_id,
user_id=graph_exec.user_id,
graph_id=graph_exec.graph_id,
@@ -990,7 +982,6 @@ class ExecutionProcessor:
error=error,
graph_exec_id=graph_exec.graph_exec_id,
log_metadata=log_metadata,
db_client=db_client,
)
@error_logged(swallow=True)
@@ -1003,7 +994,6 @@ class ExecutionProcessor:
error: Exception | None,
graph_exec_id: str,
log_metadata: LogMetadata,
db_client: "DatabaseManagerClient",
) -> None:
"""
Clean up running node executions and evaluations when graph execution ends.
@@ -1037,8 +1027,7 @@ class ExecutionProcessor:
)
while queued_execution := execution_queue.get_or_none():
update_node_execution_status(
db_client=db_client,
self.execution_data.update_node_status_and_publish(
exec_id=queued_execution.node_exec_id,
status=execution_status,
stats={"error": str(error)} if error else None,
@@ -1053,7 +1042,7 @@ class ExecutionProcessor:
node_id: str,
graph_exec: GraphExecutionEntry,
log_metadata: LogMetadata,
nodes_input_masks: Optional[NodesInputMasks],
nodes_input_masks: Optional[dict[str, dict[str, JsonValue]]],
execution_queue: ExecutionQueue[NodeExecutionEntry],
) -> None:
"""Process a node's output, update its status, and enqueue next nodes.
@@ -1066,12 +1055,10 @@ class ExecutionProcessor:
nodes_input_masks: Optional map of node input overrides
execution_queue: Queue to add next executions to
"""
db_client = get_db_async_client()
log_metadata.debug(f"Enqueue nodes for {node_id}: {output}")
for next_execution in await _enqueue_next_nodes(
db_client=db_client,
execution_data_client=self.execution_data,
node=output.node,
output=output.data,
user_id=graph_exec.user_id,
@@ -1085,15 +1072,13 @@ class ExecutionProcessor:
def _handle_agent_run_notif(
self,
db_client: "DatabaseManagerClient",
graph_exec: GraphExecutionEntry,
exec_stats: GraphExecutionStats,
):
metadata = db_client.get_graph_metadata(
metadata = self.execution_data.get_graph_metadata(
graph_exec.graph_id, graph_exec.graph_version
)
outputs = db_client.get_node_executions(
graph_exec.graph_exec_id,
outputs = self.execution_data.get_node_executions(
block_ids=[AgentOutputBlock().id],
)
@@ -1122,13 +1107,12 @@ class ExecutionProcessor:
def _handle_insufficient_funds_notif(
self,
db_client: "DatabaseManagerClient",
user_id: str,
graph_id: str,
e: InsufficientBalanceError,
):
shortfall = abs(e.amount) - e.balance
metadata = db_client.get_graph_metadata(graph_id)
metadata = self.execution_data.get_graph_metadata(graph_id)
base_url = (
settings.config.frontend_base_url or settings.config.platform_base_url
)
@@ -1147,7 +1131,7 @@ class ExecutionProcessor:
)
try:
user_email = db_client.get_user_email_by_id(user_id)
user_email = self.execution_data.get_user_email_by_id(user_id)
alert_message = (
f"❌ **Insufficient Funds Alert**\n"
@@ -1169,7 +1153,6 @@ class ExecutionProcessor:
def _handle_low_balance(
self,
db_client: "DatabaseManagerClient",
user_id: str,
current_balance: int,
transaction_cost: int,
@@ -1198,7 +1181,7 @@ class ExecutionProcessor:
)
try:
user_email = db_client.get_user_email_by_id(user_id)
user_email = self.execution_data.get_user_email_by_id(user_id)
alert_message = (
f"⚠️ **Low Balance Alert**\n"
f"User: {user_email or user_id}\n"
@@ -1576,117 +1559,3 @@ class ExecutionManager(AppProcess):
)
logger.info(f"{prefix} ✅ Finished GraphExec cleanup")
# ------- UTILITIES ------- #
def get_db_client() -> "DatabaseManagerClient":
return get_database_manager_client()
def get_db_async_client() -> "DatabaseManagerAsyncClient":
return get_database_manager_async_client()
@func_retry
async def send_async_execution_update(
entry: GraphExecution | NodeExecutionResult | None,
) -> None:
if entry is None:
return
await get_async_execution_event_bus().publish(entry)
@func_retry
def send_execution_update(entry: GraphExecution | NodeExecutionResult | None):
if entry is None:
return
return get_execution_event_bus().publish(entry)
async def async_update_node_execution_status(
db_client: "DatabaseManagerAsyncClient",
exec_id: str,
status: ExecutionStatus,
execution_data: BlockInput | None = None,
stats: dict[str, Any] | None = None,
) -> NodeExecutionResult:
"""Sets status and fetches+broadcasts the latest state of the node execution"""
exec_update = await db_client.update_node_execution_status(
exec_id, status, execution_data, stats
)
await send_async_execution_update(exec_update)
return exec_update
def update_node_execution_status(
db_client: "DatabaseManagerClient",
exec_id: str,
status: ExecutionStatus,
execution_data: BlockInput | None = None,
stats: dict[str, Any] | None = None,
) -> NodeExecutionResult:
"""Sets status and fetches+broadcasts the latest state of the node execution"""
exec_update = db_client.update_node_execution_status(
exec_id, status, execution_data, stats
)
send_execution_update(exec_update)
return exec_update
async def async_update_graph_execution_state(
db_client: "DatabaseManagerAsyncClient",
graph_exec_id: str,
status: ExecutionStatus | None = None,
stats: GraphExecutionStats | None = None,
) -> GraphExecution | None:
"""Sets status and fetches+broadcasts the latest state of the graph execution"""
graph_update = await db_client.update_graph_execution_stats(
graph_exec_id, status, stats
)
if graph_update:
await send_async_execution_update(graph_update)
else:
logger.error(f"Failed to update graph execution stats for {graph_exec_id}")
return graph_update
def update_graph_execution_state(
db_client: "DatabaseManagerClient",
graph_exec_id: str,
status: ExecutionStatus | None = None,
stats: GraphExecutionStats | None = None,
) -> GraphExecution | None:
"""Sets status and fetches+broadcasts the latest state of the graph execution"""
graph_update = db_client.update_graph_execution_stats(graph_exec_id, status, stats)
if graph_update:
send_execution_update(graph_update)
else:
logger.error(f"Failed to update graph execution stats for {graph_exec_id}")
return graph_update
@asynccontextmanager
async def synchronized(key: str, timeout: int = 60):
r = await redis.get_redis_async()
lock: RedisLock = r.lock(f"lock:{key}", timeout=timeout)
try:
await lock.acquire()
yield
finally:
if await lock.locked() and await lock.owned():
await lock.release()
def increment_execution_count(user_id: str) -> int:
"""
Increment the execution count for a given user,
this will be used to charge the user for the execution cost.
"""
r = redis.get_redis()
k = f"uec:{user_id}" # User Execution Count global key
counter = cast(int, r.incr(k))
if counter == 1:
r.expire(k, settings.config.execution_counter_expiration_time)
return counter

View File

@@ -32,13 +32,17 @@ async def test_handle_low_balance_threshold_crossing(server: SpinTestServer):
mock_settings.config.low_balance_threshold = 500 # $5 threshold
mock_settings.config.frontend_base_url = "https://test.com"
# Create mock database client
mock_db_client = MagicMock()
mock_db_client.get_user_email_by_id.return_value = "test@example.com"
# Initialize the execution processor and mock its execution_data
execution_processor.on_graph_executor_start()
# Mock the execution_data attribute since it's created in on_graph_execution
mock_execution_data = MagicMock()
execution_processor.execution_data = mock_execution_data
mock_execution_data.get_user_email_by_id.return_value = "test@example.com"
# Test the low balance handler
execution_processor._handle_low_balance(
db_client=mock_db_client,
user_id=user_id,
current_balance=current_balance,
transaction_cost=transaction_cost,
@@ -62,6 +66,19 @@ async def test_handle_low_balance_threshold_crossing(server: SpinTestServer):
assert "$4.00" in discord_message
assert "$6.00" in discord_message
# Cleanup execution processor threads
try:
execution_processor.node_execution_loop.call_soon_threadsafe(
execution_processor.node_execution_loop.stop
)
execution_processor.node_evaluation_loop.call_soon_threadsafe(
execution_processor.node_evaluation_loop.stop
)
execution_processor.node_execution_thread.join(timeout=1)
execution_processor.node_evaluation_thread.join(timeout=1)
except Exception:
pass # Ignore cleanup errors
@pytest.mark.asyncio(loop_scope="session")
async def test_handle_low_balance_no_notification_when_not_crossing(
@@ -90,12 +107,17 @@ async def test_handle_low_balance_no_notification_when_not_crossing(
mock_get_client.return_value = mock_client
mock_settings.config.low_balance_threshold = 500 # $5 threshold
# Create mock database client
mock_db_client = MagicMock()
# Initialize the execution processor and mock its execution_data
execution_processor.on_graph_executor_start()
# Mock the execution_data attribute since it's created in on_graph_execution
mock_execution_data = MagicMock()
execution_processor.execution_data = mock_execution_data
mock_execution_data.get_user_email_by_id.return_value = "test@example.com"
# Test the low balance handler
execution_processor._handle_low_balance(
db_client=mock_db_client,
user_id=user_id,
current_balance=current_balance,
transaction_cost=transaction_cost,
@@ -105,6 +127,19 @@ async def test_handle_low_balance_no_notification_when_not_crossing(
mock_queue_notif.assert_not_called()
mock_client.discord_system_alert.assert_not_called()
# Cleanup execution processor threads
try:
execution_processor.node_execution_loop.call_soon_threadsafe(
execution_processor.node_execution_loop.stop
)
execution_processor.node_evaluation_loop.call_soon_threadsafe(
execution_processor.node_evaluation_loop.stop
)
execution_processor.node_execution_thread.join(timeout=1)
execution_processor.node_evaluation_thread.join(timeout=1)
except Exception:
pass # Ignore cleanup errors
@pytest.mark.asyncio(loop_scope="session")
async def test_handle_low_balance_no_duplicate_when_already_below(
@@ -133,12 +168,17 @@ async def test_handle_low_balance_no_duplicate_when_already_below(
mock_get_client.return_value = mock_client
mock_settings.config.low_balance_threshold = 500 # $5 threshold
# Create mock database client
mock_db_client = MagicMock()
# Initialize the execution processor and mock its execution_data
execution_processor.on_graph_executor_start()
# Mock the execution_data attribute since it's created in on_graph_execution
mock_execution_data = MagicMock()
execution_processor.execution_data = mock_execution_data
mock_execution_data.get_user_email_by_id.return_value = "test@example.com"
# Test the low balance handler
execution_processor._handle_low_balance(
db_client=mock_db_client,
user_id=user_id,
current_balance=current_balance,
transaction_cost=transaction_cost,
@@ -147,3 +187,16 @@ async def test_handle_low_balance_no_duplicate_when_already_below(
# Verify no notification was sent (user was already below threshold)
mock_queue_notif.assert_not_called()
mock_client.discord_system_alert.assert_not_called()
# Cleanup execution processor threads
try:
execution_processor.node_execution_loop.call_soon_threadsafe(
execution_processor.node_execution_loop.stop
)
execution_processor.node_evaluation_loop.call_soon_threadsafe(
execution_processor.node_evaluation_loop.stop
)
execution_processor.node_execution_thread.join(timeout=1)
execution_processor.node_evaluation_thread.join(timeout=1)
except Exception:
pass # Ignore cleanup errors

View File

@@ -35,20 +35,21 @@ async def execute_graph(
logger.info(f"Input data: {input_data}")
# --- Test adding new executions --- #
graph_exec = await agent_server.test_execute_graph(
response = await agent_server.test_execute_graph(
user_id=test_user.id,
graph_id=test_graph.id,
graph_version=test_graph.version,
node_input=input_data,
)
logger.info(f"Created execution with ID: {graph_exec.id}")
graph_exec_id = response.graph_exec_id
logger.info(f"Created execution with ID: {graph_exec_id}")
# Execution queue should be empty
logger.info("Waiting for execution to complete...")
result = await wait_execution(test_user.id, graph_exec.id, 30)
result = await wait_execution(test_user.id, graph_exec_id, 30)
logger.info(f"Execution completed with {len(result)} results")
assert len(result) == num_execs
return graph_exec.id
return graph_exec_id
async def assert_sample_graph_executions(
@@ -378,7 +379,7 @@ async def test_execute_preset(server: SpinTestServer):
# Verify execution
assert result is not None
graph_exec_id = result.id
graph_exec_id = result["id"]
# Wait for execution to complete
executions = await wait_execution(test_user.id, graph_exec_id)
@@ -467,7 +468,7 @@ async def test_execute_preset_with_clash(server: SpinTestServer):
# Verify execution
assert result is not None, "Result must not be None"
graph_exec_id = result.id
graph_exec_id = result["id"]
# Wait for execution to complete
executions = await wait_execution(test_user.id, graph_exec_id)

View File

@@ -1,7 +1,6 @@
from typing import cast
import pytest
from pytest_mock import MockerFixture
from backend.executor.utils import merge_execution_input, parse_execution_output
from backend.util.mock import MockObject
@@ -277,142 +276,3 @@ def test_merge_execution_input():
result = merge_execution_input(data)
assert "mixed" in result
assert result["mixed"].attr[0]["key"] == "value3"
@pytest.mark.asyncio
async def test_add_graph_execution_is_repeatable(mocker: MockerFixture):
"""
Verify that calling the function with its own output creates the same execution again.
"""
from backend.data.execution import GraphExecutionWithNodes
from backend.data.model import CredentialsMetaInput
from backend.executor.utils import add_graph_execution
from backend.integrations.providers import ProviderName
# Mock data
graph_id = "test-graph-id"
user_id = "test-user-id"
inputs = {"test_input": "test_value"}
preset_id = "test-preset-id"
graph_version = 1
graph_credentials_inputs = {
"cred_key": CredentialsMetaInput(
id="cred-id", provider=ProviderName("test_provider"), type="oauth2"
)
}
nodes_input_masks = {"node1": {"input1": "masked_value"}}
# Mock the graph object returned by validate_and_construct_node_execution_input
mock_graph = mocker.MagicMock()
mock_graph.version = graph_version
# Mock the starting nodes input and compiled nodes input masks
starting_nodes_input = [
("node1", {"input1": "value1"}),
("node2", {"input1": "value2"}),
]
compiled_nodes_input_masks = {"node1": {"input1": "compiled_mask"}}
# Mock the graph execution object
mock_graph_exec = mocker.MagicMock(spec=GraphExecutionWithNodes)
mock_graph_exec.id = "execution-id-123"
mock_graph_exec.to_graph_execution_entry.return_value = mocker.MagicMock()
# Mock user context
mock_user_context = {"user_id": user_id, "context": "test_context"}
# Mock the queue and event bus
mock_queue = mocker.AsyncMock()
mock_event_bus = mocker.MagicMock()
mock_event_bus.publish = mocker.AsyncMock()
# Setup mocks
mock_validate = mocker.patch(
"backend.executor.utils.validate_and_construct_node_execution_input"
)
mock_edb = mocker.patch("backend.executor.utils.execution_db")
mock_prisma = mocker.patch("backend.executor.utils.prisma")
mock_get_user_context = mocker.patch("backend.executor.utils.get_user_context")
mock_get_queue = mocker.patch("backend.executor.utils.get_async_execution_queue")
mock_get_event_bus = mocker.patch(
"backend.executor.utils.get_async_execution_event_bus"
)
# Setup mock returns
mock_validate.return_value = (
mock_graph,
starting_nodes_input,
compiled_nodes_input_masks,
)
mock_prisma.is_connected.return_value = True
mock_edb.create_graph_execution = mocker.AsyncMock(return_value=mock_graph_exec)
mock_get_user_context.return_value = mock_user_context
mock_get_queue.return_value = mock_queue
mock_get_event_bus.return_value = mock_event_bus
# Call the function - first execution
result1 = await add_graph_execution(
graph_id=graph_id,
user_id=user_id,
inputs=inputs,
preset_id=preset_id,
graph_version=graph_version,
graph_credentials_inputs=graph_credentials_inputs,
nodes_input_masks=nodes_input_masks,
)
# Store the parameters used in the first call to create_graph_execution
first_call_kwargs = mock_edb.create_graph_execution.call_args[1]
# Verify the create_graph_execution was called with correct parameters
mock_edb.create_graph_execution.assert_called_once_with(
user_id=user_id,
graph_id=graph_id,
graph_version=mock_graph.version,
inputs=inputs,
credential_inputs=graph_credentials_inputs,
nodes_input_masks=nodes_input_masks,
starting_nodes_input=starting_nodes_input,
preset_id=preset_id,
)
# Set up the graph execution mock to have properties we can extract
mock_graph_exec.graph_id = graph_id
mock_graph_exec.user_id = user_id
mock_graph_exec.graph_version = graph_version
mock_graph_exec.inputs = inputs
mock_graph_exec.credential_inputs = graph_credentials_inputs
mock_graph_exec.nodes_input_masks = nodes_input_masks
mock_graph_exec.preset_id = preset_id
# Create a second mock execution for the sanity check
mock_graph_exec_2 = mocker.MagicMock(spec=GraphExecutionWithNodes)
mock_graph_exec_2.id = "execution-id-456"
mock_graph_exec_2.to_graph_execution_entry.return_value = mocker.MagicMock()
# Reset mocks and set up for second call
mock_edb.create_graph_execution.reset_mock()
mock_edb.create_graph_execution.return_value = mock_graph_exec_2
mock_validate.reset_mock()
# Sanity check: call add_graph_execution with properties from first result
# This should create the same execution parameters
result2 = await add_graph_execution(
graph_id=mock_graph_exec.graph_id,
user_id=mock_graph_exec.user_id,
inputs=mock_graph_exec.inputs,
preset_id=mock_graph_exec.preset_id,
graph_version=mock_graph_exec.graph_version,
graph_credentials_inputs=mock_graph_exec.credential_inputs,
nodes_input_masks=mock_graph_exec.nodes_input_masks,
)
# Verify that create_graph_execution was called with identical parameters
second_call_kwargs = mock_edb.create_graph_execution.call_args[1]
# The sanity check: both calls should use identical parameters
assert first_call_kwargs == second_call_kwargs
# Both executions should succeed (though they create different objects)
assert result1 == mock_graph_exec
assert result2 == mock_graph_exec_2

View File

@@ -4,13 +4,13 @@ import threading
import time
from collections import defaultdict
from concurrent.futures import Future
from typing import Any, Mapping, Optional, cast
from typing import Any, Optional
from pydantic import BaseModel, JsonValue, ValidationError
from backend.data import execution as execution_db
from backend.data import graph as graph_db
from backend.data.block import Block, BlockInput, BlockOutputEntry, BlockType, get_block
from backend.data.block import Block, BlockData, BlockInput, BlockType, get_block
from backend.data.block_cost_config import BLOCK_COSTS
from backend.data.cost import BlockCostType
from backend.data.db import prisma
@@ -18,7 +18,6 @@ from backend.data.execution import (
ExecutionStatus,
GraphExecutionStats,
GraphExecutionWithNodes,
NodesInputMasks,
UserContext,
)
from backend.data.graph import GraphModel, Node
@@ -240,7 +239,7 @@ def _tokenise(path: str) -> list[tuple[str, str]] | None:
# --------------------------------------------------------------------------- #
def parse_execution_output(output: BlockOutputEntry, name: str) -> JsonValue | None:
def parse_execution_output(output: BlockData, name: str) -> Any | None:
"""
Retrieve a nested value out of `output` using the flattened *name*.
@@ -264,7 +263,7 @@ def parse_execution_output(output: BlockOutputEntry, name: str) -> JsonValue | N
if tokens is None:
return None
cur: JsonValue = data
cur: Any = data
for delim, ident in tokens:
if delim == LIST_SPLIT:
# list[index]
@@ -429,7 +428,7 @@ def validate_exec(
async def _validate_node_input_credentials(
graph: GraphModel,
user_id: str,
nodes_input_masks: Optional[NodesInputMasks] = None,
nodes_input_masks: Optional[dict[str, dict[str, JsonValue]]] = None,
) -> dict[str, dict[str, str]]:
"""
Checks all credentials for all nodes of the graph and returns structured errors.
@@ -509,8 +508,8 @@ async def _validate_node_input_credentials(
def make_node_credentials_input_map(
graph: GraphModel,
graph_credentials_input: Mapping[str, CredentialsMetaInput],
) -> NodesInputMasks:
graph_credentials_input: dict[str, CredentialsMetaInput],
) -> dict[str, dict[str, JsonValue]]:
"""
Maps credentials for an execution to the correct nodes.
@@ -545,8 +544,8 @@ def make_node_credentials_input_map(
async def validate_graph_with_credentials(
graph: GraphModel,
user_id: str,
nodes_input_masks: Optional[NodesInputMasks] = None,
) -> Mapping[str, Mapping[str, str]]:
nodes_input_masks: Optional[dict[str, dict[str, JsonValue]]] = None,
) -> dict[str, dict[str, str]]:
"""
Validate graph including credentials and return structured errors per node.
@@ -576,7 +575,7 @@ async def _construct_starting_node_execution_input(
graph: GraphModel,
user_id: str,
graph_inputs: BlockInput,
nodes_input_masks: Optional[NodesInputMasks] = None,
nodes_input_masks: Optional[dict[str, dict[str, JsonValue]]] = None,
) -> list[tuple[str, BlockInput]]:
"""
Validates and prepares the input data for executing a graph.
@@ -617,7 +616,7 @@ async def _construct_starting_node_execution_input(
# Extract request input data, and assign it to the input pin.
if block.block_type == BlockType.INPUT:
input_name = cast(str | None, node.input_default.get("name"))
input_name = node.input_default.get("name")
if input_name and input_name in graph_inputs:
input_data = {"value": graph_inputs[input_name]}
@@ -644,9 +643,9 @@ async def validate_and_construct_node_execution_input(
user_id: str,
graph_inputs: BlockInput,
graph_version: Optional[int] = None,
graph_credentials_inputs: Optional[Mapping[str, CredentialsMetaInput]] = None,
nodes_input_masks: Optional[NodesInputMasks] = None,
) -> tuple[GraphModel, list[tuple[str, BlockInput]], NodesInputMasks]:
graph_credentials_inputs: Optional[dict[str, CredentialsMetaInput]] = None,
nodes_input_masks: Optional[dict[str, dict[str, JsonValue]]] = None,
) -> tuple[GraphModel, list[tuple[str, BlockInput]], dict[str, dict[str, JsonValue]]]:
"""
Public wrapper that handles graph fetching, credential mapping, and validation+construction.
This centralizes the logic used by both scheduler validation and actual execution.
@@ -660,9 +659,7 @@ async def validate_and_construct_node_execution_input(
nodes_input_masks: Node inputs to use.
Returns:
GraphModel: Full graph object for the given `graph_id`.
list[tuple[node_id, BlockInput]]: Starting node IDs with corresponding inputs.
dict[str, BlockInput]: Node input masks including all passed-in credentials.
tuple[GraphModel, list[tuple[str, BlockInput]]]: Graph model and list of tuples for node execution input.
Raises:
NotFoundError: If the graph is not found.
@@ -703,11 +700,11 @@ async def validate_and_construct_node_execution_input(
def _merge_nodes_input_masks(
overrides_map_1: NodesInputMasks,
overrides_map_2: NodesInputMasks,
) -> NodesInputMasks:
overrides_map_1: dict[str, dict[str, JsonValue]],
overrides_map_2: dict[str, dict[str, JsonValue]],
) -> dict[str, dict[str, JsonValue]]:
"""Perform a per-node merge of input overrides"""
result = dict(overrides_map_1).copy()
result = overrides_map_1.copy()
for node_id, overrides2 in overrides_map_2.items():
if node_id in result:
result[node_id] = {**result[node_id], **overrides2}
@@ -857,8 +854,8 @@ async def add_graph_execution(
inputs: Optional[BlockInput] = None,
preset_id: Optional[str] = None,
graph_version: Optional[int] = None,
graph_credentials_inputs: Optional[Mapping[str, CredentialsMetaInput]] = None,
nodes_input_masks: Optional[NodesInputMasks] = None,
graph_credentials_inputs: Optional[dict[str, CredentialsMetaInput]] = None,
nodes_input_masks: Optional[dict[str, dict[str, JsonValue]]] = None,
) -> GraphExecutionWithNodes:
"""
Adds a graph execution to the queue and returns the execution entry.
@@ -882,7 +879,7 @@ async def add_graph_execution(
else:
edb = get_database_manager_async_client()
graph, starting_nodes_input, compiled_nodes_input_masks = (
graph, starting_nodes_input, nodes_input_masks = (
await validate_and_construct_node_execution_input(
graph_id=graph_id,
user_id=user_id,
@@ -895,15 +892,10 @@ async def add_graph_execution(
graph_exec = None
try:
# Sanity check: running add_graph_execution with the properties of
# the graph_exec created here should create the same execution again.
graph_exec = await edb.create_graph_execution(
user_id=user_id,
graph_id=graph_id,
graph_version=graph.version,
inputs=inputs or {},
credential_inputs=graph_credentials_inputs,
nodes_input_masks=nodes_input_masks,
starting_nodes_input=starting_nodes_input,
preset_id=preset_id,
)
@@ -912,9 +904,9 @@ async def add_graph_execution(
user_context = await get_user_context(user_id)
queue = await get_async_execution_queue()
graph_exec_entry = graph_exec.to_graph_execution_entry(
user_context, compiled_nodes_input_masks
)
graph_exec_entry = graph_exec.to_graph_execution_entry(user_context)
if nodes_input_masks:
graph_exec_entry.nodes_input_masks = nodes_input_masks
logger.info(
f"Created graph execution #{graph_exec.id} for graph "
@@ -960,7 +952,7 @@ async def add_graph_execution(
class ExecutionOutputEntry(BaseModel):
node: Node
node_exec_id: str
data: BlockOutputEntry
data: BlockData
class NodeExecutionProgress:

View File

@@ -88,7 +88,6 @@ async def test_send_graph_execution_result(
user_id="user-1",
graph_id="test_graph",
graph_version=1,
preset_id=None,
status=ExecutionStatus.COMPLETED,
started_at=datetime.now(tz=timezone.utc),
ended_at=datetime.now(tz=timezone.utc),
@@ -102,8 +101,6 @@ async def test_send_graph_execution_result(
"input_1": "some input value :)",
"input_2": "some *other* input value",
},
credential_inputs=None,
nodes_input_masks=None,
outputs={
"the_output": ["some output value"],
"other_output": ["sike there was another output"],

View File

@@ -1,14 +1,16 @@
from fastapi import HTTPException, Security
from fastapi import Depends, HTTPException, Request
from fastapi.security import APIKeyHeader
from prisma.enums import APIKeyPermission
from backend.data.api_key import APIKey, has_permission, validate_api_key
from backend.data.api_key import has_permission, validate_api_key
api_key_header = APIKeyHeader(name="X-API-Key", auto_error=False)
api_key_header = APIKeyHeader(name="X-API-Key")
async def require_api_key(api_key: str | None = Security(api_key_header)) -> APIKey:
async def require_api_key(request: Request):
"""Base middleware for API key authentication"""
api_key = await api_key_header(request)
if api_key is None:
raise HTTPException(status_code=401, detail="Missing API key")
@@ -17,17 +19,18 @@ async def require_api_key(api_key: str | None = Security(api_key_header)) -> API
if not api_key_obj:
raise HTTPException(status_code=401, detail="Invalid API key")
request.state.api_key = api_key_obj
return api_key_obj
def require_permission(permission: APIKeyPermission):
"""Dependency function for checking specific permissions"""
async def check_permission(api_key: APIKey = Security(require_api_key)):
async def check_permission(api_key=Depends(require_api_key)):
if not has_permission(api_key, permission):
raise HTTPException(
status_code=403,
detail=f"API key lacks the required permission '{permission}'",
detail=f"API key missing required permission: {permission}",
)
return api_key

View File

@@ -2,7 +2,7 @@ import logging
from collections import defaultdict
from typing import Annotated, Any, Optional, Sequence
from fastapi import APIRouter, Body, HTTPException, Security
from fastapi import APIRouter, Body, Depends, HTTPException
from prisma.enums import AgentExecutionStatus, APIKeyPermission
from typing_extensions import TypedDict
@@ -47,7 +47,7 @@ class GraphExecutionResult(TypedDict):
@v1_router.get(
path="/blocks",
tags=["blocks"],
dependencies=[Security(require_permission(APIKeyPermission.READ_BLOCK))],
dependencies=[Depends(require_permission(APIKeyPermission.READ_BLOCK))],
)
def get_graph_blocks() -> Sequence[dict[Any, Any]]:
blocks = [block() for block in backend.data.block.get_blocks().values()]
@@ -57,12 +57,12 @@ def get_graph_blocks() -> Sequence[dict[Any, Any]]:
@v1_router.post(
path="/blocks/{block_id}/execute",
tags=["blocks"],
dependencies=[Security(require_permission(APIKeyPermission.EXECUTE_BLOCK))],
dependencies=[Depends(require_permission(APIKeyPermission.EXECUTE_BLOCK))],
)
async def execute_graph_block(
block_id: str,
data: BlockInput,
api_key: APIKey = Security(require_permission(APIKeyPermission.EXECUTE_BLOCK)),
api_key: APIKey = Depends(require_permission(APIKeyPermission.EXECUTE_BLOCK)),
) -> CompletedBlockOutput:
obj = backend.data.block.get_block(block_id)
if not obj:
@@ -82,7 +82,7 @@ async def execute_graph(
graph_id: str,
graph_version: int,
node_input: Annotated[dict[str, Any], Body(..., embed=True, default_factory=dict)],
api_key: APIKey = Security(require_permission(APIKeyPermission.EXECUTE_GRAPH)),
api_key: APIKey = Depends(require_permission(APIKeyPermission.EXECUTE_GRAPH)),
) -> dict[str, Any]:
try:
graph_exec = await add_graph_execution(
@@ -104,7 +104,7 @@ async def execute_graph(
async def get_graph_execution_results(
graph_id: str,
graph_exec_id: str,
api_key: APIKey = Security(require_permission(APIKeyPermission.READ_GRAPH)),
api_key: APIKey = Depends(require_permission(APIKeyPermission.READ_GRAPH)),
) -> GraphExecutionResult:
graph = await graph_db.get_graph(graph_id, user_id=api_key.user_id)
if not graph:

View File

@@ -34,6 +34,10 @@ class WSSubscribeGraphExecutionsRequest(pydantic.BaseModel):
graph_id: str
class ExecuteGraphResponse(pydantic.BaseModel):
graph_exec_id: str
class CreateGraph(pydantic.BaseModel):
graph: Graph

View File

@@ -363,7 +363,6 @@ class AgentServer(backend.util.service.AppProcess):
preset_id=preset_id,
user_id=user_id,
inputs=inputs or {},
credential_inputs={},
)
@staticmethod

View File

@@ -78,6 +78,7 @@ from backend.server.model import (
CreateAPIKeyRequest,
CreateAPIKeyResponse,
CreateGraph,
ExecuteGraphResponse,
RequestTopUp,
SetGraphActiveVersion,
TimezoneResponse,
@@ -782,7 +783,7 @@ async def execute_graph(
],
graph_version: Optional[int] = None,
preset_id: Optional[str] = None,
) -> execution_db.GraphExecutionMeta:
) -> ExecuteGraphResponse:
current_balance = await _user_credit_model.get_credits(user_id)
if current_balance <= 0:
raise HTTPException(
@@ -791,7 +792,7 @@ async def execute_graph(
)
try:
return await execution_utils.add_graph_execution(
graph_exec = await execution_utils.add_graph_execution(
graph_id=graph_id,
user_id=user_id,
inputs=inputs,
@@ -799,6 +800,7 @@ async def execute_graph(
graph_version=graph_version,
graph_credentials_inputs=credentials_inputs,
)
return ExecuteGraphResponse(graph_exec_id=graph_exec.id)
except GraphValidationError as e:
# Return structured validation errors that the frontend can parse
raise HTTPException(

View File

@@ -147,8 +147,10 @@ class AutoModManager:
return None
# Get completed executions and collect outputs
completed_executions = await db_client.get_node_executions(
graph_exec_id, statuses=[ExecutionStatus.COMPLETED], include_exec_data=True
completed_executions = await db_client.get_node_executions( # type: ignore
graph_exec_id=graph_exec_id,
statuses=[ExecutionStatus.COMPLETED],
include_exec_data=True,
)
if not completed_executions:
@@ -218,7 +220,7 @@ class AutoModManager:
):
"""Update node execution statuses for frontend display when moderation fails"""
# Import here to avoid circular imports
from backend.executor.manager import send_async_execution_update
from backend.util.clients import get_async_execution_event_bus
if moderation_type == "input":
# For input moderation, mark queued/running/incomplete nodes as failed
@@ -232,8 +234,10 @@ class AutoModManager:
target_statuses = [ExecutionStatus.COMPLETED]
# Get the executions that need to be updated
executions_to_update = await db_client.get_node_executions(
graph_exec_id, statuses=target_statuses, include_exec_data=True
executions_to_update = await db_client.get_node_executions( # type: ignore
graph_exec_id=graph_exec_id,
statuses=target_statuses,
include_exec_data=True,
)
if not executions_to_update:
@@ -276,10 +280,12 @@ class AutoModManager:
updated_execs = await asyncio.gather(*exec_updates)
# Send all websocket updates in parallel
event_bus = get_async_execution_event_bus()
await asyncio.gather(
*[
send_async_execution_update(updated_exec)
event_bus.publish(updated_exec)
for updated_exec in updated_execs
if updated_exec is not None
]
)

View File

@@ -16,7 +16,7 @@ import backend.server.v2.store.media as store_media
from backend.data.block import BlockInput
from backend.data.db import transaction
from backend.data.execution import get_graph_execution
from backend.data.includes import AGENT_PRESET_INCLUDE, library_agent_include
from backend.data.includes import library_agent_include
from backend.data.model import CredentialsMetaInput
from backend.integrations.creds_manager import IntegrationCredentialsManager
from backend.integrations.webhooks.graph_lifecycle_hooks import on_graph_activate
@@ -617,7 +617,7 @@ async def list_presets(
where=query_filter,
skip=(page - 1) * page_size,
take=page_size,
include=AGENT_PRESET_INCLUDE,
include={"InputPresets": True},
)
total_items = await prisma.models.AgentPreset.prisma().count(where=query_filter)
total_pages = (total_items + page_size - 1) // page_size
@@ -662,7 +662,7 @@ async def get_preset(
try:
preset = await prisma.models.AgentPreset.prisma().find_unique(
where={"id": preset_id},
include=AGENT_PRESET_INCLUDE,
include={"InputPresets": True},
)
if not preset or preset.userId != user_id or preset.isDeleted:
return None
@@ -717,7 +717,7 @@ async def create_preset(
]
},
),
include=AGENT_PRESET_INCLUDE,
include={"InputPresets": True},
)
return library_model.LibraryAgentPreset.from_db(new_preset)
except prisma.errors.PrismaError as e:
@@ -747,25 +747,6 @@ async def create_preset_from_graph_execution(
if not graph_execution:
raise NotFoundError(f"Graph execution #{graph_exec_id} not found")
# Sanity check: credential inputs must be available if required for this preset
if graph_execution.credential_inputs is None:
graph = await graph_db.get_graph(
graph_id=graph_execution.graph_id,
version=graph_execution.graph_version,
user_id=graph_execution.user_id,
include_subgraphs=True,
)
if not graph:
raise NotFoundError(
f"Graph #{graph_execution.graph_id} not found or accessible"
)
elif len(graph.aggregate_credentials_inputs()) > 0:
raise ValueError(
f"Graph execution #{graph_exec_id} can't be turned into a preset "
"because it was run before this feature existed "
"and so the input credentials were not saved."
)
logger.debug(
f"Creating preset for user #{user_id} from graph execution #{graph_exec_id}",
)
@@ -773,7 +754,7 @@ async def create_preset_from_graph_execution(
user_id=user_id,
preset=library_model.LibraryAgentPresetCreatable(
inputs=graph_execution.inputs,
credentials=graph_execution.credential_inputs or {},
credentials={}, # FIXME
graph_id=graph_execution.graph_id,
graph_version=graph_execution.graph_version,
name=create_request.name,
@@ -853,7 +834,7 @@ async def update_preset(
updated = await prisma.models.AgentPreset.prisma(tx).update(
where={"id": preset_id},
data=update_data,
include=AGENT_PRESET_INCLUDE,
include={"InputPresets": True},
)
if not updated:
raise RuntimeError(f"AgentPreset #{preset_id} vanished while updating")
@@ -868,7 +849,7 @@ async def set_preset_webhook(
) -> library_model.LibraryAgentPreset:
current = await prisma.models.AgentPreset.prisma().find_unique(
where={"id": preset_id},
include=AGENT_PRESET_INCLUDE,
include={"InputPresets": True},
)
if not current or current.userId != user_id:
raise NotFoundError(f"Preset #{preset_id} not found")
@@ -880,7 +861,7 @@ async def set_preset_webhook(
if webhook_id
else {"Webhook": {"disconnect": True}}
),
include=AGENT_PRESET_INCLUDE,
include={"InputPresets": True},
)
if not updated:
raise RuntimeError(f"AgentPreset #{preset_id} vanished while updating")

View File

@@ -1,6 +1,6 @@
import datetime
from enum import Enum
from typing import TYPE_CHECKING, Any, Optional
from typing import Any, Optional
import prisma.enums
import prisma.models
@@ -9,11 +9,9 @@ import pydantic
import backend.data.block as block_model
import backend.data.graph as graph_model
from backend.data.model import CredentialsMetaInput, is_credentials_field_name
from backend.integrations.providers import ProviderName
from backend.util.models import Pagination
if TYPE_CHECKING:
from backend.data.integrations import Webhook
class LibraryAgentStatus(str, Enum):
COMPLETED = "COMPLETED" # All runs completed
@@ -22,6 +20,14 @@ class LibraryAgentStatus(str, Enum):
ERROR = "ERROR" # Agent is in an error state
class LibraryAgentTriggerInfo(pydantic.BaseModel):
provider: ProviderName
config_schema: dict[str, Any] = pydantic.Field(
description="Input schema for the trigger block"
)
credentials_input_name: Optional[str]
class LibraryAgent(pydantic.BaseModel):
"""
Represents an agent in the library, including metadata for display and
@@ -53,7 +59,7 @@ class LibraryAgent(pydantic.BaseModel):
has_external_trigger: bool = pydantic.Field(
description="Whether the agent has an external trigger (e.g. webhook) node"
)
trigger_setup_info: Optional[graph_model.GraphTriggerInfo] = None
trigger_setup_info: Optional[LibraryAgentTriggerInfo] = None
# Indicates whether there's a new output (based on recent runs)
new_output: bool
@@ -126,7 +132,30 @@ class LibraryAgent(pydantic.BaseModel):
graph.credentials_input_schema if sub_graphs is not None else None
),
has_external_trigger=graph.has_external_trigger,
trigger_setup_info=graph.trigger_setup_info,
trigger_setup_info=(
LibraryAgentTriggerInfo(
provider=trigger_block.webhook_config.provider,
config_schema={
**(json_schema := trigger_block.input_schema.jsonschema()),
"properties": {
pn: sub_schema
for pn, sub_schema in json_schema["properties"].items()
if not is_credentials_field_name(pn)
},
"required": [
pn
for pn in json_schema.get("required", [])
if not is_credentials_field_name(pn)
],
},
credentials_input_name=next(
iter(trigger_block.input_schema.get_credentials_fields()), None
),
)
if graph.webhook_input_node
and (trigger_block := graph.webhook_input_node.block).webhook_config
else None
),
new_output=new_output,
can_access_graph=can_access_graph,
is_latest_version=is_latest_version,
@@ -255,18 +284,10 @@ class LibraryAgentPreset(LibraryAgentPresetCreatable):
user_id: str
updated_at: datetime.datetime
webhook: "Webhook | None"
@classmethod
def from_db(cls, preset: prisma.models.AgentPreset) -> "LibraryAgentPreset":
from backend.data.integrations import Webhook
if preset.InputPresets is None:
raise ValueError("InputPresets must be included in AgentPreset query")
if preset.webhookId and preset.Webhook is None:
raise ValueError(
"Webhook must be included in AgentPreset query when webhookId is set"
)
input_data: block_model.BlockInput = {}
input_credentials: dict[str, CredentialsMetaInput] = {}
@@ -291,7 +312,6 @@ class LibraryAgentPreset(LibraryAgentPresetCreatable):
inputs=input_data,
credentials=input_credentials,
webhook_id=preset.webhookId,
webhook=Webhook.from_db(preset.Webhook) if preset.Webhook else None,
)

View File

@@ -6,10 +6,8 @@ from fastapi import APIRouter, Body, HTTPException, Query, Security, status
import backend.server.v2.library.db as db
import backend.server.v2.library.model as models
from backend.data.execution import GraphExecutionMeta
from backend.data.graph import get_graph
from backend.data.integrations import get_webhook
from backend.data.model import CredentialsMetaInput
from backend.executor.utils import add_graph_execution, make_node_credentials_input_map
from backend.integrations.creds_manager import IntegrationCredentialsManager
from backend.integrations.webhooks import get_webhook_manager
@@ -371,41 +369,48 @@ async def execute_preset(
preset_id: str,
user_id: str = Security(autogpt_auth_lib.get_user_id),
inputs: dict[str, Any] = Body(..., embed=True, default_factory=dict),
credential_inputs: dict[str, CredentialsMetaInput] = Body(
..., embed=True, default_factory=dict
),
) -> GraphExecutionMeta:
) -> dict[str, Any]: # FIXME: add proper return type
"""
Execute a preset given graph parameters, returning the execution ID on success.
Args:
preset_id: ID of the preset to execute.
user_id: ID of the authenticated user.
inputs: Optionally, inputs to override the preset for execution.
credential_inputs: Optionally, credentials to override the preset for execution.
preset_id (str): ID of the preset to execute.
user_id (str): ID of the authenticated user.
inputs (dict[str, Any]): Optionally, additional input data for the graph execution.
Returns:
GraphExecutionMeta: Object representing the created execution.
{id: graph_exec_id}: A response containing the execution ID.
Raises:
HTTPException: If the preset is not found or an error occurs while executing the preset.
"""
preset = await db.get_preset(user_id, preset_id)
if not preset:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Preset #{preset_id} not found",
try:
preset = await db.get_preset(user_id, preset_id)
if not preset:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Preset #{preset_id} not found",
)
# Merge input overrides with preset inputs
merged_node_input = preset.inputs | inputs
execution = await add_graph_execution(
user_id=user_id,
graph_id=preset.graph_id,
graph_version=preset.graph_version,
preset_id=preset_id,
inputs=merged_node_input,
)
# Merge input overrides with preset inputs
merged_node_input = preset.inputs | inputs
merged_credential_inputs = preset.credentials | credential_inputs
logger.debug(f"Execution added: {execution} with input: {merged_node_input}")
return await add_graph_execution(
user_id=user_id,
graph_id=preset.graph_id,
graph_version=preset.graph_version,
preset_id=preset_id,
inputs=merged_node_input,
graph_credentials_inputs=merged_credential_inputs,
)
return {"id": execution.id}
except HTTPException:
raise
except Exception as e:
logger.exception("Preset execution failed for user %s: %s", user_id, e)
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=str(e),
)

View File

@@ -1,4 +1,3 @@
import asyncio
import logging
from datetime import datetime, timezone
@@ -10,7 +9,6 @@ import prisma.types
import backend.server.v2.store.exceptions
import backend.server.v2.store.model
from backend.data.db import transaction
from backend.data.graph import (
GraphMeta,
GraphModel,
@@ -72,7 +70,7 @@ async def get_store_agents(
)
sanitized_query = sanitize_query(search_query)
where_clause: prisma.types.StoreAgentWhereInput = {"is_available": True}
where_clause = {}
if featured:
where_clause["featured"] = featured
if creators:
@@ -96,13 +94,15 @@ async def get_store_agents(
try:
agents = await prisma.models.StoreAgent.prisma().find_many(
where=where_clause,
where=prisma.types.StoreAgentWhereInput(**where_clause),
order=order_by,
skip=(page - 1) * page_size,
take=page_size,
)
total = await prisma.models.StoreAgent.prisma().count(where=where_clause)
total = await prisma.models.StoreAgent.prisma().count(
where=prisma.types.StoreAgentWhereInput(**where_clause)
)
total_pages = (total + page_size - 1) // page_size
store_agents: list[backend.server.v2.store.model.StoreAgent] = []
@@ -190,8 +190,8 @@ async def get_store_agent_details(
agent_name=agent.agent_name,
agent_video=agent.agent_video or "",
agent_image=agent.agent_image,
creator=agent.creator_username or "",
creator_avatar=agent.creator_avatar or "",
creator=agent.creator_username,
creator_avatar=agent.creator_avatar,
sub_heading=agent.sub_heading,
description=agent.description,
categories=agent.categories,
@@ -263,8 +263,8 @@ async def get_store_agent_by_version_id(
agent_name=agent.agent_name,
agent_video=agent.agent_video or "",
agent_image=agent.agent_image,
creator=agent.creator_username or "",
creator_avatar=agent.creator_avatar or "",
creator=agent.creator_username,
creator_avatar=agent.creator_avatar,
sub_heading=agent.sub_heading,
description=agent.description,
categories=agent.categories,
@@ -1200,103 +1200,40 @@ async def get_agent(store_listing_version_id: str) -> GraphModel:
#####################################################
async def _approve_sub_agent(
tx,
sub_graph: prisma.models.AgentGraph,
main_agent_name: str,
main_agent_version: int,
main_agent_user_id: str,
) -> None:
"""Approve a single sub-agent by creating/updating store listings as needed"""
heading = f"Sub-agent of {main_agent_name} v{main_agent_version}"
async def _get_missing_sub_store_listing(
graph: prisma.models.AgentGraph,
) -> list[prisma.models.AgentGraph]:
"""
Agent graph can have sub-graphs, and those sub-graphs also need to be store listed.
This method fetches the sub-graphs, and returns the ones not listed in the store.
"""
sub_graphs = await get_sub_graphs(graph)
if not sub_graphs:
return []
# Find existing listing for this sub-agent
listing = await prisma.models.StoreListing.prisma(tx).find_first(
where={"agentGraphId": sub_graph.id, "isDeleted": False},
include={"Versions": True},
)
# Early return: Create new listing if none exists
if not listing:
await prisma.models.StoreListing.prisma(tx).create(
data=prisma.types.StoreListingCreateInput(
slug=f"sub-agent-{sub_graph.id[:8]}",
agentGraphId=sub_graph.id,
agentGraphVersion=sub_graph.version,
owningUserId=main_agent_user_id,
hasApprovedVersion=True,
Versions={
"create": [
_create_sub_agent_version_data(
sub_graph, heading, main_agent_name
)
]
},
)
)
return
# Find version matching this sub-graph
matching_version = next(
(
v
for v in listing.Versions or []
if v.agentGraphId == sub_graph.id
and v.agentGraphVersion == sub_graph.version
),
None,
)
# Early return: Approve existing version if found and not already approved
if matching_version:
if matching_version.submissionStatus == prisma.enums.SubmissionStatus.APPROVED:
return # Already approved, nothing to do
await prisma.models.StoreListingVersion.prisma(tx).update(
where={"id": matching_version.id},
data={
# Fetch all the sub-graphs that are listed, and return the ones missing.
store_listed_sub_graphs = {
(listing.agentGraphId, listing.agentGraphVersion)
for listing in await prisma.models.StoreListingVersion.prisma().find_many(
where={
"OR": [
{
"agentGraphId": sub_graph.id,
"agentGraphVersion": sub_graph.version,
}
for sub_graph in sub_graphs
],
"submissionStatus": prisma.enums.SubmissionStatus.APPROVED,
"reviewedAt": datetime.now(tz=timezone.utc),
},
"isDeleted": False,
}
)
await prisma.models.StoreListing.prisma(tx).update(
where={"id": listing.id}, data={"hasApprovedVersion": True}
)
return
}
# Create new version if no matching version found
next_version = max((v.version for v in listing.Versions or []), default=0) + 1
await prisma.models.StoreListingVersion.prisma(tx).create(
data={
**_create_sub_agent_version_data(sub_graph, heading, main_agent_name),
"version": next_version,
"storeListingId": listing.id,
}
)
await prisma.models.StoreListing.prisma(tx).update(
where={"id": listing.id}, data={"hasApprovedVersion": True}
)
def _create_sub_agent_version_data(
sub_graph: prisma.models.AgentGraph, heading: str, main_agent_name: str
) -> prisma.types.StoreListingVersionCreateInput:
"""Create store listing version data for a sub-agent"""
return prisma.types.StoreListingVersionCreateInput(
agentGraphId=sub_graph.id,
agentGraphVersion=sub_graph.version,
name=sub_graph.name or heading,
submissionStatus=prisma.enums.SubmissionStatus.APPROVED,
subHeading=heading,
description=(
f"{heading}: {sub_graph.description}" if sub_graph.description else heading
),
changesSummary=f"Auto-approved as sub-agent of {main_agent_name}",
isAvailable=False,
submittedAt=datetime.now(tz=timezone.utc),
imageUrls=[], # Sub-agents don't need images
categories=[], # Sub-agents don't need categories
)
return [
sub_graph
for sub_graph in sub_graphs
if (sub_graph.id, sub_graph.version) not in store_listed_sub_graphs
]
async def review_store_submission(
@@ -1334,30 +1271,33 @@ async def review_store_submission(
# If approving, update the listing to indicate it has an approved version
if is_approved and store_listing_version.AgentGraph:
async with transaction() as tx:
# Handle sub-agent approvals in transaction
await asyncio.gather(
*[
_approve_sub_agent(
tx,
sub_graph,
store_listing_version.name,
store_listing_version.agentGraphVersion,
store_listing_version.StoreListing.owningUserId,
)
for sub_graph in await get_sub_graphs(
store_listing_version.AgentGraph
)
]
)
heading = f"Sub-graph of {store_listing_version.name}v{store_listing_version.agentGraphVersion}"
await prisma.models.StoreListing.prisma(tx).update(
where={"id": store_listing_version.StoreListing.id},
data={
"hasApprovedVersion": True,
"ActiveVersion": {"connect": {"id": store_listing_version_id}},
},
sub_store_listing_versions = [
prisma.types.StoreListingVersionCreateWithoutRelationsInput(
agentGraphId=sub_graph.id,
agentGraphVersion=sub_graph.version,
name=sub_graph.name or heading,
submissionStatus=prisma.enums.SubmissionStatus.APPROVED,
subHeading=heading,
description=f"{heading}: {sub_graph.description}",
changesSummary=f"This listing is added as a {heading} / #{store_listing_version.agentGraphId}.",
isAvailable=False, # Hide sub-graphs from the store by default.
submittedAt=datetime.now(tz=timezone.utc),
)
for sub_graph in await _get_missing_sub_store_listing(
store_listing_version.AgentGraph
)
]
await prisma.models.StoreListing.prisma().update(
where={"id": store_listing_version.StoreListing.id},
data={
"hasApprovedVersion": True,
"ActiveVersion": {"connect": {"id": store_listing_version_id}},
"Versions": {"create": sub_store_listing_versions},
},
)
# If rejecting an approved agent, update the StoreListing accordingly
if is_rejecting_approved:

View File

@@ -41,7 +41,6 @@ async def test_get_store_agents(mocker):
rating=4.5,
versions=["1.0"],
updated_at=datetime.now(),
is_available=False,
)
]
@@ -83,7 +82,6 @@ async def test_get_store_agent_details(mocker):
rating=4.5,
versions=["1.0"],
updated_at=datetime.now(),
is_available=False,
)
# Create a mock StoreListing result

View File

@@ -251,14 +251,14 @@ async def block_autogen_agent():
test_user = await create_test_user()
test_graph = await create_graph(create_test_graph(), user_id=test_user.id)
input_data = {"input": "Write me a block that writes a string into a file."}
graph_exec = await server.agent_server.test_execute_graph(
response = await server.agent_server.test_execute_graph(
graph_id=test_graph.id,
user_id=test_user.id,
node_input=input_data,
)
print(graph_exec)
print(response)
result = await wait_execution(
graph_exec_id=graph_exec.id,
graph_exec_id=response.graph_exec_id,
timeout=1200,
user_id=test_user.id,
)

View File

@@ -155,13 +155,13 @@ async def reddit_marketing_agent():
test_user = await create_test_user()
test_graph = await create_graph(create_test_graph(), user_id=test_user.id)
input_data = {"subreddit": "AutoGPT"}
graph_exec = await server.agent_server.test_execute_graph(
response = await server.agent_server.test_execute_graph(
graph_id=test_graph.id,
user_id=test_user.id,
node_input=input_data,
)
print(graph_exec)
result = await wait_execution(test_user.id, graph_exec.id, 120)
print(response)
result = await wait_execution(test_user.id, response.graph_exec_id, 120)
print(result)

View File

@@ -88,12 +88,12 @@ async def sample_agent():
test_user = await create_test_user()
test_graph = await create_graph(create_test_graph(), test_user.id)
input_data = {"input_1": "Hello", "input_2": "World"}
graph_exec = await server.agent_server.test_execute_graph(
response = await server.agent_server.test_execute_graph(
graph_id=test_graph.id,
user_id=test_user.id,
node_input=input_data,
)
await wait_execution(test_user.id, graph_exec.id, 10)
await wait_execution(test_user.id, response.graph_exec_id, 10)
if __name__ == "__main__":

View File

@@ -1,6 +1,3 @@
from typing import Mapping
class MissingConfigError(Exception):
"""The attempted operation requires configuration which is not available"""
@@ -72,7 +69,7 @@ class GraphValidationError(ValueError):
"""Structured validation error for graph validation failures"""
def __init__(
self, message: str, node_errors: Mapping[str, Mapping[str, str]] | None = None
self, message: str, node_errors: dict[str, dict[str, str]] | None = None
):
super().__init__(message)
self.message = message

View File

@@ -1,5 +0,0 @@
-- Add 'credentialInputs', 'inputs', and 'nodesInputMasks' columns to the AgentGraphExecution table
ALTER TABLE "AgentGraphExecution"
ADD COLUMN "credentialInputs" JSONB,
ADD COLUMN "inputs" JSONB,
ADD COLUMN "nodesInputMasks" JSONB;

View File

@@ -1,53 +0,0 @@
-- Update StoreAgent view to include is_available field and fix creator field nullability
BEGIN;
-- Drop and recreate the StoreAgent view with isAvailable field
DROP VIEW IF EXISTS "StoreAgent";
CREATE OR REPLACE VIEW "StoreAgent" AS
WITH agent_versions AS (
SELECT
"storeListingId",
array_agg(DISTINCT version::text ORDER BY version::text) AS versions
FROM "StoreListingVersion"
WHERE "submissionStatus" = 'APPROVED'
GROUP BY "storeListingId"
)
SELECT
sl.id AS listing_id,
slv.id AS "storeListingVersionId",
slv."createdAt" AS updated_at,
sl.slug,
COALESCE(slv.name, '') AS agent_name,
slv."videoUrl" AS agent_video,
COALESCE(slv."imageUrls", ARRAY[]::text[]) AS agent_image,
slv."isFeatured" AS featured,
p.username AS creator_username, -- Allow NULL for malformed sub-agents
p."avatarUrl" AS creator_avatar, -- Allow NULL for malformed sub-agents
slv."subHeading" AS sub_heading,
slv.description,
slv.categories,
COALESCE(ar.run_count, 0::bigint) AS runs,
COALESCE(rs.avg_rating, 0.0)::double precision AS rating,
COALESCE(av.versions, ARRAY[slv.version::text]) AS versions,
slv."isAvailable" AS is_available -- Add isAvailable field to filter sub-agents
FROM "StoreListing" sl
INNER JOIN "StoreListingVersion" slv
ON slv."storeListingId" = sl.id
AND slv."submissionStatus" = 'APPROVED'
JOIN "AgentGraph" a
ON slv."agentGraphId" = a.id
AND slv."agentGraphVersion" = a.version
LEFT JOIN "Profile" p
ON sl."owningUserId" = p."userId"
LEFT JOIN "mv_review_stats" rs
ON sl.id = rs."storeListingId"
LEFT JOIN "mv_agent_run_counts" ar
ON a.id = ar."agentGraphId"
LEFT JOIN agent_versions av
ON sl.id = av."storeListingId"
WHERE sl."isDeleted" = false
AND sl."hasApprovedVersion" = true;
COMMIT;

View File

@@ -898,62 +898,52 @@ pytz = ">2021.1"
[[package]]
name = "cryptography"
version = "45.0.7"
version = "43.0.3"
description = "cryptography is a package which provides cryptographic recipes and primitives to Python developers."
optional = false
python-versions = "!=3.9.0,!=3.9.1,>=3.7"
python-versions = ">=3.7"
groups = ["main"]
files = [
{file = "cryptography-45.0.7-cp311-abi3-macosx_10_9_universal2.whl", hash = "sha256:3be4f21c6245930688bd9e162829480de027f8bf962ede33d4f8ba7d67a00cee"},
{file = "cryptography-45.0.7-cp311-abi3-manylinux2014_aarch64.manylinux_2_17_aarch64.whl", hash = "sha256:67285f8a611b0ebc0857ced2081e30302909f571a46bfa7a3cc0ad303fe015c6"},
{file = "cryptography-45.0.7-cp311-abi3-manylinux2014_x86_64.manylinux_2_17_x86_64.whl", hash = "sha256:577470e39e60a6cd7780793202e63536026d9b8641de011ed9d8174da9ca5339"},
{file = "cryptography-45.0.7-cp311-abi3-manylinux_2_28_aarch64.whl", hash = "sha256:4bd3e5c4b9682bc112d634f2c6ccc6736ed3635fc3319ac2bb11d768cc5a00d8"},
{file = "cryptography-45.0.7-cp311-abi3-manylinux_2_28_armv7l.manylinux_2_31_armv7l.whl", hash = "sha256:465ccac9d70115cd4de7186e60cfe989de73f7bb23e8a7aa45af18f7412e75bf"},
{file = "cryptography-45.0.7-cp311-abi3-manylinux_2_28_x86_64.whl", hash = "sha256:16ede8a4f7929b4b7ff3642eba2bf79aa1d71f24ab6ee443935c0d269b6bc513"},
{file = "cryptography-45.0.7-cp311-abi3-manylinux_2_34_aarch64.whl", hash = "sha256:8978132287a9d3ad6b54fcd1e08548033cc09dc6aacacb6c004c73c3eb5d3ac3"},
{file = "cryptography-45.0.7-cp311-abi3-manylinux_2_34_x86_64.whl", hash = "sha256:b6a0e535baec27b528cb07a119f321ac024592388c5681a5ced167ae98e9fff3"},
{file = "cryptography-45.0.7-cp311-abi3-musllinux_1_2_aarch64.whl", hash = "sha256:a24ee598d10befaec178efdff6054bc4d7e883f615bfbcd08126a0f4931c83a6"},
{file = "cryptography-45.0.7-cp311-abi3-musllinux_1_2_x86_64.whl", hash = "sha256:fa26fa54c0a9384c27fcdc905a2fb7d60ac6e47d14bc2692145f2b3b1e2cfdbd"},
{file = "cryptography-45.0.7-cp311-abi3-win32.whl", hash = "sha256:bef32a5e327bd8e5af915d3416ffefdbe65ed975b646b3805be81b23580b57b8"},
{file = "cryptography-45.0.7-cp311-abi3-win_amd64.whl", hash = "sha256:3808e6b2e5f0b46d981c24d79648e5c25c35e59902ea4391a0dcb3e667bf7443"},
{file = "cryptography-45.0.7-cp37-abi3-macosx_10_9_universal2.whl", hash = "sha256:bfb4c801f65dd61cedfc61a83732327fafbac55a47282e6f26f073ca7a41c3b2"},
{file = "cryptography-45.0.7-cp37-abi3-manylinux2014_aarch64.manylinux_2_17_aarch64.whl", hash = "sha256:81823935e2f8d476707e85a78a405953a03ef7b7b4f55f93f7c2d9680e5e0691"},
{file = "cryptography-45.0.7-cp37-abi3-manylinux2014_x86_64.manylinux_2_17_x86_64.whl", hash = "sha256:3994c809c17fc570c2af12c9b840d7cea85a9fd3e5c0e0491f4fa3c029216d59"},
{file = "cryptography-45.0.7-cp37-abi3-manylinux_2_28_aarch64.whl", hash = "sha256:dad43797959a74103cb59c5dac71409f9c27d34c8a05921341fb64ea8ccb1dd4"},
{file = "cryptography-45.0.7-cp37-abi3-manylinux_2_28_armv7l.manylinux_2_31_armv7l.whl", hash = "sha256:ce7a453385e4c4693985b4a4a3533e041558851eae061a58a5405363b098fcd3"},
{file = "cryptography-45.0.7-cp37-abi3-manylinux_2_28_x86_64.whl", hash = "sha256:b04f85ac3a90c227b6e5890acb0edbaf3140938dbecf07bff618bf3638578cf1"},
{file = "cryptography-45.0.7-cp37-abi3-manylinux_2_34_aarch64.whl", hash = "sha256:48c41a44ef8b8c2e80ca4527ee81daa4c527df3ecbc9423c41a420a9559d0e27"},
{file = "cryptography-45.0.7-cp37-abi3-manylinux_2_34_x86_64.whl", hash = "sha256:f3df7b3d0f91b88b2106031fd995802a2e9ae13e02c36c1fc075b43f420f3a17"},
{file = "cryptography-45.0.7-cp37-abi3-musllinux_1_2_aarch64.whl", hash = "sha256:dd342f085542f6eb894ca00ef70236ea46070c8a13824c6bde0dfdcd36065b9b"},
{file = "cryptography-45.0.7-cp37-abi3-musllinux_1_2_x86_64.whl", hash = "sha256:1993a1bb7e4eccfb922b6cd414f072e08ff5816702a0bdb8941c247a6b1b287c"},
{file = "cryptography-45.0.7-cp37-abi3-win32.whl", hash = "sha256:18fcf70f243fe07252dcb1b268a687f2358025ce32f9f88028ca5c364b123ef5"},
{file = "cryptography-45.0.7-cp37-abi3-win_amd64.whl", hash = "sha256:7285a89df4900ed3bfaad5679b1e668cb4b38a8de1ccbfc84b05f34512da0a90"},
{file = "cryptography-45.0.7-pp310-pypy310_pp73-macosx_10_9_x86_64.whl", hash = "sha256:de58755d723e86175756f463f2f0bddd45cc36fbd62601228a3f8761c9f58252"},
{file = "cryptography-45.0.7-pp310-pypy310_pp73-manylinux_2_28_aarch64.whl", hash = "sha256:a20e442e917889d1a6b3c570c9e3fa2fdc398c20868abcea268ea33c024c4083"},
{file = "cryptography-45.0.7-pp310-pypy310_pp73-manylinux_2_28_x86_64.whl", hash = "sha256:258e0dff86d1d891169b5af222d362468a9570e2532923088658aa866eb11130"},
{file = "cryptography-45.0.7-pp310-pypy310_pp73-manylinux_2_34_aarch64.whl", hash = "sha256:d97cf502abe2ab9eff8bd5e4aca274da8d06dd3ef08b759a8d6143f4ad65d4b4"},
{file = "cryptography-45.0.7-pp310-pypy310_pp73-manylinux_2_34_x86_64.whl", hash = "sha256:c987dad82e8c65ebc985f5dae5e74a3beda9d0a2a4daf8a1115f3772b59e5141"},
{file = "cryptography-45.0.7-pp310-pypy310_pp73-win_amd64.whl", hash = "sha256:c13b1e3afd29a5b3b2656257f14669ca8fa8d7956d509926f0b130b600b50ab7"},
{file = "cryptography-45.0.7-pp311-pypy311_pp73-macosx_10_9_x86_64.whl", hash = "sha256:4a862753b36620af6fc54209264f92c716367f2f0ff4624952276a6bbd18cbde"},
{file = "cryptography-45.0.7-pp311-pypy311_pp73-manylinux_2_28_aarch64.whl", hash = "sha256:06ce84dc14df0bf6ea84666f958e6080cdb6fe1231be2a51f3fc1267d9f3fb34"},
{file = "cryptography-45.0.7-pp311-pypy311_pp73-manylinux_2_28_x86_64.whl", hash = "sha256:d0c5c6bac22b177bf8da7435d9d27a6834ee130309749d162b26c3105c0795a9"},
{file = "cryptography-45.0.7-pp311-pypy311_pp73-manylinux_2_34_aarch64.whl", hash = "sha256:2f641b64acc00811da98df63df7d59fd4706c0df449da71cb7ac39a0732b40ae"},
{file = "cryptography-45.0.7-pp311-pypy311_pp73-manylinux_2_34_x86_64.whl", hash = "sha256:f5414a788ecc6ee6bc58560e85ca624258a55ca434884445440a810796ea0e0b"},
{file = "cryptography-45.0.7-pp311-pypy311_pp73-win_amd64.whl", hash = "sha256:1f3d56f73595376f4244646dd5c5870c14c196949807be39e79e7bd9bac3da63"},
{file = "cryptography-45.0.7.tar.gz", hash = "sha256:4b1654dfc64ea479c242508eb8c724044f1e964a47d1d1cacc5132292d851971"},
{file = "cryptography-43.0.3-cp37-abi3-macosx_10_9_universal2.whl", hash = "sha256:bf7a1932ac4176486eab36a19ed4c0492da5d97123f1406cf15e41b05e787d2e"},
{file = "cryptography-43.0.3-cp37-abi3-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:63efa177ff54aec6e1c0aefaa1a241232dcd37413835a9b674b6e3f0ae2bfd3e"},
{file = "cryptography-43.0.3-cp37-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:7e1ce50266f4f70bf41a2c6dc4358afadae90e2a1e5342d3c08883df1675374f"},
{file = "cryptography-43.0.3-cp37-abi3-manylinux_2_28_aarch64.whl", hash = "sha256:443c4a81bb10daed9a8f334365fe52542771f25aedaf889fd323a853ce7377d6"},
{file = "cryptography-43.0.3-cp37-abi3-manylinux_2_28_x86_64.whl", hash = "sha256:74f57f24754fe349223792466a709f8e0c093205ff0dca557af51072ff47ab18"},
{file = "cryptography-43.0.3-cp37-abi3-musllinux_1_2_aarch64.whl", hash = "sha256:9762ea51a8fc2a88b70cf2995e5675b38d93bf36bd67d91721c309df184f49bd"},
{file = "cryptography-43.0.3-cp37-abi3-musllinux_1_2_x86_64.whl", hash = "sha256:81ef806b1fef6b06dcebad789f988d3b37ccaee225695cf3e07648eee0fc6b73"},
{file = "cryptography-43.0.3-cp37-abi3-win32.whl", hash = "sha256:cbeb489927bd7af4aa98d4b261af9a5bc025bd87f0e3547e11584be9e9427be2"},
{file = "cryptography-43.0.3-cp37-abi3-win_amd64.whl", hash = "sha256:f46304d6f0c6ab8e52770addfa2fc41e6629495548862279641972b6215451cd"},
{file = "cryptography-43.0.3-cp39-abi3-macosx_10_9_universal2.whl", hash = "sha256:8ac43ae87929a5982f5948ceda07001ee5e83227fd69cf55b109144938d96984"},
{file = "cryptography-43.0.3-cp39-abi3-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:846da004a5804145a5f441b8530b4bf35afbf7da70f82409f151695b127213d5"},
{file = "cryptography-43.0.3-cp39-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:0f996e7268af62598f2fc1204afa98a3b5712313a55c4c9d434aef49cadc91d4"},
{file = "cryptography-43.0.3-cp39-abi3-manylinux_2_28_aarch64.whl", hash = "sha256:f7b178f11ed3664fd0e995a47ed2b5ff0a12d893e41dd0494f406d1cf555cab7"},
{file = "cryptography-43.0.3-cp39-abi3-manylinux_2_28_x86_64.whl", hash = "sha256:c2e6fc39c4ab499049df3bdf567f768a723a5e8464816e8f009f121a5a9f4405"},
{file = "cryptography-43.0.3-cp39-abi3-musllinux_1_2_aarch64.whl", hash = "sha256:e1be4655c7ef6e1bbe6b5d0403526601323420bcf414598955968c9ef3eb7d16"},
{file = "cryptography-43.0.3-cp39-abi3-musllinux_1_2_x86_64.whl", hash = "sha256:df6b6c6d742395dd77a23ea3728ab62f98379eff8fb61be2744d4679ab678f73"},
{file = "cryptography-43.0.3-cp39-abi3-win32.whl", hash = "sha256:d56e96520b1020449bbace2b78b603442e7e378a9b3bd68de65c782db1507995"},
{file = "cryptography-43.0.3-cp39-abi3-win_amd64.whl", hash = "sha256:0c580952eef9bf68c4747774cde7ec1d85a6e61de97281f2dba83c7d2c806362"},
{file = "cryptography-43.0.3-pp310-pypy310_pp73-macosx_10_9_x86_64.whl", hash = "sha256:d03b5621a135bffecad2c73e9f4deb1a0f977b9a8ffe6f8e002bf6c9d07b918c"},
{file = "cryptography-43.0.3-pp310-pypy310_pp73-manylinux_2_28_aarch64.whl", hash = "sha256:a2a431ee15799d6db9fe80c82b055bae5a752bef645bba795e8e52687c69efe3"},
{file = "cryptography-43.0.3-pp310-pypy310_pp73-manylinux_2_28_x86_64.whl", hash = "sha256:281c945d0e28c92ca5e5930664c1cefd85efe80e5c0d2bc58dd63383fda29f83"},
{file = "cryptography-43.0.3-pp310-pypy310_pp73-win_amd64.whl", hash = "sha256:f18c716be16bc1fea8e95def49edf46b82fccaa88587a45f8dc0ff6ab5d8e0a7"},
{file = "cryptography-43.0.3-pp39-pypy39_pp73-macosx_10_9_x86_64.whl", hash = "sha256:4a02ded6cd4f0a5562a8887df8b3bd14e822a90f97ac5e544c162899bc467664"},
{file = "cryptography-43.0.3-pp39-pypy39_pp73-manylinux_2_28_aarch64.whl", hash = "sha256:53a583b6637ab4c4e3591a15bc9db855b8d9dee9a669b550f311480acab6eb08"},
{file = "cryptography-43.0.3-pp39-pypy39_pp73-manylinux_2_28_x86_64.whl", hash = "sha256:1ec0bcf7e17c0c5669d881b1cd38c4972fade441b27bda1051665faaa89bdcaa"},
{file = "cryptography-43.0.3-pp39-pypy39_pp73-win_amd64.whl", hash = "sha256:2ce6fae5bdad59577b44e4dfed356944fbf1d925269114c28be377692643b4ff"},
{file = "cryptography-43.0.3.tar.gz", hash = "sha256:315b9001266a492a6ff443b61238f956b214dbec9910a081ba5b6646a055a805"},
]
[package.dependencies]
cffi = {version = ">=1.14", markers = "platform_python_implementation != \"PyPy\""}
cffi = {version = ">=1.12", markers = "platform_python_implementation != \"PyPy\""}
[package.extras]
docs = ["sphinx (>=5.3.0)", "sphinx-inline-tabs ; python_full_version >= \"3.8.0\"", "sphinx-rtd-theme (>=3.0.0) ; python_full_version >= \"3.8.0\""]
docstest = ["pyenchant (>=3)", "readme-renderer (>=30.0)", "sphinxcontrib-spelling (>=7.3.1)"]
nox = ["nox (>=2024.4.15)", "nox[uv] (>=2024.3.2) ; python_full_version >= \"3.8.0\""]
pep8test = ["check-sdist ; python_full_version >= \"3.8.0\"", "click (>=8.0.1)", "mypy (>=1.4)", "ruff (>=0.3.6)"]
sdist = ["build (>=1.0.0)"]
docs = ["sphinx (>=5.3.0)", "sphinx-rtd-theme (>=1.1.1)"]
docstest = ["pyenchant (>=1.6.11)", "readme-renderer", "sphinxcontrib-spelling (>=4.0.1)"]
nox = ["nox"]
pep8test = ["check-sdist", "click", "mypy", "ruff"]
sdist = ["build"]
ssh = ["bcrypt (>=3.1.5)"]
test = ["certifi (>=2024)", "cryptography-vectors (==45.0.7)", "pretend (>=0.7)", "pytest (>=7.4.0)", "pytest-benchmark (>=4.0)", "pytest-cov (>=2.10.1)", "pytest-xdist (>=3.5.0)"]
test = ["certifi", "cryptography-vectors (==43.0.3)", "pretend", "pytest (>=6.2.0)", "pytest-benchmark", "pytest-cov", "pytest-xdist"]
test-randomorder = ["pytest-randomly"]
[[package]]
@@ -7142,4 +7132,4 @@ cffi = ["cffi (>=1.11)"]
[metadata]
lock-version = "2.1"
python-versions = ">=3.10,<3.14"
content-hash = "80d4dc2cbcd1ae33b2fa3920db5dcb1f82ad252d1e4a8bfeba8b2f2eebbdda0d"
content-hash = "892daa57d7126d9a9d5308005b07328a39b8c4cd7fe198f9b5ab10f957787c48"

View File

@@ -17,7 +17,7 @@ apscheduler = "^3.11.0"
autogpt-libs = { path = "../autogpt_libs", develop = true }
bleach = { extras = ["css"], version = "^6.2.0" }
click = "^8.2.0"
cryptography = "^45.0"
cryptography = "^43.0"
discord-py = "^2.5.2"
e2b-code-interpreter = "^1.5.2"
fastapi = "^0.116.1"

View File

@@ -36,7 +36,7 @@ model User {
notifyOnAgentApproved Boolean @default(true)
notifyOnAgentRejected Boolean @default(true)
timezone String @default("not-set")
timezone String @default("not-set")
// Relations
@@ -354,20 +354,15 @@ model AgentGraphExecution {
agentGraphVersion Int @default(1)
AgentGraph AgentGraph @relation(fields: [agentGraphId, agentGraphVersion], references: [id, version], onDelete: Cascade)
agentPresetId String?
AgentPreset AgentPreset? @relation(fields: [agentPresetId], references: [id])
inputs Json?
credentialInputs Json?
nodesInputMasks Json?
NodeExecutions AgentNodeExecution[]
// Link to User model -- Executed by this user
userId String
User User @relation(fields: [userId], references: [id], onDelete: Cascade)
stats Json?
stats Json?
agentPresetId String?
AgentPreset AgentPreset? @relation(fields: [agentPresetId], references: [id])
@@index([agentGraphId, agentGraphVersion])
@@index([userId])
@@ -630,15 +625,14 @@ view StoreAgent {
agent_image String[]
featured Boolean @default(false)
creator_username String?
creator_avatar String?
creator_username String
creator_avatar String
sub_heading String
description String
categories String[]
runs Int
rating Float
versions String[]
is_available Boolean @default(true)
// Materialized views used (refreshed every 15 minutes via pg_cron):
// - mv_agent_run_counts - Pre-aggregated agent execution counts by agentGraphId

View File

@@ -24,7 +24,6 @@
"type": "object"
},
"sub_graphs": [],
"trigger_setup_info": null,
"user_id": "test-user-id",
"version": 1
}

View File

@@ -23,7 +23,6 @@
"type": "object"
},
"sub_graphs": [],
"trigger_setup_info": null,
"user_id": "test-user-id",
"version": 1
}

View File

@@ -25,10 +25,83 @@ x-supabase-env-files: &supabase-env-files
# Common Supabase environment - hardcoded defaults to avoid variable substitution
x-supabase-env: &supabase-env
SUPABASE_ANON_KEY: eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyAgCiAgICAicm9sZSI6ICJhbm9uIiwKICAgICJpc3MiOiAic3VwYWJhc2UtZGVtbyIsCiAgICAiaWF0IjogMTY0MTc2OTIwMCwKICAgICJleHAiOiAxNzk5NTM1NjAwCn0.dc_X5iR_VP_qT0zsiyj_I_OZ2T9FtRU2BBNWN8Bu4GE
SUPABASE_SERVICE_KEY: eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyAgCiAgICAicm9sZSI6ICJzZXJ2aWNlX3JvbGUiLAogICAgImlzcyI6ICJzdXBhYmFzZS1kZW1vIiwKICAgICJpYXQiOiAxNjQxNzY5MjAwLAogICAgImV4cCI6IDE3OTk1MzU2MDAKfQ.DaYlNEoUrrEn2Ig7tqibS-PHK5vgusbcbo7X36XVt4Q
# Core PostgreSQL settings
POSTGRES_PASSWORD: your-super-secret-and-long-postgres-password
POSTGRES_HOST: db
POSTGRES_PORT: "5432"
POSTGRES_DB: postgres
# Authentication & Security
JWT_SECRET: your-super-secret-jwt-token-with-at-least-32-characters-long
ANON_KEY: eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyAgCiAgICAicm9sZSI6ICJhbm9uIiwKICAgICJpc3MiOiAic3VwYWJhc2UtZGVtbyIsCiAgICAiaWF0IjogMTY0MTc2OTIwMCwKICAgICJleHAiOiAxNzk5NTM1NjAwCn0.dc_X5iR_VP_qT0zsiyj_I_OZ2T9FtRU2BBNWN8Bu4GE
SERVICE_ROLE_KEY: eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyAgCiAgICAicm9sZSI6ICJzZXJ2aWNlX3JvbGUiLAogICAgImlzcyI6ICJzdXBhYmFzZS1kZW1vIiwKICAgICJpYXQiOiAxNjQxNzY5MjAwLAogICAgImV4cCI6IDE3OTk1MzU2MDAKfQ.DaYlNEoUrrEn2Ig7tqibS-PHK5vgusbcbo7X36XVt4Q
DASHBOARD_USERNAME: supabase
DASHBOARD_PASSWORD: this_password_is_insecure_and_should_be_updated
SECRET_KEY_BASE: UpNVntn3cDxHJpq99YMc1T1AQgQpc8kfYTuRgBiYa15BLrx8etQoXz3gZv1/u2oq
VAULT_ENC_KEY: your-encryption-key-32-chars-min
# URLs and Endpoints
SITE_URL: http://localhost:3000
API_EXTERNAL_URL: http://localhost:8000
SUPABASE_PUBLIC_URL: http://localhost:8000
ADDITIONAL_REDIRECT_URLS: ""
# Feature Flags
DISABLE_SIGNUP: "false"
ENABLE_EMAIL_SIGNUP: "true"
ENABLE_EMAIL_AUTOCONFIRM: "false"
ENABLE_ANONYMOUS_USERS: "false"
ENABLE_PHONE_SIGNUP: "true"
ENABLE_PHONE_AUTOCONFIRM: "true"
FUNCTIONS_VERIFY_JWT: "false"
IMGPROXY_ENABLE_WEBP_DETECTION: "true"
# Email/SMTP Configuration
SMTP_ADMIN_EMAIL: admin@example.com
SMTP_HOST: supabase-mail
SMTP_PORT: "2500"
SMTP_USER: fake_mail_user
SMTP_PASS: fake_mail_password
SMTP_SENDER_NAME: fake_sender
# Mailer URLs
MAILER_URLPATHS_CONFIRMATION: /auth/v1/verify
MAILER_URLPATHS_INVITE: /auth/v1/verify
MAILER_URLPATHS_RECOVERY: /auth/v1/verify
MAILER_URLPATHS_EMAIL_CHANGE: /auth/v1/verify
# JWT Settings
JWT_EXPIRY: "3600"
# Database Schemas
PGRST_DB_SCHEMAS: public,storage,graphql_public
# Studio Settings
STUDIO_DEFAULT_ORGANIZATION: Default Organization
STUDIO_DEFAULT_PROJECT: Default Project
# Logging
LOGFLARE_API_KEY: your-super-secret-and-long-logflare-key
# Pooler Settings
POOLER_DEFAULT_POOL_SIZE: "20"
POOLER_MAX_CLIENT_CONN: "100"
POOLER_TENANT_ID: your-tenant-id
POOLER_PROXY_PORT_TRANSACTION: "6543"
# Kong Ports
KONG_HTTP_PORT: "8000"
KONG_HTTPS_PORT: "8443"
# Docker
DOCKER_SOCKET_LOCATION: /var/run/docker.sock
# Google Cloud (if needed)
GOOGLE_PROJECT_ID: GOOGLE_PROJECT_ID
GOOGLE_PROJECT_NUMBER: GOOGLE_PROJECT_NUMBER
services:
studio:
container_name: supabase-studio
image: supabase/studio:20250224-d10db0f
@@ -49,9 +122,16 @@ services:
<<: *supabase-env
# Keep any existing environment variables specific to that service
STUDIO_PG_META_URL: http://meta:8080
POSTGRES_PASSWORD: your-super-secret-and-long-postgres-password
DEFAULT_ORGANIZATION_NAME: Default Organization
DEFAULT_PROJECT_NAME: Default Project
OPENAI_API_KEY: ""
SUPABASE_URL: http://kong:8000
SUPABASE_PUBLIC_URL: http://localhost:8000
SUPABASE_ANON_KEY: eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyAgCiAgICAicm9sZSI6ICJhbm9uIiwKICAgICJpc3MiOiAic3VwYWJhc2UtZGVtbyIsCiAgICAiaWF0IjogMTY0MTc2OTIwMCwKICAgICJleHAiOiAxNzk5NTM1NjAwCn0.dc_X5iR_VP_qT0zsiyj_I_OZ2T9FtRU2BBNWN8Bu4GE
SUPABASE_SERVICE_KEY: eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyAgCiAgICAicm9sZSI6ICJzZXJ2aWNlX3JvbGUiLAogICAgImlzcyI6ICJzdXBhYmFzZS1kZW1vIiwKICAgICJpYXQiOiAxNjQxNzY5MjAwLAogICAgImV4cCI6IDE3OTk1MzU2MDAKfQ.DaYlNEoUrrEn2Ig7tqibS-PHK5vgusbcbo7X36XVt4Q
AUTH_JWT_SECRET: your-super-secret-jwt-token-with-at-least-32-characters-long
LOGFLARE_API_KEY: your-super-secret-and-long-logflare-key
@@ -83,6 +163,10 @@ services:
KONG_PLUGINS: request-transformer,cors,key-auth,acl,basic-auth
KONG_NGINX_PROXY_PROXY_BUFFER_SIZE: 160k
KONG_NGINX_PROXY_PROXY_BUFFERS: 64 160k
SUPABASE_ANON_KEY: eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyAgCiAgICAicm9sZSI6ICJhbm9uIiwKICAgICJpc3MiOiAic3VwYWJhc2UtZGVtbyIsCiAgICAiaWF0IjogMTY0MTc2OTIwMCwKICAgICJleHAiOiAxNzk5NTM1NjAwCn0.dc_X5iR_VP_qT0zsiyj_I_OZ2T9FtRU2BBNWN8Bu4GE
SUPABASE_SERVICE_KEY: eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyAgCiAgICAicm9sZSI6ICJzZXJ2aWNlX3JvbGUiLAogICAgImlzcyI6ICJzdXBhYmFzZS1kZW1vIiwKICAgICJpYXQiOiAxNjQxNzY5MjAwLAogICAgImV4cCI6IDE3OTk1MzU2MDAKfQ.DaYlNEoUrrEn2Ig7tqibS-PHK5vgusbcbo7X36XVt4Q
DASHBOARD_USERNAME: supabase
DASHBOARD_PASSWORD: this_password_is_insecure_and_should_be_updated
# https://unix.stackexchange.com/a/294837
entrypoint: bash -c 'eval "echo \"$$(cat ~/temp.yml)\"" > ~/kong.yml && /docker-entrypoint.sh kong docker-start'
@@ -113,6 +197,7 @@ services:
# Keep any existing environment variables specific to that service
GOTRUE_API_HOST: 0.0.0.0
GOTRUE_API_PORT: 9999
API_EXTERNAL_URL: http://localhost:8000
GOTRUE_DB_DRIVER: postgres
GOTRUE_DB_DATABASE_URL: postgres://supabase_auth_admin:your-super-secret-and-long-postgres-password@db:5432/postgres
@@ -340,7 +425,9 @@ services:
environment:
<<: *supabase-env
# Keep any existing environment variables specific to that service
JWT_SECRET: your-super-secret-jwt-token-with-at-least-32-characters-long
SUPABASE_URL: http://kong:8000
SUPABASE_ANON_KEY: eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyAgCiAgICAicm9sZSI6ICJhbm9uIiwKICAgICJpc3MiOiAic3VwYWJhc2UtZGVtbyIsCiAgICAiaWF0IjogMTY0MTc2OTIwMCwKICAgICJleHAiOiAxNzk5NTM1NjAwCn0.dc_X5iR_VP_qT0zsiyj_I_OZ2T9FtRU2BBNWN8Bu4GE
SUPABASE_SERVICE_ROLE_KEY: eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyAgCiAgICAicm9sZSI6ICJzZXJ2aWNlX3JvbGUiLAogICAgImlzcyI6ICJzdXBhYmFzZS1kZW1vIiwKICAgICJpYXQiOiAxNjQxNzY5MjAwLAogICAgImV4cCI6IDE3OTk1MzU2MDAKfQ.DaYlNEoUrrEn2Ig7tqibS-PHK5vgusbcbo7X36XVt4Q
SUPABASE_DB_URL: postgresql://postgres:your-super-secret-and-long-postgres-password@db:5432/postgres
# TODO: Allow configuring VERIFY_JWT per function. This PR might help: https://github.com/supabase/cli/pull/786
@@ -444,8 +531,12 @@ services:
# Keep any existing environment variables specific to that service
POSTGRES_HOST: /var/run/postgresql
PGPORT: 5432
POSTGRES_PORT: 5432
PGPASSWORD: your-super-secret-and-long-postgres-password
POSTGRES_PASSWORD: your-super-secret-and-long-postgres-password
PGDATABASE: postgres
POSTGRES_DB: postgres
JWT_SECRET: your-super-secret-jwt-token-with-at-least-32-characters-long
JWT_EXP: 3600
command:
[
@@ -479,6 +570,8 @@ services:
<<: *supabase-env-files
environment:
<<: *supabase-env
# Keep any existing environment variables specific to that service
LOGFLARE_API_KEY: your-super-secret-and-long-logflare-key
command:
[
"--config",
@@ -519,6 +612,9 @@ services:
<<: *supabase-env
# Keep any existing environment variables specific to that service
PORT: 4000
POSTGRES_PORT: 5432
POSTGRES_DB: postgres
POSTGRES_PASSWORD: your-super-secret-and-long-postgres-password
DATABASE_URL: ecto://supabase_admin:your-super-secret-and-long-postgres-password@db:5432/_supabase
CLUSTER_POSTGRES: true
SECRET_KEY_BASE: UpNVntn3cDxHJpq99YMc1T1AQgQpc8kfYTuRgBiYa15BLrx8etQoXz3gZv1/u2oq

View File

@@ -6,7 +6,8 @@
# 5. CLI arguments - docker compose run -e VAR=value
# Common backend environment - Docker service names
x-backend-env: &backend-env # Docker internal service hostnames (override localhost defaults)
x-backend-env:
&backend-env # Docker internal service hostnames (override localhost defaults)
PYRO_HOST: "0.0.0.0"
AGENTSERVER_HOST: rest_server
SCHEDULER_HOST: scheduler_server
@@ -19,10 +20,6 @@ x-backend-env: &backend-env # Docker internal service hostnames (override localh
RABBITMQ_HOST: rabbitmq
# Override Supabase URL for Docker network
SUPABASE_URL: http://kong:8000
# Database connection string for Docker network
# This cannot be constructed like in .env because we cannot interpolate values set here (DB_HOST)
DATABASE_URL: postgresql://postgres:your-super-secret-and-long-postgres-password@db:5432/postgres?connect_timeout=60&schema=platform
DIRECT_URL: postgresql://postgres:your-super-secret-and-long-postgres-password@db:5432/postgres?connect_timeout=60&schema=platform
# Common env_file configuration for backend services
x-backend-env-files: &backend-env-files
@@ -46,18 +43,14 @@ services:
depends_on:
db:
condition: service_healthy
<<: *backend-env-files
environment:
<<: *backend-env
- DATABASE_URL=postgresql://postgres:your-super-secret-and-long-postgres-password@db:5432/postgres?connect_timeout=60&schema=platform
- DIRECT_URL=postgresql://postgres:your-super-secret-and-long-postgres-password@db:5432/postgres?connect_timeout=60&schema=platform
networks:
- app-network
restart: on-failure
healthcheck:
test:
[
"CMD-SHELL",
"poetry run prisma migrate status | grep -q 'No pending migrations' || exit 1",
]
test: ["CMD-SHELL", "poetry run prisma migrate status | grep -q 'No pending migrations' || exit 1"]
interval: 30s
timeout: 10s
retries: 3
@@ -84,9 +77,9 @@ services:
timeout: 10s
retries: 5
start_period: 10s
<<: *backend-env-files
environment:
<<: *backend-env
- RABBITMQ_DEFAULT_USER=rabbitmq_user_default
- RABBITMQ_DEFAULT_PASS=k0VMxyIJF9S35f3x2uaw5IWAl6Y536O7 # CHANGE THIS TO A RANDOM PASSWORD IN PRODUCTION -- everywhere lol
ports:
- "5672:5672"
- "15672:15672"
@@ -114,6 +107,9 @@ services:
<<: *backend-env-files
environment:
<<: *backend-env
# Service-specific overrides
DATABASE_URL: postgresql://postgres:your-super-secret-and-long-postgres-password@db:5432/postgres?connect_timeout=60&schema=platform
DIRECT_URL: postgresql://postgres:your-super-secret-and-long-postgres-password@db:5432/postgres?connect_timeout=60&schema=platform
ports:
- "8006:8006"
networks:
@@ -144,6 +140,9 @@ services:
<<: *backend-env-files
environment:
<<: *backend-env
# Service-specific overrides
DATABASE_URL: postgresql://postgres:your-super-secret-and-long-postgres-password@db:5432/postgres?connect_timeout=60&schema=platform
DIRECT_URL: postgresql://postgres:your-super-secret-and-long-postgres-password@db:5432/postgres?connect_timeout=60&schema=platform
ports:
- "8002:8002"
networks:
@@ -172,6 +171,9 @@ services:
<<: *backend-env-files
environment:
<<: *backend-env
# Service-specific overrides
DATABASE_URL: postgresql://postgres:your-super-secret-and-long-postgres-password@db:5432/postgres?connect_timeout=60&schema=platform
DIRECT_URL: postgresql://postgres:your-super-secret-and-long-postgres-password@db:5432/postgres?connect_timeout=60&schema=platform
ports:
- "8001:8001"
networks:
@@ -196,6 +198,9 @@ services:
<<: *backend-env-files
environment:
<<: *backend-env
# Service-specific overrides
DATABASE_URL: postgresql://postgres:your-super-secret-and-long-postgres-password@db:5432/postgres?connect_timeout=60&schema=platform
DIRECT_URL: postgresql://postgres:your-super-secret-and-long-postgres-password@db:5432/postgres?connect_timeout=60&schema=platform
ports:
- "8005:8005"
networks:
@@ -239,6 +244,9 @@ services:
<<: *backend-env-files
environment:
<<: *backend-env
# Service-specific overrides
DATABASE_URL: postgresql://postgres:your-super-secret-and-long-postgres-password@db:5432/postgres?connect_timeout=60&schema=platform
DIRECT_URL: postgresql://postgres:your-super-secret-and-long-postgres-password@db:5432/postgres?connect_timeout=60&schema=platform
ports:
- "8003:8003"
networks:
@@ -276,8 +284,6 @@ services:
context: ../
dockerfile: autogpt_platform/frontend/Dockerfile
target: prod
args:
NEXT_PUBLIC_PW_TEST: ${NEXT_PUBLIC_PW_TEST:-false}
depends_on:
db:
condition: service_healthy

View File

@@ -16,4 +16,5 @@
NEXT_PUBLIC_REACT_QUERY_DEVTOOL=true
NEXT_PUBLIC_GA_MEASUREMENT_ID=G-FH2XK2W4GN
NEXT_PUBLIC_PW_TEST=true

View File

@@ -9,17 +9,13 @@ RUN --mount=type=cache,target=/root/.local/share/pnpm pnpm install --frozen-lock
FROM base AS build
COPY autogpt_platform/frontend/ .
# Allow CI to opt-in to Playwright test build-time flags
ARG NEXT_PUBLIC_PW_TEST="false"
ENV NEXT_PUBLIC_PW_TEST=$NEXT_PUBLIC_PW_TEST
RUN if [ -f .env ]; then \
cat .env.default .env > .env.merged && mv .env.merged .env; \
else \
cp .env.default .env; \
fi
RUN pnpm run generate:api
# In CI, we want NEXT_PUBLIC_PW_TEST=true during build so Next.js inlines it
RUN if [ "$NEXT_PUBLIC_PW_TEST" = "true" ]; then NEXT_PUBLIC_PW_TEST=true pnpm build; else pnpm build; fi
RUN pnpm build
# Prod stage - based on NextJS reference Dockerfile https://github.com/vercel/next.js/blob/64271354533ed16da51be5dce85f0dbd15f17517/examples/with-docker/Dockerfile
FROM node:21-alpine AS prod

View File

@@ -10,8 +10,8 @@
"lint": "next lint && prettier --check .",
"format": "next lint --fix; prettier --write .",
"types": "tsc --noEmit",
"test": "NEXT_PUBLIC_PW_TEST=true next build --turbo && playwright test",
"test-ui": "NEXT_PUBLIC_PW_TEST=true next build --turbo && playwright test --ui",
"test": "next build --turbo && playwright test",
"test-ui": "next build --turbo && playwright test --ui",
"test:no-build": "playwright test",
"gentests": "playwright codegen http://localhost:3000",
"storybook": "storybook dev -p 6006",
@@ -66,14 +66,12 @@
"embla-carousel-react": "8.6.0",
"framer-motion": "12.23.12",
"geist": "1.4.2",
"highlight.js": "11.11.1",
"jaro-winkler": "0.2.8",
"katex": "0.16.22",
"launchdarkly-react-client-sdk": "3.8.1",
"lodash": "4.17.21",
"lucide-react": "0.539.0",
"moment": "2.30.1",
"next": "15.4.7",
"next": "15.4.6",
"next-themes": "0.4.6",
"nuqs": "2.4.3",
"party-js": "2.2.0",
@@ -87,17 +85,10 @@
"react-modal": "3.16.3",
"react-shepherd": "6.1.9",
"react-window": "1.8.11",
"recharts": "3.1.2",
"rehype-autolink-headings": "7.1.0",
"rehype-highlight": "7.0.2",
"rehype-katex": "7.0.1",
"rehype-slug": "6.0.0",
"remark-gfm": "4.0.1",
"remark-math": "6.0.0",
"recharts": "2.15.3",
"shepherd.js": "14.5.1",
"sonner": "2.0.7",
"tailwind-merge": "2.6.0",
"tailwind-scrollbar": "4.0.2",
"tailwindcss-animate": "1.0.7",
"uuid": "11.1.0",
"vaul": "1.1.2",

File diff suppressed because it is too large Load Diff

View File

@@ -4,7 +4,6 @@ import { getAgptServerBaseUrl } from "@/lib/env-config";
import { execSync } from "child_process";
import * as path from "path";
import * as fs from "fs";
import * as os from "os";
function fetchOpenApiSpec(): void {
const args = process.argv.slice(2);
@@ -43,30 +42,15 @@ function fetchOpenApiSpec(): void {
console.log(`Fetching OpenAPI spec from: ${openApiUrl}`);
// Write to a temporary file first to avoid clearing the real file on failure
const tmpOutputPath = path.join(
os.tmpdir(),
`openapi-fetch-${Date.now()}.json`,
);
try {
// Fetch the OpenAPI spec to a temp file
execSync(`curl "${openApiUrl}" -o "${tmpOutputPath}"`, {
stdio: "inherit",
});
// Fetch the OpenAPI spec
execSync(`curl "${openApiUrl}" > "${outputPath}"`, { stdio: "inherit" });
// Format with prettier
execSync(`prettier --write "${tmpOutputPath}"`, { stdio: "inherit" });
// Move temp file to final output path
fs.copyFileSync(tmpOutputPath, outputPath);
fs.unlinkSync(tmpOutputPath);
execSync(`prettier --write "${outputPath}"`, { stdio: "inherit" });
console.log("✅ OpenAPI spec fetched and formatted successfully");
} catch (error) {
if (fs.existsSync(tmpOutputPath)) {
fs.unlinkSync(tmpOutputPath);
}
console.error("❌ Failed to fetch OpenAPI spec:", error);
process.exit(1);
}

View File

@@ -9,6 +9,7 @@ import {
import { OnboardingText } from "@/components/onboarding/OnboardingText";
import StarRating from "@/components/onboarding/StarRating";
import SchemaTooltip from "@/components/SchemaTooltip";
import { TypeBasedInput } from "@/components/type-based-input";
import { Card, CardContent, CardHeader, CardTitle } from "@/components/ui/card";
import { useToast } from "@/components/molecules/Toast/use-toast";
import { GraphMeta, StoreAgentDetails } from "@/lib/autogpt-server-api";
@@ -17,7 +18,6 @@ import { cn } from "@/lib/utils";
import { Play } from "lucide-react";
import { useRouter } from "next/navigation";
import { useCallback, useEffect, useState } from "react";
import { RunAgentInputs } from "@/app/(platform)/library/agents/[id]/components/AgentRunsView/components/RunAgentInputs/RunAgentInputs";
export default function Page() {
const { state, updateState, setStep } = useOnboarding(
@@ -89,13 +89,13 @@ export default function Page() {
const libraryAgent = await api.addMarketplaceAgentToLibrary(
storeAgent?.store_listing_version_id || "",
);
const { id: runID } = await api.executeGraph(
const { graph_exec_id } = await api.executeGraph(
libraryAgent.graph_id,
libraryAgent.graph_version,
state?.agentInput || {},
);
updateState({
onboardingAgentExecutionId: runID,
onboardingAgentExecutionId: graph_exec_id,
agentRuns: (state?.agentRuns || 0) + 1,
});
router.push("/onboarding/6-congrats");
@@ -233,7 +233,7 @@ export default function Page() {
description={inputSubSchema.description}
/>
</label>
<RunAgentInputs
<TypeBasedInput
schema={inputSubSchema}
value={state?.agentInput?.[key]}
placeholder={inputSubSchema.description}

View File

@@ -1,4 +1,4 @@
import { OAuthPopupResultMessage } from "@/app/(platform)/library/agents/[id]/components/AgentRunsView/components/CredentialsInputs/CredentialsInputs";
import { OAuthPopupResultMessage } from "@/components/integrations/credentials-input";
import { NextResponse } from "next/server";
// This route is intended to be used as the callback for integration OAuth flows,

View File

@@ -8,13 +8,12 @@ import { ToyBrick } from "lucide-react";
import { BlockMenuContent } from "../BlockMenuContent/BlockMenuContent";
import { ControlPanelButton } from "../ControlPanelButton";
import { useBlockMenu } from "./useBlockMenu";
import { BlockMenuStateProvider } from "../block-menu-provider";
interface BlockMenuProps {
pinBlocksPopover: boolean;
blockMenuSelected: "save" | "block" | "search" | "";
blockMenuSelected: "save" | "block" | "";
setBlockMenuSelected: React.Dispatch<
React.SetStateAction<"" | "save" | "block" | "search">
React.SetStateAction<"" | "save" | "block">
>;
}
@@ -45,10 +44,7 @@ export const BlockMenu: React.FC<BlockMenuProps> = ({
className="absolute h-[75vh] w-[46.625rem] overflow-hidden rounded-[1rem] border-none p-0 shadow-[0_2px_6px_0_rgba(0,0,0,0.05)]"
data-id="blocks-control-popover-content"
>
<BlockMenuStateProvider>
<BlockMenuContent />
</BlockMenuStateProvider>
<BlockMenuContent />
</PopoverContent>
</Popover>
);

View File

@@ -3,7 +3,7 @@ import { useState } from "react";
interface useBlockMenuProps {
pinBlocksPopover: boolean;
setBlockMenuSelected: React.Dispatch<
React.SetStateAction<"" | "save" | "block" | "search">
React.SetStateAction<"" | "save" | "block">
>;
}

View File

@@ -1,18 +1,10 @@
"use client";
import React from "react";
import { useBlockMenuContext } from "../block-menu-provider";
import { BlockMenuSearchBar } from "../BlockMenuSearchBar/BlockMenuSearchBar";
import { Separator } from "@/components/ui/separator";
import { BlockMenuDefault } from "../BlockMenuDefault/BlockMenuDefault";
import { BlockMenuSearch } from "../BlockMenuSearch/BlockMenuSearch";
export const BlockMenuContent = () => {
const { searchQuery } = useBlockMenuContext();
return (
<div className="flex h-full w-full flex-col">
<BlockMenuSearchBar />
<Separator className="h-[1px] w-full text-zinc-300" />
{searchQuery ? <BlockMenuSearch /> : <BlockMenuDefault />}
<div className="flex h-full w-full flex-col items-center justify-center">
This is the block menu content
</div>
);
};

View File

@@ -1,15 +0,0 @@
import React from "react";
import { Separator } from "@/components/ui/separator";
import { BlockMenuDefaultContent } from "../BlockMenuDefaultContent/BlockMenuDefaultContent";
import { BlockMenuSidebar } from "../BlockMenuSidebar/BlockMenuSidebar";
export const BlockMenuDefault = () => {
return (
<div className="flex flex-1 overflow-y-auto">
<BlockMenuSidebar />
<Separator className="h-full w-[1px] text-zinc-300" />
<BlockMenuDefaultContent />
</div>
);
};

View File

@@ -1,14 +0,0 @@
import { Text } from "@/components/atoms/Text/Text";
import React from "react";
export const BlockMenuDefaultContent = () => {
return (
<div className="h-full flex-1 overflow-hidden flex items-center justify-center">
{/* I have added temporary content here, will fillup it in follow up prs */}
<Text variant="body" className="text-green-300">
This is the block menu default content
</Text>
</div>
);
};

View File

@@ -1,12 +0,0 @@
import { Text } from "@/components/atoms/Text/Text";
export const BlockMenuSearch = () => {
return (
// This is just a temporary text, will content inside in it [in follow-up prs]
<div className="flex items-center justify-center h-full w-full">
<Text variant="h3" className="text-green-300">
This is the block menu search
</Text>
</div>
);
};

View File

@@ -1,53 +0,0 @@
import { cn } from "@/lib/utils";
import React from "react";
import { Input } from "@/components/ui/input";
import { useBlockMenuSearchBar } from "./useBlockMenuSearchBar";
import { Button } from "@/components/ui/button";
import { MagnifyingGlassIcon, XIcon } from "@phosphor-icons/react";
interface BlockMenuSearchBarProps {
className?: string;
}
export const BlockMenuSearchBar: React.FC<BlockMenuSearchBarProps> = ({
className = "",
}) => {
const { handleClear, inputRef, localQuery, setLocalQuery, debouncedSetSearchQuery } = useBlockMenuSearchBar();
return (
<div
className={cn(
"flex min-h-[3.5625rem] items-center gap-2.5 px-4",
className,
)}
>
<div className="flex h-6 w-6 items-center justify-center">
<MagnifyingGlassIcon className="h-6 w-6 text-zinc-700" strokeWidth={2} />
</div>
<Input
ref={inputRef}
type="text"
value={localQuery}
onChange={(e) => {
setLocalQuery(e.target.value);
debouncedSetSearchQuery(e.target.value);
}}
placeholder={"Blocks, Agents, Integrations or Keywords..."}
className={cn(
"m-0 border-none p-0 font-sans text-base font-normal text-zinc-800 shadow-none outline-none",
"placeholder:text-zinc-400 focus:shadow-none focus:outline-none focus:ring-0",
)}
/>
{localQuery.length > 0 && (
<Button
variant="ghost"
size={"sm"}
onClick={handleClear}
className="p-0 hover:bg-transparent"
>
<XIcon className="h-6 w-6 text-zinc-700" strokeWidth={2} />
</Button>
)}
</div>
);
};

View File

@@ -1,46 +0,0 @@
import { debounce } from "lodash";
import { useEffect, useRef, useState } from "react";
import { useBlockMenuContext } from "../block-menu-provider";
const SEARCH_DEBOUNCE_MS = 300;
export const useBlockMenuSearchBar = () => {
const inputRef = useRef<HTMLInputElement>(null);
const [localQuery, setLocalQuery] = useState("");
const { setSearchQuery, setSearchId, searchId } = useBlockMenuContext();
const searchIdRef = useRef(searchId);
useEffect(() => {
searchIdRef.current = searchId;
}, [searchId]);
const debouncedSetSearchQuery = debounce((value: string) => {
setSearchQuery(value);
if (value.length === 0) {
setSearchId(undefined);
} else if (!searchIdRef.current) {
setSearchId(crypto.randomUUID());
}
}, SEARCH_DEBOUNCE_MS);
useEffect(() => {
return () => {
debouncedSetSearchQuery.cancel();
};
}, [debouncedSetSearchQuery]);
const handleClear = () => {
setLocalQuery("");
setSearchQuery("");
setSearchId(undefined);
debouncedSetSearchQuery.cancel();
};
return {
handleClear,
inputRef,
localQuery,
setLocalQuery,
debouncedSetSearchQuery,
}
};

View File

@@ -1,117 +0,0 @@
import React from "react";
import { MenuItem } from "../MenuItem";
import { DefaultStateType } from "../block-menu-provider";
import { useBlockMenuSidebar } from "./useBlockMenuSidebar";
import { Skeleton } from "@/components/ui/skeleton";
import { ErrorCard } from "@/components/molecules/ErrorCard/ErrorCard";
export const BlockMenuSidebar = () => {
const { blockCounts, setDefaultState, defaultState, isLoading, isError, error } = useBlockMenuSidebar();
if (isLoading) {
return (
<div className="w-fit space-y-2 px-4 pt-4">
<Skeleton className="h-12 w-[12.875rem]" />
<Skeleton className="h-12 w-[12.875rem]" />
<Skeleton className="h-12 w-[12.875rem]" />
<Skeleton className="h-12 w-[12.875rem]" />
<Skeleton className="h-12 w-[12.875rem]" />
<Skeleton className="h-12 w-[12.875rem]" />
</div>
);
}
if (isError) {
return <div className="w-fit space-y-2 px-4 pt-4">
<ErrorCard className="w-[12.875rem]" httpError={{status: 500, statusText: "Internal Server Error", message: error?.detail || 'An error occurred'}} />
</div>
}
const topLevelMenuItems = [
{
name: "Suggestion",
type: "suggestion",
},
{
name: "All blocks",
type: "all_blocks",
number: blockCounts?.all_blocks,
},
];
const subMenuItems = [
{
name: "Input blocks",
type: "input_blocks",
number: blockCounts?.input_blocks,
},
{
name: "Action blocks",
type: "action_blocks",
number: blockCounts?.action_blocks,
},
{
name: "Output blocks",
type: "output_blocks",
number: blockCounts?.output_blocks,
},
];
const bottomMenuItems = [
{
name: "Integrations",
type: "integrations",
number: blockCounts?.integrations,
onClick: () => {
setDefaultState("integrations");
},
},
{
name: "Marketplace Agents",
type: "marketplace_agents",
number: blockCounts?.marketplace_agents,
},
{
name: "My Agents",
type: "my_agents",
number: blockCounts?.my_agents,
},
];
return (
<div className="w-fit space-y-2 px-4 pt-4">
{topLevelMenuItems.map((item) => (
<MenuItem
key={item.type}
name={item.name}
number={item.number}
selected={defaultState === item.type}
onClick={() => setDefaultState(item.type as DefaultStateType)}
/>
))}
<div className="ml-[0.5365rem] space-y-2 border-l border-black/10 pl-[0.75rem]">
{subMenuItems.map((item) => (
<MenuItem
key={item.type}
name={item.name}
number={item.number}
className="max-w-[11.5339rem]"
selected={defaultState === item.type}
onClick={() => setDefaultState(item.type as DefaultStateType)}
/>
))}
</div>
{bottomMenuItems.map((item) => (
<MenuItem
key={item.type}
name={item.name}
number={item.number}
selected={defaultState === item.type}
onClick={
item.onClick ||
(() => setDefaultState(item.type as DefaultStateType))
}
/>
))}
</div>
);
};

View File

@@ -1,24 +0,0 @@
import { useGetV2GetBuilderItemCounts } from "@/app/api/__generated__/endpoints/default/default";
import { useBlockMenuContext } from "../block-menu-provider";
import { CountResponse } from "@/app/api/__generated__/models/countResponse";
export const useBlockMenuSidebar = () => {
const { defaultState, setDefaultState } = useBlockMenuContext();
const { data: blockCounts, isLoading, isError, error} = useGetV2GetBuilderItemCounts({
query : {
select : (x) =>{
return x.data as CountResponse
}
}
});
return {
blockCounts,
setDefaultState,
defaultState,
isLoading,
isError,
error,
}
};

View File

@@ -1,74 +0,0 @@
import React from "react";
import {
Popover,
PopoverContent,
PopoverTrigger,
} from "@/components/ui/popover";
import { MagnifyingGlassIcon } from "@phosphor-icons/react";
import { GraphSearchContent } from "../GraphMenuContent/GraphContent";
import { ControlPanelButton } from "../ControlPanelButton";
import { CustomNode } from "@/components/CustomNode";
import { useGraphMenu } from "./useGraphMenu";
interface GraphSearchMenuProps {
nodes: CustomNode[];
blockMenuSelected: "save" | "block" | "search" | "";
setBlockMenuSelected: React.Dispatch<
React.SetStateAction<"" | "save" | "block" | "search">
>;
onNodeSelect: (nodeId: string) => void;
onNodeHover?: (nodeId: string | null) => void;
}
export const GraphSearchMenu: React.FC<GraphSearchMenuProps> = ({
nodes,
blockMenuSelected,
setBlockMenuSelected,
onNodeSelect,
onNodeHover,
}) => {
const {
open,
searchQuery,
setSearchQuery,
filteredNodes,
handleNodeSelect,
handleOpenChange,
} = useGraphMenu({
nodes,
blockMenuSelected,
setBlockMenuSelected,
onNodeSelect,
});
return (
<Popover open={open} onOpenChange={handleOpenChange}>
<PopoverTrigger className="hover:cursor-pointer">
<ControlPanelButton
data-id="graph-search-control-popover-trigger"
data-testid="graph-search-control-button"
selected={blockMenuSelected === "search"}
className="rounded-none"
>
<MagnifyingGlassIcon className="h-5 w-6" strokeWidth={2} />
</ControlPanelButton>
</PopoverTrigger>
<PopoverContent
side="right"
align="start"
sideOffset={16}
className="absolute h-[75vh] w-[46.625rem] overflow-hidden rounded-[1rem] border-none p-0 shadow-[0_2px_6px_0_rgba(0,0,0,0.05)]"
data-id="graph-search-popover-content"
>
<GraphSearchContent
searchQuery={searchQuery}
onSearchChange={setSearchQuery}
filteredNodes={filteredNodes}
onNodeSelect={handleNodeSelect}
onNodeHover={onNodeHover}
/>
</PopoverContent>
</Popover>
);
};

View File

@@ -1,41 +0,0 @@
import { useGraphSearch } from "../GraphMenuSearchBar/useGraphMenuSearchBar";
import { CustomNode } from "@/components/CustomNode";
interface UseGraphMenuProps {
nodes: CustomNode[];
blockMenuSelected: "save" | "block" | "search" | "";
setBlockMenuSelected: React.Dispatch<
React.SetStateAction<"" | "save" | "block" | "search">
>;
onNodeSelect: (nodeId: string) => void;
}
export const useGraphMenu = ({
nodes,
setBlockMenuSelected,
onNodeSelect,
}: UseGraphMenuProps) => {
const { open, setOpen, searchQuery, setSearchQuery, filteredNodes } =
useGraphSearch(nodes);
const handleNodeSelect = (nodeId: string) => {
onNodeSelect(nodeId);
setOpen(false);
setSearchQuery("");
setBlockMenuSelected("");
};
const handleOpenChange = (newOpen: boolean) => {
setOpen(newOpen);
setBlockMenuSelected(newOpen ? "search" : "");
};
return {
open,
searchQuery,
setSearchQuery,
filteredNodes,
handleNodeSelect,
handleOpenChange,
};
};

View File

@@ -1,131 +0,0 @@
import React from "react";
import { Separator } from "@/components/ui/separator";
import { ScrollArea } from "@/components/ui/scroll-area";
import { beautifyString, getPrimaryCategoryColor } from "@/lib/utils";
import { SearchableNode } from "../GraphMenuSearchBar/useGraphMenuSearchBar";
import { TextRenderer } from "@/components/ui/render";
import {
Tooltip,
TooltipContent,
TooltipTrigger,
TooltipProvider,
} from "@/components/ui/tooltip";
import { GraphMenuSearchBar } from "../GraphMenuSearchBar/GraphMenuSearchBar";
import { useGraphContent } from "./useGraphContent";
interface GraphSearchContentProps {
searchQuery: string;
onSearchChange: (query: string) => void;
filteredNodes: SearchableNode[];
onNodeSelect: (nodeId: string) => void;
onNodeHover?: (nodeId: string | null) => void;
}
export const GraphSearchContent: React.FC<GraphSearchContentProps> = ({
searchQuery,
onSearchChange,
filteredNodes,
onNodeSelect,
onNodeHover,
}) => {
const {
selectedIndex,
setSelectedIndex,
handleKeyDown,
getNodeInputOutputSummary,
} = useGraphContent({
searchQuery,
filteredNodes,
onNodeSelect,
});
return (
<div className="flex h-full w-full flex-col">
{/* Search Bar */}
<GraphMenuSearchBar
searchQuery={searchQuery}
onSearchChange={onSearchChange}
onKeyDown={handleKeyDown}
/>
<Separator className="h-[1px] w-full text-zinc-300" />
{/* Search Results */}
<div className="flex-1 overflow-hidden">
{searchQuery && (
<div className="px-4 py-2 text-xs text-gray-500">
Found {filteredNodes.length} node{filteredNodes.length !== 1 ? "s" : ""}
</div>
)}
<ScrollArea className="h-full w-full">
{filteredNodes.length === 0 ? (
<div className="flex h-32 items-center justify-center text-sm text-gray-500 dark:text-gray-400">
{searchQuery ? "No nodes found matching your search" : "Start typing to search nodes"}
</div>
) : (
filteredNodes.map((node, index) => {
// Safety check for node data
if (!node || !node.data) {
return null;
}
const nodeTitle = node.data?.metadata?.customized_name ||
beautifyString(node.data?.blockType || "").replace(/ Block$/, "");
const nodeType = beautifyString(node.data?.blockType || "").replace(/ Block$/, "");
return (
<TooltipProvider key={node.id}>
<Tooltip delayDuration={300}>
<TooltipTrigger asChild>
<div
className={`mx-4 my-2 flex h-20 cursor-pointer rounded-lg border border-zinc-200 bg-white ${
index === selectedIndex
? "border-zinc-400 shadow-md"
: "hover:border-zinc-300 hover:shadow-sm"
}`}
onClick={() => onNodeSelect(node.id)}
onMouseEnter={() => {
setSelectedIndex(index);
onNodeHover?.(node.id);
}}
onMouseLeave={() => onNodeHover?.(null)}
>
<div
className={`h-full w-3 rounded-l-[7px] ${getPrimaryCategoryColor(node.data?.categories)}`}
/>
<div className="mx-3 flex flex-1 items-center justify-between">
<div className="mr-2 min-w-0">
<span className="block truncate pb-1 text-sm font-semibold text-zinc-800">
<TextRenderer
value={nodeTitle}
truncateLengthLimit={45}
/>
</span>
<span className="block break-all text-xs font-normal text-zinc-500">
<TextRenderer
value={getNodeInputOutputSummary(node) || node.data?.description || ""}
truncateLengthLimit={165}
/>
</span>
</div>
</div>
</div>
</TooltipTrigger>
<TooltipContent side="right" className="max-w-xs">
<div className="space-y-1">
<div className="font-semibold">Node Type: {nodeType}</div>
{node.data?.metadata?.customized_name && (
<div className="text-xs text-gray-500">Custom Name: {node.data.metadata.customized_name}</div>
)}
</div>
</TooltipContent>
</Tooltip>
</TooltipProvider>
);
})
)}
</ScrollArea>
</div>
</div>
);
};

View File

@@ -1,60 +0,0 @@
import { useEffect, useState } from "react";
import { SearchableNode } from "../GraphMenuSearchBar/useGraphMenuSearchBar";
interface UseGraphContentProps {
searchQuery: string;
filteredNodes: SearchableNode[];
onNodeSelect: (nodeId: string) => void;
}
export const useGraphContent = ({
searchQuery,
filteredNodes,
onNodeSelect,
}: UseGraphContentProps) => {
const [selectedIndex, setSelectedIndex] = useState(0);
useEffect(() => {
setSelectedIndex(0);
}, [searchQuery]);
const handleKeyDown = (e: React.KeyboardEvent) => {
if (e.key === "ArrowDown") {
e.preventDefault();
setSelectedIndex((prev) => Math.min(prev + 1, filteredNodes.length - 1));
} else if (e.key === "ArrowUp") {
e.preventDefault();
setSelectedIndex((prev) => Math.max(prev - 1, 0));
} else if (e.key === "Enter" && filteredNodes.length > 0) {
e.preventDefault();
onNodeSelect(filteredNodes[selectedIndex].id);
}
};
const getNodeInputOutputSummary = (node: SearchableNode) => {
// Safety check for node data
if (!node || !node.data) {
return "";
}
const inputs = Object.keys(node.data?.inputSchema?.properties || {});
const outputs = Object.keys(node.data?.outputSchema?.properties || {});
const parts = [];
if (inputs.length > 0) {
parts.push(`Inputs: ${inputs.slice(0, 3).join(", ")}${inputs.length > 3 ? "..." : ""}`);
}
if (outputs.length > 0) {
parts.push(`Outputs: ${outputs.slice(0, 3).join(", ")}${outputs.length > 3 ? "..." : ""}`);
}
return parts.join(" | ");
};
return {
selectedIndex,
setSelectedIndex,
handleKeyDown,
getNodeInputOutputSummary,
};
};

View File

@@ -1,60 +0,0 @@
import { cn } from "@/lib/utils";
import React from "react";
import { Input } from "@/components/ui/input";
import { Button } from "@/components/ui/button";
import { MagnifyingGlassIcon, XIcon } from "@phosphor-icons/react";
import { useGraphMenuSearchBarComponent } from "./useGraphMenuSearchBarComponent";
interface GraphMenuSearchBarProps {
className?: string;
searchQuery: string;
onSearchChange: (query: string) => void;
onKeyDown?: (e: React.KeyboardEvent) => void;
}
export const GraphMenuSearchBar: React.FC<GraphMenuSearchBarProps> = ({
className = "",
searchQuery,
onSearchChange,
onKeyDown,
}) => {
const { inputRef, handleClear } = useGraphMenuSearchBarComponent({
onSearchChange,
});
return (
<div
className={cn(
"flex min-h-[3.5625rem] items-center gap-2.5 px-4",
className,
)}
>
<div className="flex h-6 w-6 items-center justify-center">
<MagnifyingGlassIcon className="h-6 w-6 text-zinc-700" strokeWidth={2} />
</div>
<Input
ref={inputRef}
type="text"
value={searchQuery}
onChange={(e) => onSearchChange(e.target.value)}
onKeyDown={onKeyDown}
placeholder={"Search your graph for nodes, inputs, outputs..."}
className={cn(
"m-0 border-none p-0 font-sans text-base font-normal text-zinc-800 shadow-none outline-none",
"placeholder:text-zinc-400 focus:shadow-none focus:outline-none focus:ring-0",
)}
autoFocus
/>
{searchQuery.length > 0 && (
<Button
variant="ghost"
size={"sm"}
onClick={handleClear}
className="p-0 hover:bg-transparent"
>
<XIcon className="h-6 w-6 text-zinc-700" strokeWidth={2} />
</Button>
)}
</div>
);
};

View File

@@ -1,146 +0,0 @@
import { useState, useMemo, useDeferredValue } from "react";
import { CustomNode } from "@/components/CustomNode";
import { beautifyString } from "@/lib/utils";
import jaro from "jaro-winkler";
export type SearchableNode = CustomNode & {
searchScore?: number;
matchedFields?: string[];
};
export const useGraphSearch = (nodes: CustomNode[]) => {
const [open, setOpen] = useState(false);
const [searchQuery, setSearchQuery] = useState("");
const deferredSearchQuery = useDeferredValue(searchQuery);
const filteredNodes = useMemo(() => {
// Filter out invalid nodes
const validNodes = (nodes || []).filter(node => node && node.data);
if (!deferredSearchQuery.trim()) {
return validNodes.map(node => ({ ...node, searchScore: 1, matchedFields: [] }));
}
const query = deferredSearchQuery.toLowerCase().trim();
const queryWords = query.split(/\s+/);
return validNodes
.map((node): SearchableNode => {
const { score, matchedFields } = calculateNodeScore(node, query, queryWords);
return { ...node, searchScore: score, matchedFields };
})
.filter(node => node.searchScore! > 0)
.sort((a, b) => b.searchScore! - a.searchScore!);
}, [nodes, deferredSearchQuery]);
return {
open,
setOpen,
searchQuery,
setSearchQuery,
filteredNodes,
};
};
function calculateNodeScore(
node: CustomNode,
query: string,
queryWords: string[]
): { score: number; matchedFields: string[] } {
const matchedFields: string[] = [];
let score = 0;
// Safety check for node data
if (!node || !node.data) {
return { score: 0, matchedFields: [] };
}
// Prepare searchable text with defensive checks
const nodeTitle = (node.data?.title || "").toLowerCase(); // This includes the ID
const nodeId = (node.id || "").toLowerCase();
const nodeDescription = (node.data?.description || "").toLowerCase();
const blockType = (node.data?.blockType || "").toLowerCase();
const beautifiedBlockType = beautifyString(blockType).toLowerCase();
const customizedName = (node.data?.metadata?.customized_name || "").toLowerCase();
// Get input and output names with defensive checks
const inputNames = Object.keys(node.data?.inputSchema?.properties || {})
.map(key => key.toLowerCase());
const outputNames = Object.keys(node.data?.outputSchema?.properties || {})
.map(key => key.toLowerCase());
// 1. Check exact match in customized name, title (includes ID), node ID, or block type (highest priority)
if (customizedName.includes(query) || nodeTitle.includes(query) || nodeId.includes(query) || blockType.includes(query) || beautifiedBlockType.includes(query)) {
score = 4;
matchedFields.push("title");
}
// 2. Check all query words in customized name, title or block type
else if (queryWords.every(word => customizedName.includes(word) || nodeTitle.includes(word) || beautifiedBlockType.includes(word))) {
score = 3.5;
matchedFields.push("title");
}
// 3. Check exact match in input/output names
else if (inputNames.some(name => name.includes(query))) {
score = 3;
matchedFields.push("inputs");
}
else if (outputNames.some(name => name.includes(query))) {
score = 2.8;
matchedFields.push("outputs");
}
// 4. Check all query words in input/output names
else if (inputNames.some(name => queryWords.every(word => name.includes(word)))) {
score = 2.5;
matchedFields.push("inputs");
}
else if (outputNames.some(name => queryWords.every(word => name.includes(word)))) {
score = 2.3;
matchedFields.push("outputs");
}
// 5. Similarity matching using Jaro-Winkler
else {
const titleSimilarity = Math.max(
jaro(customizedName, query),
jaro(nodeTitle, query),
jaro(nodeId, query),
jaro(beautifiedBlockType, query)
);
if (titleSimilarity > 0.7) {
score = 1.5 + titleSimilarity;
matchedFields.push("title");
}
// Check similarity with input/output names
const inputSimilarity = Math.max(...inputNames.map(name => jaro(name, query)), 0);
const outputSimilarity = Math.max(...outputNames.map(name => jaro(name, query)), 0);
if (inputSimilarity > 0.7 && inputSimilarity > score) {
score = 1 + inputSimilarity;
matchedFields.push("inputs");
}
if (outputSimilarity > 0.7 && outputSimilarity > score) {
score = 0.8 + outputSimilarity;
matchedFields.push("outputs");
}
}
// 6. Check description (lower priority)
if (score === 0 && nodeDescription.includes(query)) {
score = 0.5;
matchedFields.push("description");
}
// 7. Check if all query words appear in description
if (score === 0 && queryWords.every(word => nodeDescription.includes(word))) {
score = 0.3;
matchedFields.push("description");
}
return { score, matchedFields };
}

View File

@@ -1,21 +0,0 @@
import { useRef } from "react";
interface UseGraphMenuSearchBarComponentProps {
onSearchChange: (query: string) => void;
}
export const useGraphMenuSearchBarComponent = ({
onSearchChange,
}: UseGraphMenuSearchBarComponentProps) => {
const inputRef = useRef<HTMLInputElement>(null);
const handleClear = () => {
onSearchChange("");
inputRef.current?.focus();
};
return {
inputRef,
handleClear,
};
};

View File

@@ -8,9 +8,6 @@ import { GraphExecutionID } from "@/lib/autogpt-server-api";
import { history } from "@/components/history";
import { ControlPanelButton } from "../ControlPanelButton";
import { ArrowUUpLeftIcon, ArrowUUpRightIcon } from "@phosphor-icons/react";
import { GraphSearchMenu } from "../GraphMenu/GraphMenu";
import { CustomNode } from "@/components/CustomNode";
import { Flag, useGetFlag } from "@/services/feature-flags/use-get-flag";
export type Control = {
icon: React.ReactNode;
@@ -25,9 +22,6 @@ interface ControlPanelProps {
visualizeBeads: "no" | "static" | "animate";
pinSavePopover: boolean;
pinBlocksPopover: boolean;
nodes: CustomNode[];
onNodeSelect: (nodeId: string) => void;
onNodeHover?: (nodeId: string | null) => void;
}
export const NewControlPanel = ({
@@ -35,13 +29,8 @@ export const NewControlPanel = ({
visualizeBeads,
pinSavePopover,
pinBlocksPopover,
nodes,
onNodeSelect,
onNodeHover,
className,
}: ControlPanelProps) => {
const isGraphSearchEnabled = useGetFlag(Flag.GRAPH_SEARCH);
const {
blockMenuSelected,
setBlockMenuSelected,
@@ -88,18 +77,6 @@ export const NewControlPanel = ({
setBlockMenuSelected={setBlockMenuSelected}
/>
<Separator className="text-[#E1E1E1]" />
{isGraphSearchEnabled && (
<>
<GraphSearchMenu
nodes={nodes}
blockMenuSelected={blockMenuSelected}
setBlockMenuSelected={setBlockMenuSelected}
onNodeSelect={onNodeSelect}
onNodeHover={onNodeHover}
/>
<Separator className="text-[#E1E1E1]" />
</>
)}
{controls.map((control, index) => (
<ControlPanelButton
key={index}

View File

@@ -10,7 +10,7 @@ export interface NewControlPanelProps {
export const useNewControlPanel = ({flowExecutionID, visualizeBeads}: NewControlPanelProps) => {
const [blockMenuSelected, setBlockMenuSelected] = useState<
"save" | "block" | "search" | ""
"save" | "block" | ""
>("");
const query = useSearchParams();
const _graphVersion = query.get("flowVersion");

View File

@@ -23,9 +23,9 @@ interface SaveControlProps {
onDescriptionChange: (description: string) => void;
pinSavePopover: boolean;
blockMenuSelected: "save" | "block" | "search" | "";
blockMenuSelected: "save" | "block" | "";
setBlockMenuSelected: React.Dispatch<
React.SetStateAction<"" | "save" | "block" | "search">
React.SetStateAction<"" | "save" | "block">
>;
}

View File

@@ -1,64 +0,0 @@
"use client";
import { createContext, ReactNode, useContext, useState } from "react";
export type DefaultStateType =
| "suggestion"
| "all_blocks"
| "input_blocks"
| "action_blocks"
| "output_blocks"
| "integrations"
| "marketplace_agents"
| "my_agents";
interface BlockMenuContextType {
searchQuery: string;
setSearchQuery: React.Dispatch<React.SetStateAction<string>>;
searchId: string | undefined;
setSearchId: React.Dispatch<React.SetStateAction<string | undefined>>;
defaultState: DefaultStateType;
setDefaultState: React.Dispatch<React.SetStateAction<DefaultStateType>>;
}
export const BlockMenuContext = createContext<BlockMenuContextType>(
{} as BlockMenuContextType,
);
interface BlockMenuStateProviderProps {
children: ReactNode;
}
export function BlockMenuStateProvider({
children,
}: BlockMenuStateProviderProps) {
const [searchQuery, setSearchQuery] = useState("");
const [searchId, setSearchId] = useState<string | undefined>(undefined);
const [defaultState, setDefaultState] = useState<DefaultStateType>("suggestion");
return (
<BlockMenuContext.Provider
value={{
searchQuery,
setSearchQuery,
searchId,
setSearchId,
defaultState,
setDefaultState,
}}
>
{children}
</BlockMenuContext.Provider>
);
}
export function useBlockMenuContext(): BlockMenuContextType {
const context = useContext(BlockMenuContext);
if (!context) {
throw new Error(
"useBlockMenuContext must be used within a BlockMenuStateProvider",
);
}
return context;
}

View File

@@ -1,125 +0,0 @@
import { Input } from "@/components/atoms/Input/Input";
import { Button } from "@/components/atoms/Button/Button";
import { Dialog } from "@/components/molecules/Dialog/Dialog";
import { Form, FormDescription, FormField } from "@/components/ui/form";
import {
BlockIOCredentialsSubSchema,
CredentialsMetaInput,
} from "@/lib/autogpt-server-api/types";
import { useAPIKeyCredentialsModal } from "./useAPIKeyCredentialsModal";
type Props = {
schema: BlockIOCredentialsSubSchema;
open: boolean;
onClose: () => void;
onCredentialsCreate: (creds: CredentialsMetaInput) => void;
siblingInputs?: Record<string, any>;
};
export function APIKeyCredentialsModal({
schema,
open,
onClose,
onCredentialsCreate,
siblingInputs,
}: Props) {
const {
form,
isLoading,
supportsApiKey,
providerName,
schemaDescription,
onSubmit,
} = useAPIKeyCredentialsModal({ schema, siblingInputs, onCredentialsCreate });
if (isLoading || !supportsApiKey) {
return null;
}
return (
<Dialog
title={`Add new API key for ${providerName ?? ""}`}
controlled={{
isOpen: open,
set: (isOpen) => {
if (!isOpen) onClose();
},
}}
onClose={onClose}
styling={{
maxWidth: "25rem",
}}
>
<Dialog.Content>
{schemaDescription && (
<p className="mb-4 text-sm text-zinc-600">{schemaDescription}</p>
)}
<Form {...form}>
<form onSubmit={form.handleSubmit(onSubmit)} className="space-y-2">
<FormField
control={form.control}
name="apiKey"
render={({ field }) => (
<>
<Input
id="apiKey"
label="API Key"
type="password"
placeholder="Enter API key..."
size="small"
hint={
schema.credentials_scopes ? (
<FormDescription>
Required scope(s) for this block:{" "}
{schema.credentials_scopes?.map((s, i, a) => (
<span key={i}>
<code className="text-xs font-bold">{s}</code>
{i < a.length - 1 && ", "}
</span>
))}
</FormDescription>
) : null
}
{...field}
/>
</>
)}
/>
<FormField
control={form.control}
name="title"
render={({ field }) => (
<Input
id="title"
label="Name"
type="text"
placeholder="Enter a name for this API key..."
size="small"
{...field}
/>
)}
/>
<FormField
control={form.control}
name="expiresAt"
render={({ field }) => (
<Input
id="expiresAt"
label="Expiration Date"
type="datetime-local"
placeholder="Select expiration date..."
size="small"
{...field}
/>
)}
/>
<Button type="submit" size="small" className="min-w-68">
Save & use this API key
</Button>
</form>
</Form>
</Dialog.Content>
</Dialog>
);
}

View File

@@ -1,82 +0,0 @@
import { z } from "zod";
import { useForm, type UseFormReturn } from "react-hook-form";
import { zodResolver } from "@hookform/resolvers/zod";
import useCredentials from "@/hooks/useCredentials";
import {
BlockIOCredentialsSubSchema,
CredentialsMetaInput,
} from "@/lib/autogpt-server-api/types";
export type APIKeyFormValues = {
apiKey: string;
title: string;
expiresAt?: string;
};
type Args = {
schema: BlockIOCredentialsSubSchema;
siblingInputs?: Record<string, any>;
onCredentialsCreate: (creds: CredentialsMetaInput) => void;
};
export function useAPIKeyCredentialsModal({
schema,
siblingInputs,
onCredentialsCreate,
}: Args): {
form: UseFormReturn<APIKeyFormValues>;
isLoading: boolean;
supportsApiKey: boolean;
provider?: string;
providerName?: string;
schemaDescription?: string;
onSubmit: (values: APIKeyFormValues) => Promise<void>;
} {
const credentials = useCredentials(schema, siblingInputs);
const formSchema = z.object({
apiKey: z.string().min(1, "API Key is required"),
title: z.string().min(1, "Name is required"),
expiresAt: z.string().optional(),
});
const form = useForm<APIKeyFormValues>({
resolver: zodResolver(formSchema),
defaultValues: {
apiKey: "",
title: "",
expiresAt: "",
},
});
async function onSubmit(values: APIKeyFormValues) {
if (!credentials || credentials.isLoading) return;
const expiresAt = values.expiresAt
? new Date(values.expiresAt).getTime() / 1000
: undefined;
const newCredentials = await credentials.createAPIKeyCredentials({
api_key: values.apiKey,
title: values.title,
expires_at: expiresAt,
});
onCredentialsCreate({
provider: credentials.provider,
id: newCredentials.id,
type: "api_key",
title: newCredentials.title,
});
}
return {
form,
isLoading: !credentials || credentials.isLoading,
supportsApiKey: !!credentials?.supportsApiKey,
provider: credentials?.provider,
providerName:
!credentials || credentials.isLoading
? undefined
: credentials.providerName,
schemaDescription: schema.description,
onSubmit,
};
}

View File

@@ -1,30 +0,0 @@
import { Dialog } from "@/components/molecules/Dialog/Dialog";
type Props = {
open: boolean;
onClose: () => void;
providerName: string;
};
export function OAuthFlowWaitingModal({ open, onClose, providerName }: Props) {
return (
<Dialog
title={`Waiting on ${providerName} sign-in process...`}
controlled={{
isOpen: open,
set: (isOpen) => {
if (!isOpen) onClose();
},
}}
onClose={onClose}
>
<Dialog.Content>
<p className="text-sm text-zinc-600">
Complete the sign-in process in the pop-up window.
<br />
Closing this dialog will cancel the sign-in process.
</p>
</Dialog.Content>
</Dialog>
);
}

View File

@@ -1,232 +0,0 @@
import React from "react";
import { format } from "date-fns";
import { Input as DSInput } from "@/components/atoms/Input/Input";
import { Select as DSSelect } from "@/components/atoms/Select/Select";
import { MultiToggle } from "@/components/molecules/MultiToggle/MultiToggle";
// Removed shadcn Select usage in favor of DS Select for time picker
import {
BlockIOObjectSubSchema,
BlockIOSubSchema,
DataType,
determineDataType,
} from "@/lib/autogpt-server-api/types";
import { TimePicker } from "@/components/molecules/TimePicker/TimePicker";
import { FileInput } from "@/components/atoms/FileInput/FileInput";
import { useRunAgentInputs } from "./useRunAgentInputs";
import { Switch } from "@/components/atoms/Switch/Switch";
/**
* A generic prop structure for the TypeBasedInput.
*
* onChange expects an event-like object with e.target.value so the parent
* can do something like setInputValues(e.target.value).
*/
interface Props {
schema: BlockIOSubSchema;
value?: any;
placeholder?: string;
onChange: (value: any) => void;
}
/**
* A generic, data-type-based input component that uses Shadcn UI.
* It inspects the schema via `determineDataType` and renders
* the correct UI component.
*/
export function RunAgentInputs({
schema,
value,
placeholder,
onChange,
...props
}: Props & React.HTMLAttributes<HTMLElement>) {
const { handleUploadFile, uploadProgress } = useRunAgentInputs();
const dataType = determineDataType(schema);
const baseId = String(schema.title ?? "input")
.replace(/\s+/g, "-")
.toLowerCase();
let innerInputElement: React.ReactNode = null;
switch (dataType) {
case DataType.NUMBER:
innerInputElement = (
<DSInput
id={`${baseId}-number`}
label={schema.title ?? placeholder ?? "Number"}
hideLabel
size="small"
type="number"
value={value ?? ""}
placeholder={placeholder || "Enter number"}
onChange={(e) =>
onChange(Number((e.target as HTMLInputElement).value))
}
{...props}
/>
);
break;
case DataType.LONG_TEXT:
innerInputElement = (
<DSInput
id={`${baseId}-textarea`}
label={schema.title ?? placeholder ?? "Text"}
hideLabel
size="small"
type="textarea"
rows={3}
value={value ?? ""}
placeholder={placeholder || "Enter text"}
onChange={(e) => onChange((e.target as HTMLInputElement).value)}
{...props}
/>
);
break;
case DataType.BOOLEAN:
innerInputElement = (
<>
<span className="text-sm text-gray-500">
{placeholder || (value ? "Enabled" : "Disabled")}
</span>
<Switch
className="ml-auto"
checked={!!value}
onCheckedChange={(checked: boolean) => onChange(checked)}
{...props}
/>
</>
);
break;
case DataType.DATE:
innerInputElement = (
<DSInput
id={`${baseId}-date`}
label={schema.title ?? placeholder ?? "Date"}
hideLabel
size="small"
type="date"
value={value ? format(value as Date, "yyyy-MM-dd") : ""}
onChange={(e) => {
const v = (e.target as HTMLInputElement).value;
if (!v) onChange(undefined);
else {
const [y, m, d] = v.split("-").map(Number);
onChange(new Date(y, m - 1, d));
}
}}
placeholder={placeholder || "Pick a date"}
{...props}
/>
);
break;
case DataType.TIME:
innerInputElement = (
<TimePicker value={value?.toString()} onChange={onChange} />
);
break;
case DataType.DATE_TIME:
innerInputElement = (
<DSInput
id={`${baseId}-datetime`}
label={schema.title ?? placeholder ?? "Date time"}
hideLabel
size="small"
type="datetime-local"
value={value ?? ""}
onChange={(e) => onChange((e.target as HTMLInputElement).value)}
placeholder={placeholder || "Enter date and time"}
{...props}
/>
);
break;
case DataType.FILE:
innerInputElement = (
<FileInput
value={value}
placeholder={placeholder}
onChange={onChange}
onUploadFile={handleUploadFile}
uploadProgress={uploadProgress}
{...props}
/>
);
break;
case DataType.SELECT:
if (
"enum" in schema &&
Array.isArray(schema.enum) &&
schema.enum.length > 0
) {
innerInputElement = (
<DSSelect
id={`${baseId}-select`}
label={schema.title ?? placeholder ?? "Select"}
hideLabel
value={value ?? ""}
size="small"
onValueChange={(val: string) => onChange(val)}
placeholder={placeholder || "Select an option"}
options={schema.enum
.filter((opt) => opt)
.map((opt) => ({ value: opt, label: String(opt) }))}
/>
);
break;
}
case DataType.MULTI_SELECT: {
const _schema = schema as BlockIOObjectSubSchema;
const allKeys = Object.keys(_schema.properties);
const selectedValues = Object.entries(value || {})
.filter(([_, v]) => v)
.map(([k]) => k);
innerInputElement = (
<MultiToggle
items={allKeys.map((key) => ({
value: key,
label: _schema.properties[key]?.title ?? key,
size: "small",
}))}
selectedValues={selectedValues}
onChange={(values: string[]) =>
onChange(
Object.fromEntries(
allKeys.map((opt) => [opt, values.includes(opt)]),
),
)
}
className="nodrag"
aria-label={schema.title}
/>
);
break;
}
case DataType.SHORT_TEXT:
default:
innerInputElement = (
<DSInput
id={`${baseId}-text`}
label={schema.title ?? placeholder ?? "Text"}
hideLabel
size="small"
type="text"
value={value ?? ""}
onChange={(e) => onChange((e.target as HTMLInputElement).value)}
placeholder={placeholder || "Enter text"}
{...props}
/>
);
}
return <div className="no-drag relative flex">{innerInputElement}</div>;
}

View File

@@ -1,19 +0,0 @@
import BackendAPI from "@/lib/autogpt-server-api";
import { useState } from "react";
export function useRunAgentInputs() {
const api = new BackendAPI();
const [uploadProgress, setUploadProgress] = useState(0);
async function handleUploadFile(file: File) {
const result = await api.uploadFile(file, "gcs", 24, (progress) =>
setUploadProgress(progress),
);
return result;
}
return {
uploadProgress,
handleUploadFile,
};
}

View File

@@ -1,7 +1,6 @@
"use client";
import { Dialog } from "@/components/molecules/Dialog/Dialog";
import { Button } from "@/components/atoms/Button/Button";
import { useState } from "react";
import { LibraryAgent } from "@/app/api/__generated__/models/libraryAgent";
import { useAgentRunModal } from "./useAgentRunModal";
@@ -9,13 +8,10 @@ import { ModalHeader } from "./components/ModalHeader/ModalHeader";
import { AgentCostSection } from "./components/AgentCostSection/AgentCostSection";
import { AgentSectionHeader } from "./components/AgentSectionHeader/AgentSectionHeader";
import { DefaultRunView } from "./components/DefaultRunView/DefaultRunView";
import { RunAgentModalContextProvider } from "./context";
import { ScheduleView } from "./components/ScheduleView/ScheduleView";
import { AgentDetails } from "./components/AgentDetails/AgentDetails";
import { RunActions } from "./components/RunActions/RunActions";
import { ScheduleActions } from "./components/ScheduleActions/ScheduleActions";
import { Text } from "@/components/atoms/Text/Text";
import { AlarmIcon, TrashIcon } from "@phosphor-icons/react";
interface Props {
triggerSlot: React.ReactNode;
@@ -26,45 +22,20 @@ interface Props {
export function RunAgentModal({ triggerSlot, agent }: Props) {
const {
// UI state
isOpen,
setIsOpen,
showScheduleView,
// Run mode
defaultRunType,
// Form: regular inputs
inputValues,
setInputValues,
// Form: credentials
inputCredentials,
setInputCredentials,
// Preset/trigger labels
presetName,
presetDescription,
setPresetName,
setPresetDescription,
// Scheduling
scheduleName,
cronExpression,
// Validation/readiness
allRequiredInputsAreSet,
// Schemas
agentInputFields,
agentCredentialsInputFields,
// Async states
// agentInputFields, // Available if needed for future use
hasInputFields,
isExecuting,
isCreatingSchedule,
isSettingUpTrigger,
// Actions
handleRun,
handleSchedule,
handleShowSchedule,
@@ -75,10 +46,6 @@ export function RunAgentModal({ triggerSlot, agent }: Props) {
const [isScheduleFormValid, setIsScheduleFormValid] = useState(true);
const hasAnySetupFields =
Object.keys(agentInputFields || {}).length > 0 ||
Object.keys(agentCredentialsInputFields || {}).length > 0;
function handleInputChange(key: string, value: string) {
setInputValues((prev) => ({
...prev,
@@ -86,136 +53,92 @@ export function RunAgentModal({ triggerSlot, agent }: Props) {
}));
}
function handleCredentialsChange(key: string, value: any | undefined) {
setInputCredentials((prev) => {
const next = { ...prev } as Record<string, any>;
if (value === undefined) {
delete next[key];
return next;
}
next[key] = value;
return next;
});
}
function handleSetOpen(open: boolean) {
setIsOpen(open);
// Always reset to Run view when opening/closing
if (open || !open) handleGoBack();
}
function handleRemoveSchedule() {
handleGoBack();
handleSetScheduleName("");
handleSetCronExpression("");
}
return (
<>
<Dialog
controlled={{ isOpen, set: handleSetOpen }}
styling={{ maxWidth: "600px", maxHeight: "90vh" }}
>
<Dialog.Trigger>{triggerSlot}</Dialog.Trigger>
<Dialog.Content>
<div className="flex h-full flex-col pb-4">
{/* Header */}
<div className="flex-shrink-0">
<ModalHeader agent={agent} />
<AgentCostSection flowId={agent.graph_id} />
</div>
<Dialog
controlled={{ isOpen, set: handleSetOpen }}
styling={{ maxWidth: "600px", maxHeight: "90vh" }}
>
<Dialog.Trigger>{triggerSlot}</Dialog.Trigger>
<Dialog.Content>
<div className="flex h-full flex-col">
{/* Header */}
<div className="flex-shrink-0">
<ModalHeader agent={agent} />
<AgentCostSection flowId={agent.graph_id} />
</div>
{/* Scrollable content */}
<div className="flex-1 pr-1" style={{ scrollbarGutter: "stable" }}>
{/* Setup Section */}
<div className="mt-10">
{hasAnySetupFields ? (
<RunAgentModalContextProvider
value={{
agent,
defaultRunType,
presetName,
setPresetName,
presetDescription,
setPresetDescription,
inputValues,
setInputValue: handleInputChange,
agentInputFields,
inputCredentials,
setInputCredentialsValue: handleCredentialsChange,
agentCredentialsInputFields,
}}
>
<>
<AgentSectionHeader
title={
defaultRunType === "automatic-trigger"
? "Trigger Setup"
: "Agent Setup"
}
/>
<div>
<DefaultRunView />
</div>
</>
</RunAgentModalContextProvider>
) : null}
</div>
{/* Schedule Section - always visible */}
<div className="mt-8">
<AgentSectionHeader title="Schedule Setup" />
{showScheduleView ? (
<>
<div className="mb-3 flex justify-start">
<Button
variant="secondary"
size="small"
onClick={handleRemoveSchedule}
>
<TrashIcon size={16} />
Remove schedule
</Button>
</div>
{/* Scrollable content */}
<div
className="flex-1 overflow-y-auto overflow-x-hidden pr-1"
style={{ scrollbarGutter: "stable" }}
>
{/* Setup Section */}
<div className="mt-10">
{showScheduleView ? (
<>
<AgentSectionHeader title="Schedule Setup" />
<div>
<ScheduleView
agent={agent}
scheduleName={scheduleName}
cronExpression={cronExpression}
inputValues={inputValues}
onScheduleNameChange={handleSetScheduleName}
onCronExpressionChange={handleSetCronExpression}
onInputChange={handleInputChange}
onValidityChange={setIsScheduleFormValid}
/>
</>
) : (
<div className="flex flex-col items-start gap-2">
<Text variant="body" className="mb-3 !text-zinc-500">
No schedule configured. Create a schedule to run this
agent automatically at a specific time.{" "}
</Text>
<Button
variant="secondary"
size="small"
onClick={handleShowSchedule}
>
<AlarmIcon size={16} />
Create schedule
</Button>
</div>
)}
</div>
</>
) : hasInputFields ? (
<>
<AgentSectionHeader
title={
defaultRunType === "automatic-trigger"
? "Trigger Setup"
: "Agent Setup"
}
/>
<div>
<DefaultRunView
agent={agent}
defaultRunType={defaultRunType}
inputValues={inputValues}
onInputChange={handleInputChange}
/>
</div>
</>
) : null}
</div>
{/* Agent Details Section */}
<div className="mt-8">
<AgentSectionHeader title="Agent Details" />
<AgentDetails agent={agent} />
</div>
{/* Agent Details Section */}
<div className="mt-8">
<AgentSectionHeader title="Agent Details" />
<AgentDetails agent={agent} />
</div>
</div>
<Dialog.Footer
className="fixed bottom-1 left-0 z-10 w-full bg-white p-4"
style={{ boxShadow: "0px -8px 10px white" }}
>
{showScheduleView ? (
{/* Fixed Actions - sticky inside dialog scroll */}
<Dialog.Footer className="sticky bottom-0 z-10 bg-white">
{!showScheduleView ? (
<RunActions
hasExternalTrigger={agent.has_external_trigger}
defaultRunType={defaultRunType}
onShowSchedule={handleShowSchedule}
onRun={handleRun}
isExecuting={isExecuting}
isSettingUpTrigger={isSettingUpTrigger}
allRequiredInputsAreSet={allRequiredInputsAreSet}
/>
) : (
<ScheduleActions
onGoBack={handleGoBack}
onSchedule={handleSchedule}
isCreatingSchedule={isCreatingSchedule}
allRequiredInputsAreSet={
@@ -224,18 +147,10 @@ export function RunAgentModal({ triggerSlot, agent }: Props) {
isScheduleFormValid
}
/>
) : (
<RunActions
defaultRunType={defaultRunType}
onRun={handleRun}
isExecuting={isExecuting}
isSettingUpTrigger={isSettingUpTrigger}
allRequiredInputsAreSet={allRequiredInputsAreSet}
/>
)}
</Dialog.Footer>
</Dialog.Content>
</Dialog>
</>
</div>
</Dialog.Content>
</Dialog>
);
}

View File

@@ -1,6 +1,7 @@
import { LibraryAgent } from "@/app/api/__generated__/models/libraryAgent";
import { Text } from "@/components/atoms/Text/Text";
import { Badge } from "@/components/atoms/Badge/Badge";
import { formatAgentStatus, getStatusColor } from "./helpers";
import { formatDate } from "@/lib/utils/time";
interface Props {
@@ -10,6 +11,20 @@ interface Props {
export function AgentDetails({ agent }: Props) {
return (
<div className="mt-4 flex flex-col gap-5">
<div>
<Text variant="body-medium" className="mb-1 !text-black">
Current Status
</Text>
<div className="flex items-center gap-2">
<div
className={`h-2 w-2 rounded-full ${getStatusColor(agent.status)}`}
/>
<Text variant="body" className="!text-zinc-700">
{formatAgentStatus(agent.status)}
</Text>
</div>
</div>
<div>
<Text variant="body-medium" className="mb-1 !text-black">
Version

View File

@@ -0,0 +1,23 @@
import { LibraryAgentStatus } from "@/app/api/__generated__/models/libraryAgentStatus";
export function formatAgentStatus(status: LibraryAgentStatus) {
const statusMap: Record<string, string> = {
COMPLETED: "Ready",
HEALTHY: "Running",
WAITING: "Run Queued",
ERROR: "Failed Run",
};
return statusMap[status];
}
export function getStatusColor(status: LibraryAgentStatus): string {
const colorMap: Record<LibraryAgentStatus, string> = {
COMPLETED: "bg-blue-300",
HEALTHY: "bg-green-300",
WAITING: "bg-amber-300",
ERROR: "bg-red-300",
};
return colorMap[status] || "bg-gray-300";
}

View File

@@ -0,0 +1,51 @@
import { Input } from "@/components/atoms/Input/Input";
import { LibraryAgent } from "@/app/api/__generated__/models/libraryAgent";
interface Props {
agent: LibraryAgent;
inputValues: Record<string, any>;
onInputChange: (key: string, value: string) => void;
variant?: "default" | "schedule";
}
export function AgentInputFields({
agent,
inputValues,
onInputChange,
variant = "default",
}: Props) {
const hasInputFields =
agent.input_schema &&
typeof agent.input_schema === "object" &&
"properties" in agent.input_schema;
if (!hasInputFields) {
const emptyStateClass =
variant === "schedule"
? "rounded-lg bg-neutral-50 p-4 text-sm text-neutral-500"
: "p-4 text-sm text-neutral-500";
return (
<div className={emptyStateClass}>
No input fields required for this agent
</div>
);
}
return (
<>
{Object.entries((agent.input_schema as any).properties || {}).map(
([key, schema]: [string, any]) => (
<Input
key={key}
id={key}
label={schema.title || key}
value={inputValues[key] || ""}
onChange={(e) => onInputChange(key, e.target.value)}
placeholder={schema.description}
/>
),
)}
</>
);
}

View File

@@ -1,102 +1,30 @@
import { LibraryAgent } from "@/app/api/__generated__/models/libraryAgent";
import { RunVariant } from "../../useAgentRunModal";
import { WebhookTriggerBanner } from "../WebhookTriggerBanner/WebhookTriggerBanner";
import { Input } from "@/components/atoms/Input/Input";
import SchemaTooltip from "@/components/SchemaTooltip";
import { CredentialsInput } from "@/app/(platform)/library/agents/[id]/components/AgentRunsView/components/CredentialsInputs/CredentialsInputs";
import { useRunAgentModalContext } from "../../context";
import { RunAgentInputs } from "../../../RunAgentInputs/RunAgentInputs";
import { AgentInputFields } from "../AgentInputFields/AgentInputFields";
export function DefaultRunView() {
const {
agent,
defaultRunType,
presetName,
setPresetName,
presetDescription,
setPresetDescription,
inputValues,
setInputValue,
agentInputFields,
inputCredentials,
setInputCredentialsValue,
agentCredentialsInputFields,
} = useRunAgentModalContext();
interface Props {
agent: LibraryAgent;
defaultRunType: RunVariant;
inputValues: Record<string, any>;
onInputChange: (key: string, value: string) => void;
}
export function DefaultRunView({
agent,
defaultRunType,
inputValues,
onInputChange,
}: Props) {
return (
<div className="mb-12 mt-6">
<div className="mt-6">
{defaultRunType === "automatic-trigger" && <WebhookTriggerBanner />}
{/* Preset/Trigger fields */}
{defaultRunType === "automatic-trigger" && (
<div className="flex flex-col gap-4">
<div className="flex flex-col space-y-2">
<label className="flex items-center gap-1 text-sm font-medium">
Trigger Name
<SchemaTooltip description="Name of the trigger you are setting up" />
</label>
<Input
id="trigger_name"
label="Trigger Name"
size="small"
hideLabel
value={presetName}
placeholder="Enter trigger name"
onChange={(e) => setPresetName(e.target.value)}
/>
</div>
<div className="flex flex-col space-y-2">
<label className="flex items-center gap-1 text-sm font-medium">
Trigger Description
<SchemaTooltip description="Description of the trigger you are setting up" />
</label>
<Input
id="trigger_description"
label="Trigger Description"
size="small"
hideLabel
value={presetDescription}
placeholder="Enter trigger description"
onChange={(e) => setPresetDescription(e.target.value)}
/>
</div>
</div>
)}
{/* Credentials inputs */}
{Object.entries(agentCredentialsInputFields || {}).map(
([key, inputSubSchema]) => (
<CredentialsInput
key={key}
schema={{ ...inputSubSchema, discriminator: undefined } as any}
selectedCredentials={
(inputCredentials && inputCredentials[key]) ??
inputSubSchema.default
}
onSelectCredentials={(value) =>
setInputCredentialsValue(key, value)
}
siblingInputs={inputValues}
hideIfSingleCredentialAvailable={!agent.has_external_trigger}
/>
),
)}
{/* Regular inputs */}
{Object.entries(agentInputFields || {}).map(([key, inputSubSchema]) => (
<div key={key} className="flex flex-col gap-0 space-y-2">
<label className="flex items-center gap-1 text-sm font-medium">
{inputSubSchema.title || key}
<SchemaTooltip description={inputSubSchema.description} />
</label>
<RunAgentInputs
schema={inputSubSchema}
value={inputValues[key] ?? inputSubSchema.default}
placeholder={inputSubSchema.description}
onChange={(value) => setInputValue(key, value)}
data-testid={`agent-input-${key}`}
/>
</div>
))}
<AgentInputFields
agent={agent}
inputValues={inputValues}
onInputChange={onInputChange}
/>
</div>
);
}

View File

@@ -8,7 +8,6 @@ interface ModalHeaderProps {
}
export function ModalHeader({ agent }: ModalHeaderProps) {
const isUnknownCreator = agent.creator_name === "Unknown";
return (
<div className="space-y-4">
<div className="flex items-center gap-3">
@@ -16,9 +15,9 @@ export function ModalHeader({ agent }: ModalHeaderProps) {
</div>
<div>
<Text variant="h3">{agent.name}</Text>
{!isUnknownCreator ? (
<Text variant="body-medium">by {agent.creator_name}</Text>
) : null}
<Text variant="body-medium">
by {agent.creator_name === "Unknown" ? "" : agent.creator_name}
</Text>
<ShowMoreText
previewLimit={80}
variant="small"

View File

@@ -2,7 +2,9 @@ import { Button } from "@/components/atoms/Button/Button";
import { RunVariant } from "../../useAgentRunModal";
interface Props {
hasExternalTrigger: boolean;
defaultRunType: RunVariant;
onShowSchedule: () => void;
onRun: () => void;
isExecuting?: boolean;
isSettingUpTrigger?: boolean;
@@ -10,7 +12,9 @@ interface Props {
}
export function RunActions({
hasExternalTrigger,
defaultRunType,
onShowSchedule,
onRun,
isExecuting = false,
isSettingUpTrigger = false,
@@ -18,6 +22,11 @@ export function RunActions({
}: Props) {
return (
<div className="flex justify-end gap-3">
{!hasExternalTrigger && (
<Button variant="secondary" onClick={onShowSchedule}>
Schedule Run
</Button>
)}
<Button
variant="primary"
onClick={onRun}

View File

@@ -1,25 +1,30 @@
import { Button } from "@/components/atoms/Button/Button";
interface Props {
onGoBack: () => void;
onSchedule: () => void;
isCreatingSchedule?: boolean;
allRequiredInputsAreSet?: boolean;
}
export function ScheduleActions({
onGoBack,
onSchedule,
isCreatingSchedule = false,
allRequiredInputsAreSet = true,
}: Props) {
return (
<div className="flex justify-end gap-3">
<Button variant="ghost" onClick={onGoBack}>
Go Back
</Button>
<Button
variant="primary"
onClick={onSchedule}
disabled={!allRequiredInputsAreSet || isCreatingSchedule}
loading={isCreatingSchedule}
>
Schedule Agent
Create Schedule
</Button>
</div>
);

View File

@@ -1,5 +1,7 @@
import { Input } from "@/components/atoms/Input/Input";
import { MultiToggle } from "@/components/molecules/MultiToggle/MultiToggle";
import { LibraryAgent } from "@/app/api/__generated__/models/libraryAgent";
import { AgentInputFields } from "../AgentInputFields/AgentInputFields";
import { Text } from "@/components/atoms/Text/Text";
import { Select } from "@/components/atoms/Select/Select";
import { useScheduleView } from "./useScheduleView";
@@ -7,18 +9,24 @@ import { useCallback, useState } from "react";
import { validateSchedule } from "./helpers";
interface Props {
agent: LibraryAgent;
scheduleName: string;
cronExpression: string;
inputValues: Record<string, any>;
onScheduleNameChange: (name: string) => void;
onCronExpressionChange: (expression: string) => void;
onInputChange: (key: string, value: string) => void;
onValidityChange?: (valid: boolean) => void;
}
export function ScheduleView({
agent,
scheduleName,
cronExpression: _cronExpression,
inputValues,
onScheduleNameChange,
onCronExpressionChange,
onInputChange,
onValidityChange,
}: Props) {
const {
@@ -131,7 +139,12 @@ export function ScheduleView({
error={errors.time}
/>
{/** Agent inputs are rendered in the main modal; none here. */}
<AgentInputFields
agent={agent}
inputValues={inputValues}
onInputChange={onInputChange}
variant="schedule"
/>
</div>
);
}

View File

@@ -1,49 +0,0 @@
"use client";
import React, { createContext, useContext } from "react";
import { LibraryAgent } from "@/app/api/__generated__/models/libraryAgent";
import { RunVariant } from "./useAgentRunModal";
export interface RunAgentModalContextValue {
agent: LibraryAgent;
defaultRunType: RunVariant;
// Preset / Trigger
presetName: string;
setPresetName: (value: string) => void;
presetDescription: string;
setPresetDescription: (value: string) => void;
// Inputs
inputValues: Record<string, any>;
setInputValue: (key: string, value: any) => void;
agentInputFields: Record<string, any>;
// Credentials
inputCredentials: Record<string, any>;
setInputCredentialsValue: (key: string, value: any | undefined) => void;
agentCredentialsInputFields: Record<string, any>;
}
const RunAgentModalContext = createContext<RunAgentModalContextValue | null>(
null,
);
export function useRunAgentModalContext(): RunAgentModalContextValue {
const ctx = useContext(RunAgentModalContext);
if (!ctx) throw new Error("RunAgentModalContext missing provider");
return ctx;
}
interface ProviderProps {
value: RunAgentModalContextValue;
children: React.ReactNode;
}
export function RunAgentModalContextProvider({
value,
children,
}: ProviderProps) {
return (
<RunAgentModalContext.Provider value={value}>
{children}
</RunAgentModalContext.Provider>
);
}

View File

@@ -5,7 +5,7 @@ import { isEmpty } from "@/lib/utils";
import { usePostV1ExecuteGraphAgent } from "@/app/api/__generated__/endpoints/graphs/graphs";
import { usePostV1CreateExecutionSchedule as useCreateSchedule } from "@/app/api/__generated__/endpoints/schedules/schedules";
import { usePostV2SetupTrigger } from "@/app/api/__generated__/endpoints/presets/presets";
import { GraphExecutionMeta } from "@/app/api/__generated__/models/graphExecutionMeta";
import { ExecuteGraphResponse } from "@/app/api/__generated__/models/executeGraphResponse";
import { GraphExecutionJobInfo } from "@/app/api/__generated__/models/graphExecutionJobInfo";
import { LibraryAgentPreset } from "@/app/api/__generated__/models/libraryAgentPreset";
@@ -16,7 +16,7 @@ export type RunVariant =
| "manual-trigger";
interface UseAgentRunModalCallbacks {
onRun?: (execution: GraphExecutionMeta) => void;
onRun?: (execution: ExecuteGraphResponse) => void;
onCreateSchedule?: (schedule: GraphExecutionJobInfo) => void;
onSetupTrigger?: (preset: LibraryAgentPreset) => void;
}
@@ -29,11 +29,6 @@ export function useAgentRunModal(
const [isOpen, setIsOpen] = useState(false);
const [showScheduleView, setShowScheduleView] = useState(false);
const [inputValues, setInputValues] = useState<Record<string, any>>({});
const [inputCredentials, setInputCredentials] = useState<Record<string, any>>(
{},
);
const [presetName, setPresetName] = useState<string>("");
const [presetDescription, setPresetDescription] = useState<string>("");
const defaultScheduleName = useMemo(() => `Run ${agent.name}`, [agent.name]);
const [scheduleName, setScheduleName] = useState(defaultScheduleName);
const [cronExpression, setCronExpression] = useState("0 9 * * 1");
@@ -49,7 +44,8 @@ export function useAgentRunModal(
onSuccess: (response) => {
if (response.status === 200) {
toast({
title: "Agent execution started",
title: "Agent execution started",
description: "Your agent is now running.",
});
callbacks?.onRun?.(response.data);
setIsOpen(false);
@@ -70,7 +66,8 @@ export function useAgentRunModal(
onSuccess: (response) => {
if (response.status === 200) {
toast({
title: "Schedule created",
title: "Schedule created",
description: `Agent scheduled to run: ${scheduleName}`,
});
callbacks?.onCreateSchedule?.(response.data);
setIsOpen(false);
@@ -91,7 +88,8 @@ export function useAgentRunModal(
onSuccess: (response: any) => {
if (response.status === 200) {
toast({
title: "Trigger setup complete",
title: "Trigger setup complete",
description: "Your webhook trigger is now active.",
});
callbacks?.onSetupTrigger?.(response.data);
setIsOpen(false);
@@ -130,20 +128,8 @@ export function useAgentRunModal(
);
}, [agentInputSchema]);
const agentCredentialsInputFields = useMemo(() => {
if (
!agent.credentials_input_schema ||
typeof agent.credentials_input_schema !== "object" ||
!("properties" in agent.credentials_input_schema) ||
!agent.credentials_input_schema.properties
) {
return {} as Record<string, any>;
}
return agent.credentials_input_schema.properties as Record<string, any>;
}, [agent.credentials_input_schema]);
// Validation logic
const [allRequiredInputsAreSetRaw, missingInputs] = useMemo(() => {
const [allRequiredInputsAreSet, missingInputs] = useMemo(() => {
const nonEmptyInputs = new Set(
Object.keys(inputValues).filter((k) => !isEmpty(inputValues[k])),
);
@@ -156,41 +142,11 @@ export function useAgentRunModal(
return [missing.length === 0, missing];
}, [agentInputSchema.required, inputValues]);
const [allCredentialsAreSet, missingCredentials] = useMemo(() => {
const availableCredentials = new Set(Object.keys(inputCredentials));
const allCredentials = new Set(
Object.keys(agentCredentialsInputFields || {}) ?? [],
);
const missing = [...allCredentials].filter(
(key) => !availableCredentials.has(key),
);
return [missing.length === 0, missing];
}, [agentCredentialsInputFields, inputCredentials]);
const credentialsRequired = useMemo(
() => Object.keys(agentCredentialsInputFields || {}).length > 0,
[agentCredentialsInputFields],
);
// Final readiness flag combining inputs + credentials when credentials are shown
const allRequiredInputsAreSet = useMemo(
() =>
allRequiredInputsAreSetRaw &&
(!credentialsRequired || allCredentialsAreSet),
[allRequiredInputsAreSetRaw, credentialsRequired, allCredentialsAreSet],
);
const notifyMissingRequirements = useCallback(
const notifyMissingInputs = useCallback(
(needScheduleName: boolean = false) => {
const allMissingFields = (
needScheduleName && !scheduleName ? ["schedule_name"] : []
)
.concat(missingInputs)
.concat(
credentialsRequired && !allCredentialsAreSet
? missingCredentials.map((k) => `credentials:${k}`)
: [],
);
).concat(missingInputs);
toast({
title: "⚠️ Missing required inputs",
@@ -198,20 +154,13 @@ export function useAgentRunModal(
variant: "destructive",
});
},
[
missingInputs,
scheduleName,
toast,
credentialsRequired,
allCredentialsAreSet,
missingCredentials,
],
[missingInputs, scheduleName, toast],
);
// Action handlers
const handleRun = useCallback(() => {
if (!allRequiredInputsAreSet) {
notifyMissingRequirements();
notifyMissingInputs();
return;
}
@@ -228,12 +177,12 @@ export function useAgentRunModal(
setupTriggerMutation.mutate({
data: {
name: presetName || scheduleName,
description: presetDescription || `Trigger for ${agent.name}`,
name: scheduleName,
description: `Trigger for ${agent.name}`,
graph_id: agent.graph_id,
graph_version: agent.graph_version,
trigger_config: inputValues,
agent_credentials: inputCredentials,
agent_credentials: {}, // TODO: Add credentials handling if needed
},
});
} else {
@@ -243,7 +192,7 @@ export function useAgentRunModal(
graphVersion: agent.graph_version,
data: {
inputs: inputValues,
credentials_inputs: inputCredentials,
credentials_inputs: {}, // TODO: Add credentials handling if needed
},
});
}
@@ -252,11 +201,8 @@ export function useAgentRunModal(
defaultRunType,
scheduleName,
inputValues,
inputCredentials,
agent,
presetName,
presetDescription,
notifyMissingRequirements,
notifyMissingInputs,
setupTriggerMutation,
executeGraphMutation,
toast,
@@ -264,7 +210,7 @@ export function useAgentRunModal(
const handleSchedule = useCallback(() => {
if (!allRequiredInputsAreSet) {
notifyMissingRequirements(true);
notifyMissingInputs(true);
return;
}
@@ -280,11 +226,11 @@ export function useAgentRunModal(
createScheduleMutation.mutate({
graphId: agent.graph_id,
data: {
name: presetName || scheduleName,
name: scheduleName,
cron: cronExpression,
inputs: inputValues,
graph_version: agent.graph_version,
credentials: inputCredentials,
credentials: {}, // TODO: Add credentials handling if needed
},
});
}, [
@@ -292,9 +238,8 @@ export function useAgentRunModal(
scheduleName,
cronExpression,
inputValues,
inputCredentials,
agent,
notifyMissingRequirements,
notifyMissingInputs,
createScheduleMutation,
toast,
]);
@@ -326,47 +271,21 @@ export function useAgentRunModal(
}, [agentInputFields]);
return {
// UI state
isOpen,
setIsOpen,
showScheduleView,
// Run mode
defaultRunType,
// Form: regular inputs
inputValues,
setInputValues,
// Form: credentials
inputCredentials,
setInputCredentials,
// Preset/trigger labels
presetName,
presetDescription,
setPresetName,
setPresetDescription,
// Scheduling
scheduleName,
cronExpression,
// Validation/readiness
allRequiredInputsAreSet,
missingInputs,
// Schemas for rendering
agentInputFields,
agentCredentialsInputFields,
hasInputFields,
// Async states
isExecuting: executeGraphMutation.isPending,
isCreatingSchedule: createScheduleMutation.isPending,
isSettingUpTrigger: setupTriggerMutation.isPending,
// Actions
handleRun,
handleSchedule,
handleShowSchedule,

View File

@@ -38,21 +38,16 @@ import {
DialogTitle,
} from "@/components/ui/dialog";
import LoadingBox, { LoadingSpinner } from "@/components/ui/loading";
import {
useToast,
useToastOnFail,
} from "@/components/molecules/Toast/use-toast";
import { useToast } from "@/components/molecules/Toast/use-toast";
import { AgentRunDetailsView } from "./components/agent-run-details-view";
import { AgentRunDraftView } from "./components/agent-run-draft-view";
import { CreatePresetDialog } from "./components/create-preset-dialog";
import { useAgentRunsInfinite } from "./use-agent-runs";
import { useAgentRunsInfinite } from "../use-agent-runs";
import { AgentRunsSelectorList } from "./components/agent-runs-selector-list";
import { AgentScheduleDetailsView } from "./components/agent-schedule-details-view";
export function OldAgentLibraryView() {
const { id: agentID }: { id: LibraryAgentID } = useParams();
const [executionId, setExecutionId] = useQueryState("executionId");
const toastOnFail = useToastOnFail();
const { toast } = useToast();
const router = useRouter();
const api = useBackendAPI();
@@ -90,8 +85,6 @@ export function OldAgentLibraryView() {
incrementRuns,
} = useOnboarding();
const [copyAgentDialogOpen, setCopyAgentDialogOpen] = useState(false);
const [creatingPresetFromExecutionID, setCreatingPresetFromExecutionID] =
useState<GraphExecutionID | null>(null);
// Set page title with agent name
useEffect(() => {
@@ -290,10 +283,6 @@ export function OldAgentLibraryView() {
}
agentRunsQuery.upsertAgentRun(data);
if (data.id === selectedView.id) {
// Update currently viewed run
setSelectedRun(data);
}
},
);
@@ -395,26 +384,6 @@ export function OldAgentLibraryView() {
[schedules, api],
);
const handleCreatePresetFromRun = useCallback(
async (name: string, description: string) => {
if (!creatingPresetFromExecutionID) return;
await api
.createLibraryAgentPreset({
name,
description,
graph_execution_id: creatingPresetFromExecutionID,
})
.then((preset) => {
setAgentPresets((prev) => [...prev, preset]);
selectPreset(preset.id);
setCreatingPresetFromExecutionID(null);
})
.catch(toastOnFail("create a preset"));
},
[api, creatingPresetFromExecutionID, selectPreset, toast],
);
const downloadGraph = useCallback(
async () =>
agent &&
@@ -519,7 +488,6 @@ export function OldAgentLibraryView() {
doDeleteRun={setConfirmingDeleteAgentRun}
doDeletePreset={setConfirmingDeleteAgentPreset}
doDeleteSchedule={deleteSchedule}
doCreatePresetFromRun={setCreatingPresetFromExecutionID}
/>
<div className="flex-1">
@@ -544,16 +512,14 @@ export function OldAgentLibraryView() {
run={selectedRun}
agentActions={agentActions}
onRun={selectRun}
doDeleteRun={() => setConfirmingDeleteAgentRun(selectedRun)}
doCreatePresetFromRun={() =>
setCreatingPresetFromExecutionID(selectedRun.id)
}
deleteRun={() => setConfirmingDeleteAgentRun(selectedRun)}
/>
) : null
) : selectedView.type == "run" ? (
/* Draft new runs / Create new presets */
<AgentRunDraftView
graph={graph}
triggerSetupInfo={agent.trigger_setup_info}
onRun={selectRun}
onCreateSchedule={onCreateSchedule}
onCreatePreset={onCreatePreset}
@@ -563,6 +529,7 @@ export function OldAgentLibraryView() {
/* Edit & update presets */
<AgentRunDraftView
graph={graph}
triggerSetupInfo={agent.trigger_setup_info}
agentPreset={
agentPresets.find((preset) => preset.id == selectedView.id)!
}
@@ -643,11 +610,6 @@ export function OldAgentLibraryView() {
</DialogFooter>
</DialogContent>
</Dialog>
<CreatePresetDialog
open={!!creatingPresetFromExecutionID}
onOpenChange={() => setCreatingPresetFromExecutionID(null)}
onConfirm={handleCreatePresetFromRun}
/>
</div>
</div>
);

Some files were not shown because too many files have changed in this diff Show More