mirror of
https://github.com/Significant-Gravitas/AutoGPT.git
synced 2026-02-12 15:55:03 -05:00
Compare commits
14 Commits
feat/copit
...
pwuts/spee
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
7e855806e3 | ||
|
|
adc46ec2e8 | ||
|
|
9d12807294 | ||
|
|
4ac40f11e5 | ||
|
|
f5cdb02a38 | ||
|
|
bd9ff05eaa | ||
|
|
66be27f6da | ||
|
|
39b821da94 | ||
|
|
393d6aa5ac | ||
|
|
f753058e8f | ||
|
|
7cdbbdd65e | ||
|
|
6191ac0b1e | ||
|
|
b51e87bc53 | ||
|
|
71f764f3d0 |
@@ -5,42 +5,13 @@
|
||||
!docs/
|
||||
|
||||
# Platform - Libs
|
||||
!autogpt_platform/autogpt_libs/autogpt_libs/
|
||||
!autogpt_platform/autogpt_libs/pyproject.toml
|
||||
!autogpt_platform/autogpt_libs/poetry.lock
|
||||
!autogpt_platform/autogpt_libs/README.md
|
||||
!autogpt_platform/autogpt_libs/
|
||||
|
||||
# Platform - Backend
|
||||
!autogpt_platform/backend/backend/
|
||||
!autogpt_platform/backend/test/e2e_test_data.py
|
||||
!autogpt_platform/backend/migrations/
|
||||
!autogpt_platform/backend/schema.prisma
|
||||
!autogpt_platform/backend/pyproject.toml
|
||||
!autogpt_platform/backend/poetry.lock
|
||||
!autogpt_platform/backend/README.md
|
||||
!autogpt_platform/backend/.env
|
||||
!autogpt_platform/backend/gen_prisma_types_stub.py
|
||||
|
||||
# Platform - Market
|
||||
!autogpt_platform/market/market/
|
||||
!autogpt_platform/market/scripts.py
|
||||
!autogpt_platform/market/schema.prisma
|
||||
!autogpt_platform/market/pyproject.toml
|
||||
!autogpt_platform/market/poetry.lock
|
||||
!autogpt_platform/market/README.md
|
||||
!autogpt_platform/backend/
|
||||
|
||||
# Platform - Frontend
|
||||
!autogpt_platform/frontend/src/
|
||||
!autogpt_platform/frontend/public/
|
||||
!autogpt_platform/frontend/scripts/
|
||||
!autogpt_platform/frontend/package.json
|
||||
!autogpt_platform/frontend/pnpm-lock.yaml
|
||||
!autogpt_platform/frontend/tsconfig.json
|
||||
!autogpt_platform/frontend/README.md
|
||||
## config
|
||||
!autogpt_platform/frontend/*.config.*
|
||||
!autogpt_platform/frontend/.env.*
|
||||
!autogpt_platform/frontend/.env
|
||||
!autogpt_platform/frontend/
|
||||
|
||||
# Classic - AutoGPT
|
||||
!classic/original_autogpt/autogpt/
|
||||
@@ -64,6 +35,37 @@
|
||||
# Classic - Frontend
|
||||
!classic/frontend/build/web/
|
||||
|
||||
# Explicitly re-ignore some folders
|
||||
.*
|
||||
**/__pycache__
|
||||
# Explicitly re-ignore unwanted files from whitelisted directories
|
||||
# Note: These patterns MUST come after the whitelist rules to take effect
|
||||
|
||||
# Hidden files and directories (but keep frontend .env files needed for build)
|
||||
**/.*
|
||||
!autogpt_platform/frontend/.env
|
||||
!autogpt_platform/frontend/.env.default
|
||||
!autogpt_platform/frontend/.env.production
|
||||
|
||||
# Python artifacts
|
||||
**/__pycache__/
|
||||
**/*.pyc
|
||||
**/*.pyo
|
||||
**/.venv/
|
||||
**/.ruff_cache/
|
||||
**/.pytest_cache/
|
||||
**/.coverage
|
||||
**/htmlcov/
|
||||
|
||||
# Node artifacts
|
||||
**/node_modules/
|
||||
**/.next/
|
||||
**/storybook-static/
|
||||
**/playwright-report/
|
||||
**/test-results/
|
||||
|
||||
# Build artifacts
|
||||
**/dist/
|
||||
**/build/
|
||||
**/target/
|
||||
|
||||
# Logs and temp files
|
||||
**/*.log
|
||||
**/*.tmp
|
||||
|
||||
179
.github/workflows/platform-frontend-ci.yml
vendored
179
.github/workflows/platform-frontend-ci.yml
vendored
@@ -26,7 +26,6 @@ jobs:
|
||||
setup:
|
||||
runs-on: ubuntu-latest
|
||||
outputs:
|
||||
cache-key: ${{ steps.cache-key.outputs.key }}
|
||||
components-changed: ${{ steps.filter.outputs.components }}
|
||||
|
||||
steps:
|
||||
@@ -41,28 +40,17 @@ jobs:
|
||||
components:
|
||||
- 'autogpt_platform/frontend/src/components/**'
|
||||
|
||||
- name: Set up Node.js
|
||||
uses: actions/setup-node@v6
|
||||
with:
|
||||
node-version: "22.18.0"
|
||||
|
||||
- name: Enable corepack
|
||||
run: corepack enable
|
||||
|
||||
- name: Generate cache key
|
||||
id: cache-key
|
||||
run: echo "key=${{ runner.os }}-pnpm-${{ hashFiles('autogpt_platform/frontend/pnpm-lock.yaml', 'autogpt_platform/frontend/package.json') }}" >> $GITHUB_OUTPUT
|
||||
|
||||
- name: Cache dependencies
|
||||
uses: actions/cache@v5
|
||||
- name: Set up Node
|
||||
uses: actions/setup-node@v6
|
||||
with:
|
||||
path: ~/.pnpm-store
|
||||
key: ${{ steps.cache-key.outputs.key }}
|
||||
restore-keys: |
|
||||
${{ runner.os }}-pnpm-${{ hashFiles('autogpt_platform/frontend/pnpm-lock.yaml') }}
|
||||
${{ runner.os }}-pnpm-
|
||||
node-version: "22.18.0"
|
||||
cache: "pnpm"
|
||||
cache-dependency-path: autogpt_platform/frontend/pnpm-lock.yaml
|
||||
|
||||
- name: Install dependencies
|
||||
- name: Install dependencies to populate cache
|
||||
run: pnpm install --frozen-lockfile
|
||||
|
||||
lint:
|
||||
@@ -73,22 +61,15 @@ jobs:
|
||||
- name: Checkout repository
|
||||
uses: actions/checkout@v6
|
||||
|
||||
- name: Set up Node.js
|
||||
uses: actions/setup-node@v6
|
||||
with:
|
||||
node-version: "22.18.0"
|
||||
|
||||
- name: Enable corepack
|
||||
run: corepack enable
|
||||
|
||||
- name: Restore dependencies cache
|
||||
uses: actions/cache@v5
|
||||
- name: Set up Node
|
||||
uses: actions/setup-node@v6
|
||||
with:
|
||||
path: ~/.pnpm-store
|
||||
key: ${{ needs.setup.outputs.cache-key }}
|
||||
restore-keys: |
|
||||
${{ runner.os }}-pnpm-${{ hashFiles('autogpt_platform/frontend/pnpm-lock.yaml') }}
|
||||
${{ runner.os }}-pnpm-
|
||||
node-version: "22.18.0"
|
||||
cache: "pnpm"
|
||||
cache-dependency-path: autogpt_platform/frontend/pnpm-lock.yaml
|
||||
|
||||
- name: Install dependencies
|
||||
run: pnpm install --frozen-lockfile
|
||||
@@ -111,22 +92,15 @@ jobs:
|
||||
with:
|
||||
fetch-depth: 0
|
||||
|
||||
- name: Set up Node.js
|
||||
uses: actions/setup-node@v6
|
||||
with:
|
||||
node-version: "22.18.0"
|
||||
|
||||
- name: Enable corepack
|
||||
run: corepack enable
|
||||
|
||||
- name: Restore dependencies cache
|
||||
uses: actions/cache@v5
|
||||
- name: Set up Node
|
||||
uses: actions/setup-node@v6
|
||||
with:
|
||||
path: ~/.pnpm-store
|
||||
key: ${{ needs.setup.outputs.cache-key }}
|
||||
restore-keys: |
|
||||
${{ runner.os }}-pnpm-${{ hashFiles('autogpt_platform/frontend/pnpm-lock.yaml') }}
|
||||
${{ runner.os }}-pnpm-
|
||||
node-version: "22.18.0"
|
||||
cache: "pnpm"
|
||||
cache-dependency-path: autogpt_platform/frontend/pnpm-lock.yaml
|
||||
|
||||
- name: Install dependencies
|
||||
run: pnpm install --frozen-lockfile
|
||||
@@ -141,10 +115,8 @@ jobs:
|
||||
exitOnceUploaded: true
|
||||
|
||||
e2e_test:
|
||||
name: end-to-end tests
|
||||
runs-on: big-boi
|
||||
needs: setup
|
||||
strategy:
|
||||
fail-fast: false
|
||||
|
||||
steps:
|
||||
- name: Checkout repository
|
||||
@@ -152,19 +124,11 @@ jobs:
|
||||
with:
|
||||
submodules: recursive
|
||||
|
||||
- name: Set up Node.js
|
||||
uses: actions/setup-node@v6
|
||||
with:
|
||||
node-version: "22.18.0"
|
||||
|
||||
- name: Enable corepack
|
||||
run: corepack enable
|
||||
|
||||
- name: Copy default supabase .env
|
||||
- name: Set up Platform - Copy default supabase .env
|
||||
run: |
|
||||
cp ../.env.default ../.env
|
||||
|
||||
- name: Copy backend .env and set OpenAI API key
|
||||
- name: Set up Platform - Copy backend .env and set OpenAI API key
|
||||
run: |
|
||||
cp ../backend/.env.default ../backend/.env
|
||||
echo "OPENAI_INTERNAL_API_KEY=${{ secrets.OPENAI_API_KEY }}" >> ../backend/.env
|
||||
@@ -172,77 +136,87 @@ jobs:
|
||||
# Used by E2E test data script to generate embeddings for approved store agents
|
||||
OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }}
|
||||
|
||||
- name: Set up Docker Buildx
|
||||
- name: Set up Platform - Set up Docker Buildx
|
||||
uses: docker/setup-buildx-action@v3
|
||||
|
||||
- name: Cache Docker layers
|
||||
uses: actions/cache@v5
|
||||
with:
|
||||
path: /tmp/.buildx-cache
|
||||
key: ${{ runner.os }}-buildx-frontend-test-${{ hashFiles('autogpt_platform/docker-compose.yml', 'autogpt_platform/backend/Dockerfile', 'autogpt_platform/backend/pyproject.toml', 'autogpt_platform/backend/poetry.lock') }}
|
||||
restore-keys: |
|
||||
${{ runner.os }}-buildx-frontend-test-
|
||||
driver: docker-container
|
||||
driver-opts: network=host
|
||||
|
||||
- name: Run docker compose
|
||||
- name: Set up Platform - Expose GHA cache to docker buildx CLI
|
||||
uses: crazy-max/ghaction-github-runtime@v3
|
||||
|
||||
- name: Set up Platform - Build Docker images (with cache)
|
||||
working-directory: autogpt_platform
|
||||
run: |
|
||||
NEXT_PUBLIC_PW_TEST=true docker compose -f ../docker-compose.yml up -d
|
||||
pip install pyyaml
|
||||
|
||||
# Resolve extends and generate a flat compose file that bake can understand
|
||||
docker compose -f docker-compose.yml config > docker-compose.resolved.yml
|
||||
|
||||
# Add cache configuration to the resolved compose file
|
||||
python ../.github/workflows/scripts/docker-ci-fix-compose-build-cache.py \
|
||||
--source docker-compose.resolved.yml \
|
||||
--cache-from "type=gha" \
|
||||
--cache-to "type=gha,mode=max" \
|
||||
--backend-scope "platform-backend-${{ hashFiles('autogpt_platform/backend/Dockerfile', 'autogpt_platform/backend/poetry.lock', 'autogpt_platform/backend/backend') }}" \
|
||||
--frontend-scope "platform-frontend-${{ hashFiles('autogpt_platform/frontend/Dockerfile', 'autogpt_platform/frontend/pnpm-lock.yaml', 'autogpt_platform/frontend/src') }}"
|
||||
|
||||
# Build with bake using the resolved compose file (now includes cache config)
|
||||
docker buildx bake --allow=fs.read=.. -f docker-compose.resolved.yml --load
|
||||
env:
|
||||
DOCKER_BUILDKIT: 1
|
||||
BUILDX_CACHE_FROM: type=local,src=/tmp/.buildx-cache
|
||||
BUILDX_CACHE_TO: type=local,dest=/tmp/.buildx-cache-new,mode=max
|
||||
NEXT_PUBLIC_PW_TEST: true
|
||||
|
||||
- name: Move cache
|
||||
run: |
|
||||
rm -rf /tmp/.buildx-cache
|
||||
if [ -d "/tmp/.buildx-cache-new" ]; then
|
||||
mv /tmp/.buildx-cache-new /tmp/.buildx-cache
|
||||
fi
|
||||
- name: Set up Platform - Run (docker compose up)
|
||||
run: docker compose -f ../docker-compose.resolved.yml up -d --no-build
|
||||
env:
|
||||
NEXT_PUBLIC_PW_TEST: true
|
||||
|
||||
- name: Wait for services to be ready
|
||||
- name: Set up Platform - Wait for services to be ready
|
||||
run: |
|
||||
echo "Waiting for rest_server to be ready..."
|
||||
timeout 60 sh -c 'until curl -f http://localhost:8006/health 2>/dev/null; do sleep 2; done' || echo "Rest server health check timeout, continuing..."
|
||||
echo "Waiting for database to be ready..."
|
||||
timeout 60 sh -c 'until docker compose -f ../docker-compose.yml exec -T db pg_isready -U postgres 2>/dev/null; do sleep 2; done' || echo "Database ready check timeout, continuing..."
|
||||
timeout 60 sh -c 'until docker compose -f ../docker-compose.resolved.yml exec -T db pg_isready -U postgres 2>/dev/null; do sleep 2; done' || echo "Database ready check timeout, continuing..."
|
||||
|
||||
- name: Create E2E test data
|
||||
- name: Set up tests - Create E2E test data
|
||||
run: |
|
||||
echo "Creating E2E test data..."
|
||||
# First try to run the script from inside the container
|
||||
if docker compose -f ../docker-compose.yml exec -T rest_server test -f /app/autogpt_platform/backend/test/e2e_test_data.py; then
|
||||
if docker compose -f ../docker-compose.resolved.yml exec -T rest_server test -f /app/autogpt_platform/backend/test/e2e_test_data.py; then
|
||||
echo "✅ Found e2e_test_data.py in container, running it..."
|
||||
docker compose -f ../docker-compose.yml exec -T rest_server sh -c "cd /app/autogpt_platform && python backend/test/e2e_test_data.py" || {
|
||||
docker compose -f ../docker-compose.resolved.yml exec -T rest_server sh -c "cd /app/autogpt_platform && python backend/test/e2e_test_data.py" || {
|
||||
echo "❌ E2E test data creation failed!"
|
||||
docker compose -f ../docker-compose.yml logs --tail=50 rest_server
|
||||
docker compose -f ../docker-compose.resolved.yml logs --tail=50 rest_server
|
||||
exit 1
|
||||
}
|
||||
else
|
||||
echo "⚠️ e2e_test_data.py not found in container, copying and running..."
|
||||
# Copy the script into the container and run it
|
||||
docker cp ../backend/test/e2e_test_data.py $(docker compose -f ../docker-compose.yml ps -q rest_server):/tmp/e2e_test_data.py || {
|
||||
docker cp ../backend/test/e2e_test_data.py $(docker compose -f ../docker-compose.resolved.yml ps -q rest_server):/tmp/e2e_test_data.py || {
|
||||
echo "❌ Failed to copy script to container"
|
||||
exit 1
|
||||
}
|
||||
docker compose -f ../docker-compose.yml exec -T rest_server sh -c "cd /app/autogpt_platform && python /tmp/e2e_test_data.py" || {
|
||||
docker compose -f ../docker-compose.resolved.yml exec -T rest_server sh -c "cd /app/autogpt_platform && python /tmp/e2e_test_data.py" || {
|
||||
echo "❌ E2E test data creation failed!"
|
||||
docker compose -f ../docker-compose.yml logs --tail=50 rest_server
|
||||
docker compose -f ../docker-compose.resolved.yml logs --tail=50 rest_server
|
||||
exit 1
|
||||
}
|
||||
fi
|
||||
|
||||
- name: Restore dependencies cache
|
||||
uses: actions/cache@v5
|
||||
with:
|
||||
path: ~/.pnpm-store
|
||||
key: ${{ needs.setup.outputs.cache-key }}
|
||||
restore-keys: |
|
||||
${{ runner.os }}-pnpm-${{ hashFiles('autogpt_platform/frontend/pnpm-lock.yaml') }}
|
||||
${{ runner.os }}-pnpm-
|
||||
- name: Set up tests - Enable corepack
|
||||
run: corepack enable
|
||||
|
||||
- name: Install dependencies
|
||||
- name: Set up tests - Set up Node
|
||||
uses: actions/setup-node@v6
|
||||
with:
|
||||
node-version: "22.18.0"
|
||||
cache: "pnpm"
|
||||
cache-dependency-path: autogpt_platform/frontend/pnpm-lock.yaml
|
||||
|
||||
- name: Set up tests - Install dependencies
|
||||
run: pnpm install --frozen-lockfile
|
||||
|
||||
- name: Install Browser 'chromium'
|
||||
- name: Set up tests - Install browser 'chromium'
|
||||
run: pnpm playwright install --with-deps chromium
|
||||
|
||||
- name: Run Playwright tests
|
||||
@@ -269,7 +243,7 @@ jobs:
|
||||
|
||||
- name: Print Final Docker Compose logs
|
||||
if: always()
|
||||
run: docker compose -f ../docker-compose.yml logs
|
||||
run: docker compose -f ../docker-compose.resolved.yml logs
|
||||
|
||||
integration_test:
|
||||
runs-on: ubuntu-latest
|
||||
@@ -281,22 +255,15 @@ jobs:
|
||||
with:
|
||||
submodules: recursive
|
||||
|
||||
- name: Set up Node.js
|
||||
uses: actions/setup-node@v6
|
||||
with:
|
||||
node-version: "22.18.0"
|
||||
|
||||
- name: Enable corepack
|
||||
run: corepack enable
|
||||
|
||||
- name: Restore dependencies cache
|
||||
uses: actions/cache@v5
|
||||
- name: Set up Node
|
||||
uses: actions/setup-node@v6
|
||||
with:
|
||||
path: ~/.pnpm-store
|
||||
key: ${{ needs.setup.outputs.cache-key }}
|
||||
restore-keys: |
|
||||
${{ runner.os }}-pnpm-${{ hashFiles('autogpt_platform/frontend/pnpm-lock.yaml') }}
|
||||
${{ runner.os }}-pnpm-
|
||||
node-version: "22.18.0"
|
||||
cache: "pnpm"
|
||||
cache-dependency-path: autogpt_platform/frontend/pnpm-lock.yaml
|
||||
|
||||
- name: Install dependencies
|
||||
run: pnpm install --frozen-lockfile
|
||||
|
||||
154
.github/workflows/scripts/docker-ci-fix-compose-build-cache.py
vendored
Normal file
154
.github/workflows/scripts/docker-ci-fix-compose-build-cache.py
vendored
Normal file
@@ -0,0 +1,154 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Add cache configuration to a resolved docker-compose file for all services
|
||||
that have a build key, and ensure image names match what docker compose expects.
|
||||
"""
|
||||
|
||||
import argparse
|
||||
|
||||
import yaml
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(
|
||||
description="Add cache config to a resolved compose file"
|
||||
)
|
||||
parser.add_argument(
|
||||
"--source",
|
||||
required=True,
|
||||
help="Source compose file to read (should be output of `docker compose config`)",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--cache-from",
|
||||
default="type=gha",
|
||||
help="Cache source configuration",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--cache-to",
|
||||
default="type=gha,mode=max",
|
||||
help="Cache destination configuration",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--backend-scope",
|
||||
default="",
|
||||
help="GHA cache scope for backend services (e.g., platform-backend-{hash})",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--frontend-scope",
|
||||
default="",
|
||||
help="GHA cache scope for frontend service (e.g., platform-frontend-{hash})",
|
||||
)
|
||||
args = parser.parse_args()
|
||||
|
||||
with open(args.source, "r") as f:
|
||||
compose = yaml.safe_load(f)
|
||||
|
||||
# Get project name from compose file or default
|
||||
project_name = compose.get("name", "autogpt_platform")
|
||||
|
||||
def get_image_name(dockerfile: str, target: str) -> str:
|
||||
"""Generate image name based on Dockerfile folder and build target."""
|
||||
dockerfile_parts = dockerfile.replace("\\", "/").split("/")
|
||||
if len(dockerfile_parts) >= 2:
|
||||
folder_name = dockerfile_parts[-2] # e.g., "backend" or "frontend"
|
||||
else:
|
||||
folder_name = "app"
|
||||
return f"{project_name}-{folder_name}:{target}"
|
||||
|
||||
def get_build_key(dockerfile: str, target: str) -> str:
|
||||
"""Generate a unique key for a Dockerfile+target combination."""
|
||||
return f"{dockerfile}:{target}"
|
||||
|
||||
# First pass: collect all services with build configs and identify duplicates
|
||||
# Track which (dockerfile, target) combinations we've seen
|
||||
build_key_to_first_service: dict[str, str] = {}
|
||||
services_to_build: list[str] = []
|
||||
services_to_dedupe: list[str] = []
|
||||
|
||||
for service_name, service_config in compose.get("services", {}).items():
|
||||
if "build" not in service_config:
|
||||
continue
|
||||
|
||||
build_config = service_config["build"]
|
||||
dockerfile = build_config.get("dockerfile", "Dockerfile")
|
||||
target = build_config.get("target", "default")
|
||||
build_key = get_build_key(dockerfile, target)
|
||||
|
||||
if build_key not in build_key_to_first_service:
|
||||
# First service with this build config - it will do the actual build
|
||||
build_key_to_first_service[build_key] = service_name
|
||||
services_to_build.append(service_name)
|
||||
else:
|
||||
# Duplicate - will just use the image from the first service
|
||||
services_to_dedupe.append(service_name)
|
||||
|
||||
# Second pass: configure builds and deduplicate
|
||||
modified_services = []
|
||||
for service_name, service_config in compose.get("services", {}).items():
|
||||
if "build" not in service_config:
|
||||
continue
|
||||
|
||||
build_config = service_config["build"]
|
||||
dockerfile = build_config.get("dockerfile", "Dockerfile")
|
||||
target = build_config.get("target", "latest")
|
||||
image_name = get_image_name(dockerfile, target)
|
||||
|
||||
# Set image name for all services (needed for both builders and deduped)
|
||||
service_config["image"] = image_name
|
||||
|
||||
if service_name in services_to_dedupe:
|
||||
# Remove build config - this service will use the pre-built image
|
||||
del service_config["build"]
|
||||
continue
|
||||
|
||||
# This service will do the actual build - add cache config
|
||||
cache_from = args.cache_from
|
||||
cache_to = args.cache_to
|
||||
|
||||
# Determine scope based on Dockerfile path and target
|
||||
# Each unique (dockerfile, target) combination gets its own cache scope
|
||||
if "type=gha" in args.cache_from or "type=gha" in args.cache_to:
|
||||
if "frontend" in dockerfile:
|
||||
base_scope = args.frontend_scope
|
||||
elif "backend" in dockerfile:
|
||||
base_scope = args.backend_scope
|
||||
else:
|
||||
# Skip services that don't clearly match frontend/backend
|
||||
continue
|
||||
|
||||
if base_scope:
|
||||
# Append target to scope to differentiate e.g. migrate vs server
|
||||
scope = f"{base_scope}-{target}"
|
||||
if "type=gha" in args.cache_from:
|
||||
cache_from = f"{args.cache_from},scope={scope}"
|
||||
if "type=gha" in args.cache_to:
|
||||
cache_to = f"{args.cache_to},scope={scope}"
|
||||
|
||||
build_config["cache_from"] = [cache_from]
|
||||
build_config["cache_to"] = [cache_to]
|
||||
modified_services.append(service_name)
|
||||
|
||||
# Write back to the same file
|
||||
with open(args.source, "w") as f:
|
||||
yaml.dump(compose, f, default_flow_style=False, sort_keys=False)
|
||||
|
||||
print(f"Added cache config to {len(modified_services)} services in {args.source}:")
|
||||
for svc in modified_services:
|
||||
svc_config = compose["services"][svc]
|
||||
build_cfg = svc_config.get("build", {})
|
||||
cache_from_val = build_cfg.get("cache_from", ["none"])[0]
|
||||
cache_to_val = build_cfg.get("cache_to", ["none"])[0]
|
||||
print(f" - {svc}")
|
||||
print(f" image: {svc_config.get('image', 'N/A')}")
|
||||
print(f" cache_from: {cache_from_val}")
|
||||
print(f" cache_to: {cache_to_val}")
|
||||
if services_to_dedupe:
|
||||
print(
|
||||
f"Deduplicated {len(services_to_dedupe)} services (will use pre-built images):"
|
||||
)
|
||||
for svc in services_to_dedupe:
|
||||
print(f" - {svc} -> {compose['services'][svc].get('image', 'N/A')}")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -1,3 +1,5 @@
|
||||
# ============================ DEPENDENCY BUILDER ============================ #
|
||||
|
||||
FROM debian:13-slim AS builder
|
||||
|
||||
# Set environment variables
|
||||
@@ -51,7 +53,9 @@ COPY autogpt_platform/backend/backend/data/partial_types.py ./backend/data/parti
|
||||
COPY autogpt_platform/backend/gen_prisma_types_stub.py ./
|
||||
RUN poetry run prisma generate && poetry run gen-prisma-stub
|
||||
|
||||
FROM debian:13-slim AS server_dependencies
|
||||
# ============================== BACKEND SERVER ============================== #
|
||||
|
||||
FROM debian:13-slim AS server
|
||||
|
||||
WORKDIR /app
|
||||
|
||||
@@ -62,22 +66,15 @@ ENV POETRY_HOME=/opt/poetry \
|
||||
DEBIAN_FRONTEND=noninteractive
|
||||
ENV PATH=/opt/poetry/bin:$PATH
|
||||
|
||||
# Install Python, FFmpeg, ImageMagick, and CLI tools for agent use.
|
||||
# bubblewrap provides OS-level sandbox (whitelist-only FS + no network)
|
||||
# for the bash_exec MCP tool.
|
||||
RUN apt-get update && apt-get install -y \
|
||||
# Install Python, FFmpeg, and ImageMagick (required for video processing blocks)
|
||||
# Using --no-install-recommends saves ~650MB by skipping unnecessary deps like llvm, mesa, etc.
|
||||
RUN apt-get update && apt-get install -y --no-install-recommends \
|
||||
python3.13 \
|
||||
python3-pip \
|
||||
ffmpeg \
|
||||
imagemagick \
|
||||
jq \
|
||||
ripgrep \
|
||||
tree \
|
||||
bubblewrap \
|
||||
&& rm -rf /var/lib/apt/lists/*
|
||||
|
||||
# Copy only necessary files from builder
|
||||
COPY --from=builder /app /app
|
||||
COPY --from=builder /usr/local/lib/python3* /usr/local/lib/python3*
|
||||
COPY --from=builder /usr/local/bin/poetry /usr/local/bin/poetry
|
||||
# Copy Node.js installation for Prisma
|
||||
@@ -87,30 +84,54 @@ COPY --from=builder /usr/bin/npm /usr/bin/npm
|
||||
COPY --from=builder /usr/bin/npx /usr/bin/npx
|
||||
COPY --from=builder /root/.cache/prisma-python/binaries /root/.cache/prisma-python/binaries
|
||||
|
||||
ENV PATH="/app/autogpt_platform/backend/.venv/bin:$PATH"
|
||||
|
||||
RUN mkdir -p /app/autogpt_platform/autogpt_libs
|
||||
RUN mkdir -p /app/autogpt_platform/backend
|
||||
|
||||
COPY autogpt_platform/autogpt_libs /app/autogpt_platform/autogpt_libs
|
||||
|
||||
COPY autogpt_platform/backend/poetry.lock autogpt_platform/backend/pyproject.toml /app/autogpt_platform/backend/
|
||||
|
||||
WORKDIR /app/autogpt_platform/backend
|
||||
|
||||
FROM server_dependencies AS migrate
|
||||
# Copy only the .venv from builder (not the entire /app directory)
|
||||
# The .venv includes the generated Prisma client
|
||||
COPY --from=builder /app/autogpt_platform/backend/.venv ./.venv
|
||||
ENV PATH="/app/autogpt_platform/backend/.venv/bin:$PATH"
|
||||
|
||||
# Migration stage only needs schema and migrations - much lighter than full backend
|
||||
COPY autogpt_platform/backend/schema.prisma /app/autogpt_platform/backend/
|
||||
COPY autogpt_platform/backend/backend/data/partial_types.py /app/autogpt_platform/backend/backend/data/partial_types.py
|
||||
COPY autogpt_platform/backend/migrations /app/autogpt_platform/backend/migrations
|
||||
# Copy dependency files + autogpt_libs (path dependency)
|
||||
COPY autogpt_platform/autogpt_libs /app/autogpt_platform/autogpt_libs
|
||||
COPY autogpt_platform/backend/poetry.lock autogpt_platform/backend/pyproject.toml ./
|
||||
|
||||
FROM server_dependencies AS server
|
||||
|
||||
COPY autogpt_platform/backend /app/autogpt_platform/backend
|
||||
# Copy backend code + docs (for Copilot docs search)
|
||||
COPY autogpt_platform/backend ./
|
||||
COPY docs /app/docs
|
||||
RUN poetry install --no-ansi --only-root
|
||||
|
||||
ENV PORT=8000
|
||||
|
||||
CMD ["poetry", "run", "rest"]
|
||||
|
||||
# =============================== DB MIGRATOR =============================== #
|
||||
|
||||
# Lightweight migrate stage - only needs Prisma CLI, not full Python environment
|
||||
FROM debian:13-slim AS migrate
|
||||
|
||||
WORKDIR /app/autogpt_platform/backend
|
||||
|
||||
ENV DEBIAN_FRONTEND=noninteractive
|
||||
|
||||
# Install only what's needed for prisma migrate: Node.js and minimal Python for prisma-python
|
||||
RUN apt-get update && apt-get install -y --no-install-recommends \
|
||||
python3.13 \
|
||||
python3-pip \
|
||||
ca-certificates \
|
||||
&& rm -rf /var/lib/apt/lists/*
|
||||
|
||||
# Copy Node.js from builder (needed for Prisma CLI)
|
||||
COPY --from=builder /usr/bin/node /usr/bin/node
|
||||
COPY --from=builder /usr/lib/node_modules /usr/lib/node_modules
|
||||
COPY --from=builder /usr/bin/npm /usr/bin/npm
|
||||
|
||||
# Copy Prisma binaries
|
||||
COPY --from=builder /root/.cache/prisma-python/binaries /root/.cache/prisma-python/binaries
|
||||
|
||||
# Install prisma-client-py directly (much smaller than copying full venv)
|
||||
RUN pip3 install prisma>=0.15.0 --break-system-packages
|
||||
|
||||
COPY autogpt_platform/backend/schema.prisma ./
|
||||
COPY autogpt_platform/backend/backend/data/partial_types.py ./backend/data/partial_types.py
|
||||
COPY autogpt_platform/backend/gen_prisma_types_stub.py ./
|
||||
COPY autogpt_platform/backend/migrations ./migrations
|
||||
|
||||
@@ -10,7 +10,7 @@ from typing_extensions import TypedDict
|
||||
|
||||
import backend.api.features.store.cache as store_cache
|
||||
import backend.api.features.store.model as store_model
|
||||
import backend.blocks
|
||||
import backend.data.block
|
||||
from backend.api.external.middleware import require_permission
|
||||
from backend.data import execution as execution_db
|
||||
from backend.data import graph as graph_db
|
||||
@@ -67,7 +67,7 @@ async def get_user_info(
|
||||
dependencies=[Security(require_permission(APIKeyPermission.READ_BLOCK))],
|
||||
)
|
||||
async def get_graph_blocks() -> Sequence[dict[Any, Any]]:
|
||||
blocks = [block() for block in backend.blocks.get_blocks().values()]
|
||||
blocks = [block() for block in backend.data.block.get_blocks().values()]
|
||||
return [b.to_dict() for b in blocks if not b.disabled]
|
||||
|
||||
|
||||
@@ -83,7 +83,7 @@ async def execute_graph_block(
|
||||
require_permission(APIKeyPermission.EXECUTE_BLOCK)
|
||||
),
|
||||
) -> CompletedBlockOutput:
|
||||
obj = backend.blocks.get_block(block_id)
|
||||
obj = backend.data.block.get_block(block_id)
|
||||
if not obj:
|
||||
raise HTTPException(status_code=404, detail=f"Block #{block_id} not found.")
|
||||
if obj.disabled:
|
||||
|
||||
@@ -10,15 +10,10 @@ import backend.api.features.library.db as library_db
|
||||
import backend.api.features.library.model as library_model
|
||||
import backend.api.features.store.db as store_db
|
||||
import backend.api.features.store.model as store_model
|
||||
import backend.data.block
|
||||
from backend.blocks import load_all_blocks
|
||||
from backend.blocks._base import (
|
||||
AnyBlockSchema,
|
||||
BlockCategory,
|
||||
BlockInfo,
|
||||
BlockSchema,
|
||||
BlockType,
|
||||
)
|
||||
from backend.blocks.llm import LlmModel
|
||||
from backend.data.block import AnyBlockSchema, BlockCategory, BlockInfo, BlockSchema
|
||||
from backend.data.db import query_raw_with_schema
|
||||
from backend.integrations.providers import ProviderName
|
||||
from backend.util.cache import cached
|
||||
@@ -27,7 +22,7 @@ from backend.util.models import Pagination
|
||||
from .model import (
|
||||
BlockCategoryResponse,
|
||||
BlockResponse,
|
||||
BlockTypeFilter,
|
||||
BlockType,
|
||||
CountResponse,
|
||||
FilterType,
|
||||
Provider,
|
||||
@@ -93,7 +88,7 @@ def get_block_categories(category_blocks: int = 3) -> list[BlockCategoryResponse
|
||||
def get_blocks(
|
||||
*,
|
||||
category: str | None = None,
|
||||
type: BlockTypeFilter | None = None,
|
||||
type: BlockType | None = None,
|
||||
provider: ProviderName | None = None,
|
||||
page: int = 1,
|
||||
page_size: int = 50,
|
||||
@@ -674,9 +669,9 @@ async def get_suggested_blocks(count: int = 5) -> list[BlockInfo]:
|
||||
for block_type in load_all_blocks().values():
|
||||
block: AnyBlockSchema = block_type()
|
||||
if block.disabled or block.block_type in (
|
||||
BlockType.INPUT,
|
||||
BlockType.OUTPUT,
|
||||
BlockType.AGENT,
|
||||
backend.data.block.BlockType.INPUT,
|
||||
backend.data.block.BlockType.OUTPUT,
|
||||
backend.data.block.BlockType.AGENT,
|
||||
):
|
||||
continue
|
||||
# Find the execution count for this block
|
||||
|
||||
@@ -4,7 +4,7 @@ from pydantic import BaseModel
|
||||
|
||||
import backend.api.features.library.model as library_model
|
||||
import backend.api.features.store.model as store_model
|
||||
from backend.blocks._base import BlockInfo
|
||||
from backend.data.block import BlockInfo
|
||||
from backend.integrations.providers import ProviderName
|
||||
from backend.util.models import Pagination
|
||||
|
||||
@@ -15,7 +15,7 @@ FilterType = Literal[
|
||||
"my_agents",
|
||||
]
|
||||
|
||||
BlockTypeFilter = Literal["all", "input", "action", "output"]
|
||||
BlockType = Literal["all", "input", "action", "output"]
|
||||
|
||||
|
||||
class SearchEntry(BaseModel):
|
||||
|
||||
@@ -88,7 +88,7 @@ async def get_block_categories(
|
||||
)
|
||||
async def get_blocks(
|
||||
category: Annotated[str | None, fastapi.Query()] = None,
|
||||
type: Annotated[builder_model.BlockTypeFilter | None, fastapi.Query()] = None,
|
||||
type: Annotated[builder_model.BlockType | None, fastapi.Query()] = None,
|
||||
provider: Annotated[ProviderName | None, fastapi.Query()] = None,
|
||||
page: Annotated[int, fastapi.Query()] = 1,
|
||||
page_size: Annotated[int, fastapi.Query()] = 50,
|
||||
|
||||
@@ -27,11 +27,12 @@ class ChatConfig(BaseSettings):
|
||||
session_ttl: int = Field(default=43200, description="Session TTL in seconds")
|
||||
|
||||
# Streaming Configuration
|
||||
stream_timeout: int = Field(default=300, description="Stream timeout in seconds")
|
||||
max_retries: int = Field(
|
||||
default=3,
|
||||
description="Max retries for fallback path (SDK handles retries internally)",
|
||||
max_context_messages: int = Field(
|
||||
default=50, ge=1, le=200, description="Maximum context messages"
|
||||
)
|
||||
|
||||
stream_timeout: int = Field(default=300, description="Stream timeout in seconds")
|
||||
max_retries: int = Field(default=3, description="Maximum number of retries")
|
||||
max_agent_runs: int = Field(default=30, description="Maximum number of agent runs")
|
||||
max_agent_schedules: int = Field(
|
||||
default=30, description="Maximum number of agent schedules"
|
||||
@@ -92,26 +93,6 @@ class ChatConfig(BaseSettings):
|
||||
description="Name of the prompt in Langfuse to fetch",
|
||||
)
|
||||
|
||||
# Claude Agent SDK Configuration
|
||||
use_claude_agent_sdk: bool = Field(
|
||||
default=True,
|
||||
description="Use Claude Agent SDK for chat completions",
|
||||
)
|
||||
claude_agent_model: str | None = Field(
|
||||
default=None,
|
||||
description="Model for the Claude Agent SDK path. If None, derives from "
|
||||
"the `model` field by stripping the OpenRouter provider prefix.",
|
||||
)
|
||||
claude_agent_max_buffer_size: int = Field(
|
||||
default=10 * 1024 * 1024, # 10MB (default SDK is 1MB)
|
||||
description="Max buffer size in bytes for Claude Agent SDK JSON message parsing. "
|
||||
"Increase if tool outputs exceed the limit.",
|
||||
)
|
||||
claude_agent_max_subtasks: int = Field(
|
||||
default=10,
|
||||
description="Max number of sub-agent Tasks the SDK can spawn per session.",
|
||||
)
|
||||
|
||||
# Extended thinking configuration for Claude models
|
||||
thinking_enabled: bool = Field(
|
||||
default=True,
|
||||
@@ -157,17 +138,6 @@ class ChatConfig(BaseSettings):
|
||||
v = os.getenv("CHAT_INTERNAL_API_KEY")
|
||||
return v
|
||||
|
||||
@field_validator("use_claude_agent_sdk", mode="before")
|
||||
@classmethod
|
||||
def get_use_claude_agent_sdk(cls, v):
|
||||
"""Get use_claude_agent_sdk from environment if not provided."""
|
||||
# Check environment variable - default to True if not set
|
||||
env_val = os.getenv("CHAT_USE_CLAUDE_AGENT_SDK", "").lower()
|
||||
if env_val:
|
||||
return env_val in ("true", "1", "yes", "on")
|
||||
# Default to True (SDK enabled by default)
|
||||
return True if v is None else v
|
||||
|
||||
# Prompt paths for different contexts
|
||||
PROMPT_PATHS: dict[str, str] = {
|
||||
"default": "prompts/chat_system.md",
|
||||
|
||||
@@ -334,8 +334,9 @@ async def _get_session_from_cache(session_id: str) -> ChatSession | None:
|
||||
try:
|
||||
session = ChatSession.model_validate_json(raw_session)
|
||||
logger.info(
|
||||
f"[CACHE] Loaded session {session_id}: {len(session.messages)} messages, "
|
||||
f"last_roles={[m.role for m in session.messages[-3:]]}" # Last 3 roles
|
||||
f"Loading session {session_id} from cache: "
|
||||
f"message_count={len(session.messages)}, "
|
||||
f"roles={[m.role for m in session.messages]}"
|
||||
)
|
||||
return session
|
||||
except Exception as e:
|
||||
@@ -377,9 +378,11 @@ async def _get_session_from_db(session_id: str) -> ChatSession | None:
|
||||
return None
|
||||
|
||||
messages = prisma_session.Messages
|
||||
logger.debug(
|
||||
f"[DB] Loaded session {session_id}: {len(messages) if messages else 0} messages, "
|
||||
f"roles={[m.role for m in messages[-3:]] if messages else []}" # Last 3 roles
|
||||
logger.info(
|
||||
f"Loading session {session_id} from DB: "
|
||||
f"has_messages={messages is not None}, "
|
||||
f"message_count={len(messages) if messages else 0}, "
|
||||
f"roles={[m.role for m in messages] if messages else []}"
|
||||
)
|
||||
|
||||
return ChatSession.from_db(prisma_session, messages)
|
||||
@@ -430,9 +433,10 @@ async def _save_session_to_db(
|
||||
"function_call": msg.function_call,
|
||||
}
|
||||
)
|
||||
logger.debug(
|
||||
f"[DB] Saving {len(new_messages)} messages to session {session.session_id}, "
|
||||
f"roles={[m['role'] for m in messages_data]}"
|
||||
logger.info(
|
||||
f"Saving {len(new_messages)} new messages to DB for session {session.session_id}: "
|
||||
f"roles={[m['role'] for m in messages_data]}, "
|
||||
f"start_sequence={existing_message_count}"
|
||||
)
|
||||
await chat_db.add_chat_messages_batch(
|
||||
session_id=session.session_id,
|
||||
@@ -472,7 +476,7 @@ async def get_chat_session(
|
||||
logger.warning(f"Unexpected cache error for session {session_id}: {e}")
|
||||
|
||||
# Fall back to database
|
||||
logger.debug(f"Session {session_id} not in cache, checking database")
|
||||
logger.info(f"Session {session_id} not in cache, checking database")
|
||||
session = await _get_session_from_db(session_id)
|
||||
|
||||
if session is None:
|
||||
@@ -489,6 +493,7 @@ async def get_chat_session(
|
||||
# Cache the session from DB
|
||||
try:
|
||||
await _cache_session(session)
|
||||
logger.info(f"Cached session {session_id} from database")
|
||||
except Exception as e:
|
||||
logger.warning(f"Failed to cache session {session_id}: {e}")
|
||||
|
||||
@@ -553,40 +558,6 @@ async def upsert_chat_session(
|
||||
return session
|
||||
|
||||
|
||||
async def append_and_save_message(session_id: str, message: ChatMessage) -> ChatSession:
|
||||
"""Atomically append a message to a session and persist it.
|
||||
|
||||
Acquires the session lock, re-fetches the latest session state,
|
||||
appends the message, and saves — preventing message loss when
|
||||
concurrent requests modify the same session.
|
||||
"""
|
||||
lock = await _get_session_lock(session_id)
|
||||
|
||||
async with lock:
|
||||
session = await get_chat_session(session_id)
|
||||
if session is None:
|
||||
raise ValueError(f"Session {session_id} not found")
|
||||
|
||||
session.messages.append(message)
|
||||
existing_message_count = await chat_db.get_chat_session_message_count(
|
||||
session_id
|
||||
)
|
||||
|
||||
try:
|
||||
await _save_session_to_db(session, existing_message_count)
|
||||
except Exception as e:
|
||||
raise DatabaseError(
|
||||
f"Failed to persist message to session {session_id}"
|
||||
) from e
|
||||
|
||||
try:
|
||||
await _cache_session(session)
|
||||
except Exception as e:
|
||||
logger.warning(f"Cache write failed for session {session_id}: {e}")
|
||||
|
||||
return session
|
||||
|
||||
|
||||
async def create_chat_session(user_id: str) -> ChatSession:
|
||||
"""Create a new chat session and persist it.
|
||||
|
||||
@@ -693,19 +664,13 @@ async def update_session_title(session_id: str, title: str) -> bool:
|
||||
logger.warning(f"Session {session_id} not found for title update")
|
||||
return False
|
||||
|
||||
# Update title in cache if it exists (instead of invalidating).
|
||||
# This prevents race conditions where cache invalidation causes
|
||||
# the frontend to see stale DB data while streaming is still in progress.
|
||||
# Invalidate cache so next fetch gets updated title
|
||||
try:
|
||||
cached = await _get_session_from_cache(session_id)
|
||||
if cached:
|
||||
cached.title = title
|
||||
await _cache_session(cached)
|
||||
redis_key = _get_session_cache_key(session_id)
|
||||
async_redis = await get_redis_async()
|
||||
await async_redis.delete(redis_key)
|
||||
except Exception as e:
|
||||
# Not critical - title will be correct on next full cache refresh
|
||||
logger.warning(
|
||||
f"Failed to update title in cache for session {session_id}: {e}"
|
||||
)
|
||||
logger.warning(f"Failed to invalidate cache for session {session_id}: {e}")
|
||||
|
||||
return True
|
||||
except Exception as e:
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
"""Chat API routes for chat session management and streaming via SSE."""
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
import uuid as uuid_module
|
||||
from collections.abc import AsyncGenerator
|
||||
@@ -12,22 +11,13 @@ from fastapi.responses import StreamingResponse
|
||||
from pydantic import BaseModel
|
||||
|
||||
from backend.util.exceptions import NotFoundError
|
||||
from backend.util.feature_flag import Flag, is_feature_enabled
|
||||
|
||||
from . import service as chat_service
|
||||
from . import stream_registry
|
||||
from .completion_handler import process_operation_failure, process_operation_success
|
||||
from .config import ChatConfig
|
||||
from .model import (
|
||||
ChatMessage,
|
||||
ChatSession,
|
||||
append_and_save_message,
|
||||
create_chat_session,
|
||||
get_chat_session,
|
||||
get_user_sessions,
|
||||
)
|
||||
from .response_model import StreamError, StreamFinish, StreamHeartbeat, StreamStart
|
||||
from .sdk import service as sdk_service
|
||||
from .model import ChatSession, create_chat_session, get_chat_session, get_user_sessions
|
||||
from .response_model import StreamFinish, StreamHeartbeat
|
||||
from .tools.models import (
|
||||
AgentDetailsResponse,
|
||||
AgentOutputResponse,
|
||||
@@ -50,7 +40,6 @@ from .tools.models import (
|
||||
SetupRequirementsResponse,
|
||||
UnderstandingUpdatedResponse,
|
||||
)
|
||||
from .tracking import track_user_message
|
||||
|
||||
config = ChatConfig()
|
||||
|
||||
@@ -242,10 +231,6 @@ async def get_session(
|
||||
active_task, last_message_id = await stream_registry.get_active_task_for_session(
|
||||
session_id, user_id
|
||||
)
|
||||
logger.info(
|
||||
f"[GET_SESSION] session={session_id}, active_task={active_task is not None}, "
|
||||
f"msg_count={len(messages)}, last_role={messages[-1].get('role') if messages else 'none'}"
|
||||
)
|
||||
if active_task:
|
||||
# Filter out the in-progress assistant message from the session response.
|
||||
# The client will receive the complete assistant response through the SSE
|
||||
@@ -315,9 +300,10 @@ async def stream_chat_post(
|
||||
f"user={user_id}, message_len={len(request.message)}",
|
||||
extra={"json_fields": log_meta},
|
||||
)
|
||||
|
||||
session = await _validate_and_get_session(session_id, user_id)
|
||||
logger.info(
|
||||
f"[TIMING] session validated in {(time.perf_counter() - stream_start_time) * 1000:.1f}ms",
|
||||
f"[TIMING] session validated in {(time.perf_counter() - stream_start_time)*1000:.1f}ms",
|
||||
extra={
|
||||
"json_fields": {
|
||||
**log_meta,
|
||||
@@ -326,25 +312,6 @@ async def stream_chat_post(
|
||||
},
|
||||
)
|
||||
|
||||
# Atomically append user message to session BEFORE creating task to avoid
|
||||
# race condition where GET_SESSION sees task as "running" but message isn't
|
||||
# saved yet. append_and_save_message re-fetches inside a lock to prevent
|
||||
# message loss from concurrent requests.
|
||||
if request.message:
|
||||
message = ChatMessage(
|
||||
role="user" if request.is_user_message else "assistant",
|
||||
content=request.message,
|
||||
)
|
||||
if request.is_user_message:
|
||||
track_user_message(
|
||||
user_id=user_id,
|
||||
session_id=session_id,
|
||||
message_length=len(request.message),
|
||||
)
|
||||
logger.info(f"[STREAM] Saving user message to session {session_id}")
|
||||
session = await append_and_save_message(session_id, message)
|
||||
logger.info(f"[STREAM] User message saved for session {session_id}")
|
||||
|
||||
# Create a task in the stream registry for reconnection support
|
||||
task_id = str(uuid_module.uuid4())
|
||||
operation_id = str(uuid_module.uuid4())
|
||||
@@ -360,7 +327,7 @@ async def stream_chat_post(
|
||||
operation_id=operation_id,
|
||||
)
|
||||
logger.info(
|
||||
f"[TIMING] create_task completed in {(time.perf_counter() - task_create_start) * 1000:.1f}ms",
|
||||
f"[TIMING] create_task completed in {(time.perf_counter() - task_create_start)*1000:.1f}ms",
|
||||
extra={
|
||||
"json_fields": {
|
||||
**log_meta,
|
||||
@@ -381,47 +348,15 @@ async def stream_chat_post(
|
||||
first_chunk_time, ttfc = None, None
|
||||
chunk_count = 0
|
||||
try:
|
||||
# Emit a start event with task_id for reconnection
|
||||
start_chunk = StreamStart(messageId=task_id, taskId=task_id)
|
||||
await stream_registry.publish_chunk(task_id, start_chunk)
|
||||
logger.info(
|
||||
f"[TIMING] StreamStart published at {(time_module.perf_counter() - gen_start_time) * 1000:.1f}ms",
|
||||
extra={
|
||||
"json_fields": {
|
||||
**log_meta,
|
||||
"elapsed_ms": (time_module.perf_counter() - gen_start_time)
|
||||
* 1000,
|
||||
}
|
||||
},
|
||||
)
|
||||
|
||||
# Choose service based on LaunchDarkly flag (falls back to config default)
|
||||
use_sdk = await is_feature_enabled(
|
||||
Flag.COPILOT_SDK,
|
||||
user_id or "anonymous",
|
||||
default=config.use_claude_agent_sdk,
|
||||
)
|
||||
stream_fn = (
|
||||
sdk_service.stream_chat_completion_sdk
|
||||
if use_sdk
|
||||
else chat_service.stream_chat_completion
|
||||
)
|
||||
logger.info(
|
||||
f"[TIMING] Calling {'sdk' if use_sdk else 'standard'} stream_chat_completion",
|
||||
extra={"json_fields": log_meta},
|
||||
)
|
||||
# Pass message=None since we already added it to the session above
|
||||
async for chunk in stream_fn(
|
||||
async for chunk in chat_service.stream_chat_completion(
|
||||
session_id,
|
||||
None, # Message already in session
|
||||
request.message,
|
||||
is_user_message=request.is_user_message,
|
||||
user_id=user_id,
|
||||
session=session, # Pass session with message already added
|
||||
session=session, # Pass pre-fetched session to avoid double-fetch
|
||||
context=request.context,
|
||||
_task_id=task_id, # Pass task_id so service emits start with taskId for reconnection
|
||||
):
|
||||
# Skip duplicate StreamStart — we already published one above
|
||||
if isinstance(chunk, StreamStart):
|
||||
continue
|
||||
chunk_count += 1
|
||||
if first_chunk_time is None:
|
||||
first_chunk_time = time_module.perf_counter()
|
||||
@@ -442,7 +377,7 @@ async def stream_chat_post(
|
||||
gen_end_time = time_module.perf_counter()
|
||||
total_time = (gen_end_time - gen_start_time) * 1000
|
||||
logger.info(
|
||||
f"[TIMING] run_ai_generation FINISHED in {total_time / 1000:.1f}s; "
|
||||
f"[TIMING] run_ai_generation FINISHED in {total_time/1000:.1f}s; "
|
||||
f"task={task_id}, session={session_id}, "
|
||||
f"ttfc={ttfc or -1:.2f}s, n_chunks={chunk_count}",
|
||||
extra={
|
||||
@@ -469,17 +404,6 @@ async def stream_chat_post(
|
||||
}
|
||||
},
|
||||
)
|
||||
# Publish a StreamError so the frontend can display an error message
|
||||
try:
|
||||
await stream_registry.publish_chunk(
|
||||
task_id,
|
||||
StreamError(
|
||||
errorText="An error occurred. Please try again.",
|
||||
code="stream_error",
|
||||
),
|
||||
)
|
||||
except Exception:
|
||||
pass # Best-effort; mark_task_completed will publish StreamFinish
|
||||
await stream_registry.mark_task_completed(task_id, "failed")
|
||||
|
||||
# Start the AI generation in a background task
|
||||
@@ -582,14 +506,8 @@ async def stream_chat_post(
|
||||
"json_fields": {**log_meta, "elapsed_ms": elapsed, "error": str(e)}
|
||||
},
|
||||
)
|
||||
# Surface error to frontend so it doesn't appear stuck
|
||||
yield StreamError(
|
||||
errorText="An error occurred. Please try again.",
|
||||
code="stream_error",
|
||||
).to_sse()
|
||||
yield StreamFinish().to_sse()
|
||||
finally:
|
||||
# Unsubscribe when client disconnects or stream ends
|
||||
# Unsubscribe when client disconnects or stream ends to prevent resource leak
|
||||
if subscriber_queue is not None:
|
||||
try:
|
||||
await stream_registry.unsubscribe_from_task(
|
||||
@@ -833,6 +751,8 @@ async def stream_task(
|
||||
)
|
||||
|
||||
async def event_generator() -> AsyncGenerator[str, None]:
|
||||
import asyncio
|
||||
|
||||
heartbeat_interval = 15.0 # Send heartbeat every 15 seconds
|
||||
try:
|
||||
while True:
|
||||
|
||||
@@ -1,14 +0,0 @@
|
||||
"""Claude Agent SDK integration for CoPilot.
|
||||
|
||||
This module provides the integration layer between the Claude Agent SDK
|
||||
and the existing CoPilot tool system, enabling drop-in replacement of
|
||||
the current LLM orchestration with the battle-tested Claude Agent SDK.
|
||||
"""
|
||||
|
||||
from .service import stream_chat_completion_sdk
|
||||
from .tool_adapter import create_copilot_mcp_server
|
||||
|
||||
__all__ = [
|
||||
"stream_chat_completion_sdk",
|
||||
"create_copilot_mcp_server",
|
||||
]
|
||||
@@ -1,198 +0,0 @@
|
||||
"""Response adapter for converting Claude Agent SDK messages to Vercel AI SDK format.
|
||||
|
||||
This module provides the adapter layer that converts streaming messages from
|
||||
the Claude Agent SDK into the Vercel AI SDK UI Stream Protocol format that
|
||||
the frontend expects.
|
||||
"""
|
||||
|
||||
import json
|
||||
import logging
|
||||
import uuid
|
||||
|
||||
from claude_agent_sdk import (
|
||||
AssistantMessage,
|
||||
Message,
|
||||
ResultMessage,
|
||||
SystemMessage,
|
||||
TextBlock,
|
||||
ToolResultBlock,
|
||||
ToolUseBlock,
|
||||
UserMessage,
|
||||
)
|
||||
|
||||
from backend.api.features.chat.response_model import (
|
||||
StreamBaseResponse,
|
||||
StreamError,
|
||||
StreamFinish,
|
||||
StreamFinishStep,
|
||||
StreamStart,
|
||||
StreamStartStep,
|
||||
StreamTextDelta,
|
||||
StreamTextEnd,
|
||||
StreamTextStart,
|
||||
StreamToolInputAvailable,
|
||||
StreamToolInputStart,
|
||||
StreamToolOutputAvailable,
|
||||
)
|
||||
from backend.api.features.chat.sdk.tool_adapter import (
|
||||
MCP_TOOL_PREFIX,
|
||||
pop_pending_tool_output,
|
||||
)
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class SDKResponseAdapter:
|
||||
"""Adapter for converting Claude Agent SDK messages to Vercel AI SDK format.
|
||||
|
||||
This class maintains state during a streaming session to properly track
|
||||
text blocks, tool calls, and message lifecycle.
|
||||
"""
|
||||
|
||||
def __init__(self, message_id: str | None = None):
|
||||
self.message_id = message_id or str(uuid.uuid4())
|
||||
self.text_block_id = str(uuid.uuid4())
|
||||
self.has_started_text = False
|
||||
self.has_ended_text = False
|
||||
self.current_tool_calls: dict[str, dict[str, str]] = {}
|
||||
self.task_id: str | None = None
|
||||
self.step_open = False
|
||||
|
||||
def set_task_id(self, task_id: str) -> None:
|
||||
"""Set the task ID for reconnection support."""
|
||||
self.task_id = task_id
|
||||
|
||||
def convert_message(self, sdk_message: Message) -> list[StreamBaseResponse]:
|
||||
"""Convert a single SDK message to Vercel AI SDK format."""
|
||||
responses: list[StreamBaseResponse] = []
|
||||
|
||||
if isinstance(sdk_message, SystemMessage):
|
||||
if sdk_message.subtype == "init":
|
||||
responses.append(
|
||||
StreamStart(messageId=self.message_id, taskId=self.task_id)
|
||||
)
|
||||
# Open the first step (matches non-SDK: StreamStart then StreamStartStep)
|
||||
responses.append(StreamStartStep())
|
||||
self.step_open = True
|
||||
|
||||
elif isinstance(sdk_message, AssistantMessage):
|
||||
# After tool results, the SDK sends a new AssistantMessage for the
|
||||
# next LLM turn. Open a new step if the previous one was closed.
|
||||
if not self.step_open:
|
||||
responses.append(StreamStartStep())
|
||||
self.step_open = True
|
||||
|
||||
for block in sdk_message.content:
|
||||
if isinstance(block, TextBlock):
|
||||
if block.text:
|
||||
self._ensure_text_started(responses)
|
||||
responses.append(
|
||||
StreamTextDelta(id=self.text_block_id, delta=block.text)
|
||||
)
|
||||
|
||||
elif isinstance(block, ToolUseBlock):
|
||||
self._end_text_if_open(responses)
|
||||
|
||||
# Strip MCP prefix so frontend sees "find_block"
|
||||
# instead of "mcp__copilot__find_block".
|
||||
tool_name = block.name.removeprefix(MCP_TOOL_PREFIX)
|
||||
|
||||
responses.append(
|
||||
StreamToolInputStart(toolCallId=block.id, toolName=tool_name)
|
||||
)
|
||||
responses.append(
|
||||
StreamToolInputAvailable(
|
||||
toolCallId=block.id,
|
||||
toolName=tool_name,
|
||||
input=block.input,
|
||||
)
|
||||
)
|
||||
self.current_tool_calls[block.id] = {"name": tool_name}
|
||||
|
||||
elif isinstance(sdk_message, UserMessage):
|
||||
# UserMessage carries tool results back from tool execution.
|
||||
content = sdk_message.content
|
||||
blocks = content if isinstance(content, list) else []
|
||||
for block in blocks:
|
||||
if isinstance(block, ToolResultBlock) and block.tool_use_id:
|
||||
tool_info = self.current_tool_calls.get(block.tool_use_id, {})
|
||||
tool_name = tool_info.get("name", "unknown")
|
||||
|
||||
# Prefer the stashed full output over the SDK's
|
||||
# (potentially truncated) ToolResultBlock content.
|
||||
# The SDK truncates large results, writing them to disk,
|
||||
# which breaks frontend widget parsing.
|
||||
output = pop_pending_tool_output(tool_name) or (
|
||||
_extract_tool_output(block.content)
|
||||
)
|
||||
|
||||
responses.append(
|
||||
StreamToolOutputAvailable(
|
||||
toolCallId=block.tool_use_id,
|
||||
toolName=tool_name,
|
||||
output=output,
|
||||
success=not (block.is_error or False),
|
||||
)
|
||||
)
|
||||
|
||||
# Close the current step after tool results — the next
|
||||
# AssistantMessage will open a new step for the continuation.
|
||||
if self.step_open:
|
||||
responses.append(StreamFinishStep())
|
||||
self.step_open = False
|
||||
|
||||
elif isinstance(sdk_message, ResultMessage):
|
||||
self._end_text_if_open(responses)
|
||||
# Close the step before finishing.
|
||||
if self.step_open:
|
||||
responses.append(StreamFinishStep())
|
||||
self.step_open = False
|
||||
|
||||
if sdk_message.subtype == "success":
|
||||
responses.append(StreamFinish())
|
||||
elif sdk_message.subtype in ("error", "error_during_execution"):
|
||||
error_msg = getattr(sdk_message, "result", None) or "Unknown error"
|
||||
responses.append(
|
||||
StreamError(errorText=str(error_msg), code="sdk_error")
|
||||
)
|
||||
responses.append(StreamFinish())
|
||||
|
||||
else:
|
||||
logger.debug(f"Unhandled SDK message type: {type(sdk_message).__name__}")
|
||||
|
||||
return responses
|
||||
|
||||
def _ensure_text_started(self, responses: list[StreamBaseResponse]) -> None:
|
||||
"""Start (or restart) a text block if needed."""
|
||||
if not self.has_started_text or self.has_ended_text:
|
||||
if self.has_ended_text:
|
||||
self.text_block_id = str(uuid.uuid4())
|
||||
self.has_ended_text = False
|
||||
responses.append(StreamTextStart(id=self.text_block_id))
|
||||
self.has_started_text = True
|
||||
|
||||
def _end_text_if_open(self, responses: list[StreamBaseResponse]) -> None:
|
||||
"""End the current text block if one is open."""
|
||||
if self.has_started_text and not self.has_ended_text:
|
||||
responses.append(StreamTextEnd(id=self.text_block_id))
|
||||
self.has_ended_text = True
|
||||
|
||||
|
||||
def _extract_tool_output(content: str | list[dict[str, str]] | None) -> str:
|
||||
"""Extract a string output from a ToolResultBlock's content field."""
|
||||
if isinstance(content, str):
|
||||
return content
|
||||
if isinstance(content, list):
|
||||
parts = [item.get("text", "") for item in content if item.get("type") == "text"]
|
||||
if parts:
|
||||
return "".join(parts)
|
||||
try:
|
||||
return json.dumps(content)
|
||||
except (TypeError, ValueError):
|
||||
return str(content)
|
||||
if content is None:
|
||||
return ""
|
||||
try:
|
||||
return json.dumps(content)
|
||||
except (TypeError, ValueError):
|
||||
return str(content)
|
||||
@@ -1,366 +0,0 @@
|
||||
"""Unit tests for the SDK response adapter."""
|
||||
|
||||
from claude_agent_sdk import (
|
||||
AssistantMessage,
|
||||
ResultMessage,
|
||||
SystemMessage,
|
||||
TextBlock,
|
||||
ToolResultBlock,
|
||||
ToolUseBlock,
|
||||
UserMessage,
|
||||
)
|
||||
|
||||
from backend.api.features.chat.response_model import (
|
||||
StreamBaseResponse,
|
||||
StreamError,
|
||||
StreamFinish,
|
||||
StreamFinishStep,
|
||||
StreamStart,
|
||||
StreamStartStep,
|
||||
StreamTextDelta,
|
||||
StreamTextEnd,
|
||||
StreamTextStart,
|
||||
StreamToolInputAvailable,
|
||||
StreamToolInputStart,
|
||||
StreamToolOutputAvailable,
|
||||
)
|
||||
|
||||
from .response_adapter import SDKResponseAdapter
|
||||
from .tool_adapter import MCP_TOOL_PREFIX
|
||||
|
||||
|
||||
def _adapter() -> SDKResponseAdapter:
|
||||
a = SDKResponseAdapter(message_id="msg-1")
|
||||
a.set_task_id("task-1")
|
||||
return a
|
||||
|
||||
|
||||
# -- SystemMessage -----------------------------------------------------------
|
||||
|
||||
|
||||
def test_system_init_emits_start_and_step():
|
||||
adapter = _adapter()
|
||||
results = adapter.convert_message(SystemMessage(subtype="init", data={}))
|
||||
assert len(results) == 2
|
||||
assert isinstance(results[0], StreamStart)
|
||||
assert results[0].messageId == "msg-1"
|
||||
assert results[0].taskId == "task-1"
|
||||
assert isinstance(results[1], StreamStartStep)
|
||||
|
||||
|
||||
def test_system_non_init_emits_nothing():
|
||||
adapter = _adapter()
|
||||
results = adapter.convert_message(SystemMessage(subtype="other", data={}))
|
||||
assert results == []
|
||||
|
||||
|
||||
# -- AssistantMessage with TextBlock -----------------------------------------
|
||||
|
||||
|
||||
def test_text_block_emits_step_start_and_delta():
|
||||
adapter = _adapter()
|
||||
msg = AssistantMessage(content=[TextBlock(text="hello")], model="test")
|
||||
results = adapter.convert_message(msg)
|
||||
assert len(results) == 3
|
||||
assert isinstance(results[0], StreamStartStep)
|
||||
assert isinstance(results[1], StreamTextStart)
|
||||
assert isinstance(results[2], StreamTextDelta)
|
||||
assert results[2].delta == "hello"
|
||||
|
||||
|
||||
def test_empty_text_block_emits_only_step():
|
||||
adapter = _adapter()
|
||||
msg = AssistantMessage(content=[TextBlock(text="")], model="test")
|
||||
results = adapter.convert_message(msg)
|
||||
# Empty text skipped, but step still opens
|
||||
assert len(results) == 1
|
||||
assert isinstance(results[0], StreamStartStep)
|
||||
|
||||
|
||||
def test_multiple_text_deltas_reuse_block_id():
|
||||
adapter = _adapter()
|
||||
msg1 = AssistantMessage(content=[TextBlock(text="a")], model="test")
|
||||
msg2 = AssistantMessage(content=[TextBlock(text="b")], model="test")
|
||||
r1 = adapter.convert_message(msg1)
|
||||
r2 = adapter.convert_message(msg2)
|
||||
# First gets step+start+delta, second only delta (block & step already started)
|
||||
assert len(r1) == 3
|
||||
assert isinstance(r1[0], StreamStartStep)
|
||||
assert isinstance(r1[1], StreamTextStart)
|
||||
assert len(r2) == 1
|
||||
assert isinstance(r2[0], StreamTextDelta)
|
||||
assert r1[1].id == r2[0].id # same block ID
|
||||
|
||||
|
||||
# -- AssistantMessage with ToolUseBlock --------------------------------------
|
||||
|
||||
|
||||
def test_tool_use_emits_input_start_and_available():
|
||||
"""Tool names arrive with MCP prefix and should be stripped for the frontend."""
|
||||
adapter = _adapter()
|
||||
msg = AssistantMessage(
|
||||
content=[
|
||||
ToolUseBlock(
|
||||
id="tool-1",
|
||||
name=f"{MCP_TOOL_PREFIX}find_agent",
|
||||
input={"q": "x"},
|
||||
)
|
||||
],
|
||||
model="test",
|
||||
)
|
||||
results = adapter.convert_message(msg)
|
||||
assert len(results) == 3
|
||||
assert isinstance(results[0], StreamStartStep)
|
||||
assert isinstance(results[1], StreamToolInputStart)
|
||||
assert results[1].toolCallId == "tool-1"
|
||||
assert results[1].toolName == "find_agent" # prefix stripped
|
||||
assert isinstance(results[2], StreamToolInputAvailable)
|
||||
assert results[2].toolName == "find_agent" # prefix stripped
|
||||
assert results[2].input == {"q": "x"}
|
||||
|
||||
|
||||
def test_text_then_tool_ends_text_block():
|
||||
adapter = _adapter()
|
||||
text_msg = AssistantMessage(content=[TextBlock(text="thinking...")], model="test")
|
||||
tool_msg = AssistantMessage(
|
||||
content=[ToolUseBlock(id="t1", name=f"{MCP_TOOL_PREFIX}tool", input={})],
|
||||
model="test",
|
||||
)
|
||||
adapter.convert_message(text_msg) # opens step + text
|
||||
results = adapter.convert_message(tool_msg)
|
||||
# Step already open, so: TextEnd, ToolInputStart, ToolInputAvailable
|
||||
assert len(results) == 3
|
||||
assert isinstance(results[0], StreamTextEnd)
|
||||
assert isinstance(results[1], StreamToolInputStart)
|
||||
|
||||
|
||||
# -- UserMessage with ToolResultBlock ----------------------------------------
|
||||
|
||||
|
||||
def test_tool_result_emits_output_and_finish_step():
|
||||
adapter = _adapter()
|
||||
# First register the tool call (opens step) — SDK sends prefixed name
|
||||
tool_msg = AssistantMessage(
|
||||
content=[ToolUseBlock(id="t1", name=f"{MCP_TOOL_PREFIX}find_agent", input={})],
|
||||
model="test",
|
||||
)
|
||||
adapter.convert_message(tool_msg)
|
||||
|
||||
# Now send tool result
|
||||
result_msg = UserMessage(
|
||||
content=[ToolResultBlock(tool_use_id="t1", content="found 3 agents")]
|
||||
)
|
||||
results = adapter.convert_message(result_msg)
|
||||
assert len(results) == 2
|
||||
assert isinstance(results[0], StreamToolOutputAvailable)
|
||||
assert results[0].toolCallId == "t1"
|
||||
assert results[0].toolName == "find_agent" # prefix stripped
|
||||
assert results[0].output == "found 3 agents"
|
||||
assert results[0].success is True
|
||||
assert isinstance(results[1], StreamFinishStep)
|
||||
|
||||
|
||||
def test_tool_result_error():
|
||||
adapter = _adapter()
|
||||
adapter.convert_message(
|
||||
AssistantMessage(
|
||||
content=[
|
||||
ToolUseBlock(id="t1", name=f"{MCP_TOOL_PREFIX}run_agent", input={})
|
||||
],
|
||||
model="test",
|
||||
)
|
||||
)
|
||||
result_msg = UserMessage(
|
||||
content=[ToolResultBlock(tool_use_id="t1", content="timeout", is_error=True)]
|
||||
)
|
||||
results = adapter.convert_message(result_msg)
|
||||
assert isinstance(results[0], StreamToolOutputAvailable)
|
||||
assert results[0].success is False
|
||||
assert isinstance(results[1], StreamFinishStep)
|
||||
|
||||
|
||||
def test_tool_result_list_content():
|
||||
adapter = _adapter()
|
||||
adapter.convert_message(
|
||||
AssistantMessage(
|
||||
content=[ToolUseBlock(id="t1", name=f"{MCP_TOOL_PREFIX}tool", input={})],
|
||||
model="test",
|
||||
)
|
||||
)
|
||||
result_msg = UserMessage(
|
||||
content=[
|
||||
ToolResultBlock(
|
||||
tool_use_id="t1",
|
||||
content=[
|
||||
{"type": "text", "text": "line1"},
|
||||
{"type": "text", "text": "line2"},
|
||||
],
|
||||
)
|
||||
]
|
||||
)
|
||||
results = adapter.convert_message(result_msg)
|
||||
assert isinstance(results[0], StreamToolOutputAvailable)
|
||||
assert results[0].output == "line1line2"
|
||||
assert isinstance(results[1], StreamFinishStep)
|
||||
|
||||
|
||||
def test_string_user_message_ignored():
|
||||
"""A plain string UserMessage (not tool results) produces no output."""
|
||||
adapter = _adapter()
|
||||
results = adapter.convert_message(UserMessage(content="hello"))
|
||||
assert results == []
|
||||
|
||||
|
||||
# -- ResultMessage -----------------------------------------------------------
|
||||
|
||||
|
||||
def test_result_success_emits_finish_step_and_finish():
|
||||
adapter = _adapter()
|
||||
# Start some text first (opens step)
|
||||
adapter.convert_message(
|
||||
AssistantMessage(content=[TextBlock(text="done")], model="test")
|
||||
)
|
||||
msg = ResultMessage(
|
||||
subtype="success",
|
||||
duration_ms=100,
|
||||
duration_api_ms=50,
|
||||
is_error=False,
|
||||
num_turns=1,
|
||||
session_id="s1",
|
||||
)
|
||||
results = adapter.convert_message(msg)
|
||||
# TextEnd + FinishStep + StreamFinish
|
||||
assert len(results) == 3
|
||||
assert isinstance(results[0], StreamTextEnd)
|
||||
assert isinstance(results[1], StreamFinishStep)
|
||||
assert isinstance(results[2], StreamFinish)
|
||||
|
||||
|
||||
def test_result_error_emits_error_and_finish():
|
||||
adapter = _adapter()
|
||||
msg = ResultMessage(
|
||||
subtype="error",
|
||||
duration_ms=100,
|
||||
duration_api_ms=50,
|
||||
is_error=True,
|
||||
num_turns=0,
|
||||
session_id="s1",
|
||||
result="API rate limited",
|
||||
)
|
||||
results = adapter.convert_message(msg)
|
||||
# No step was open, so no FinishStep — just Error + Finish
|
||||
assert len(results) == 2
|
||||
assert isinstance(results[0], StreamError)
|
||||
assert "API rate limited" in results[0].errorText
|
||||
assert isinstance(results[1], StreamFinish)
|
||||
|
||||
|
||||
# -- Text after tools (new block ID) ----------------------------------------
|
||||
|
||||
|
||||
def test_text_after_tool_gets_new_block_id():
|
||||
adapter = _adapter()
|
||||
# Text -> Tool -> ToolResult -> Text should get a new text block ID and step
|
||||
adapter.convert_message(
|
||||
AssistantMessage(content=[TextBlock(text="before")], model="test")
|
||||
)
|
||||
adapter.convert_message(
|
||||
AssistantMessage(
|
||||
content=[ToolUseBlock(id="t1", name=f"{MCP_TOOL_PREFIX}tool", input={})],
|
||||
model="test",
|
||||
)
|
||||
)
|
||||
# Send tool result (closes step)
|
||||
adapter.convert_message(
|
||||
UserMessage(content=[ToolResultBlock(tool_use_id="t1", content="ok")])
|
||||
)
|
||||
results = adapter.convert_message(
|
||||
AssistantMessage(content=[TextBlock(text="after")], model="test")
|
||||
)
|
||||
# Should get StreamStartStep (new step) + StreamTextStart (new block) + StreamTextDelta
|
||||
assert len(results) == 3
|
||||
assert isinstance(results[0], StreamStartStep)
|
||||
assert isinstance(results[1], StreamTextStart)
|
||||
assert isinstance(results[2], StreamTextDelta)
|
||||
assert results[2].delta == "after"
|
||||
|
||||
|
||||
# -- Full conversation flow --------------------------------------------------
|
||||
|
||||
|
||||
def test_full_conversation_flow():
|
||||
"""Simulate a complete conversation: init -> text -> tool -> result -> text -> finish."""
|
||||
adapter = _adapter()
|
||||
all_responses: list[StreamBaseResponse] = []
|
||||
|
||||
# 1. Init
|
||||
all_responses.extend(
|
||||
adapter.convert_message(SystemMessage(subtype="init", data={}))
|
||||
)
|
||||
# 2. Assistant text
|
||||
all_responses.extend(
|
||||
adapter.convert_message(
|
||||
AssistantMessage(content=[TextBlock(text="Let me search")], model="test")
|
||||
)
|
||||
)
|
||||
# 3. Tool use
|
||||
all_responses.extend(
|
||||
adapter.convert_message(
|
||||
AssistantMessage(
|
||||
content=[
|
||||
ToolUseBlock(
|
||||
id="t1",
|
||||
name=f"{MCP_TOOL_PREFIX}find_agent",
|
||||
input={"query": "email"},
|
||||
)
|
||||
],
|
||||
model="test",
|
||||
)
|
||||
)
|
||||
)
|
||||
# 4. Tool result
|
||||
all_responses.extend(
|
||||
adapter.convert_message(
|
||||
UserMessage(
|
||||
content=[ToolResultBlock(tool_use_id="t1", content="Found 2 agents")]
|
||||
)
|
||||
)
|
||||
)
|
||||
# 5. More text
|
||||
all_responses.extend(
|
||||
adapter.convert_message(
|
||||
AssistantMessage(content=[TextBlock(text="I found 2")], model="test")
|
||||
)
|
||||
)
|
||||
# 6. Result
|
||||
all_responses.extend(
|
||||
adapter.convert_message(
|
||||
ResultMessage(
|
||||
subtype="success",
|
||||
duration_ms=500,
|
||||
duration_api_ms=400,
|
||||
is_error=False,
|
||||
num_turns=2,
|
||||
session_id="s1",
|
||||
)
|
||||
)
|
||||
)
|
||||
|
||||
types = [type(r).__name__ for r in all_responses]
|
||||
assert types == [
|
||||
"StreamStart",
|
||||
"StreamStartStep", # step 1: text + tool call
|
||||
"StreamTextStart",
|
||||
"StreamTextDelta", # "Let me search"
|
||||
"StreamTextEnd", # closed before tool
|
||||
"StreamToolInputStart",
|
||||
"StreamToolInputAvailable",
|
||||
"StreamToolOutputAvailable", # tool result
|
||||
"StreamFinishStep", # step 1 closed after tool result
|
||||
"StreamStartStep", # step 2: continuation text
|
||||
"StreamTextStart", # new block after tool
|
||||
"StreamTextDelta", # "I found 2"
|
||||
"StreamTextEnd", # closed by result
|
||||
"StreamFinishStep", # step 2 closed
|
||||
"StreamFinish",
|
||||
]
|
||||
@@ -1,296 +0,0 @@
|
||||
"""Security hooks for Claude Agent SDK integration.
|
||||
|
||||
This module provides security hooks that validate tool calls before execution,
|
||||
ensuring multi-user isolation and preventing unauthorized operations.
|
||||
"""
|
||||
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import re
|
||||
from typing import Any, cast
|
||||
|
||||
from backend.api.features.chat.sdk.tool_adapter import MCP_TOOL_PREFIX
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Tools that are blocked entirely (CLI/system access).
|
||||
# "Bash" (capital) is the SDK built-in — it's NOT in allowed_tools but blocked
|
||||
# here as defence-in-depth. The agent uses mcp__copilot__bash_exec instead,
|
||||
# which has kernel-level network isolation (unshare --net).
|
||||
BLOCKED_TOOLS = {
|
||||
"Bash",
|
||||
"bash",
|
||||
"shell",
|
||||
"exec",
|
||||
"terminal",
|
||||
"command",
|
||||
}
|
||||
|
||||
# Tools allowed only when their path argument stays within the SDK workspace.
|
||||
# The SDK uses these to handle oversized tool results (writes to tool-results/
|
||||
# files, then reads them back) and for workspace file operations.
|
||||
WORKSPACE_SCOPED_TOOLS = {"Read", "Write", "Edit", "Glob", "Grep"}
|
||||
|
||||
# Dangerous patterns in tool inputs
|
||||
DANGEROUS_PATTERNS = [
|
||||
r"sudo",
|
||||
r"rm\s+-rf",
|
||||
r"dd\s+if=",
|
||||
r"/etc/passwd",
|
||||
r"/etc/shadow",
|
||||
r"chmod\s+777",
|
||||
r"curl\s+.*\|.*sh",
|
||||
r"wget\s+.*\|.*sh",
|
||||
r"eval\s*\(",
|
||||
r"exec\s*\(",
|
||||
r"__import__",
|
||||
r"os\.system",
|
||||
r"subprocess",
|
||||
]
|
||||
|
||||
|
||||
def _deny(reason: str) -> dict[str, Any]:
|
||||
"""Return a hook denial response."""
|
||||
return {
|
||||
"hookSpecificOutput": {
|
||||
"hookEventName": "PreToolUse",
|
||||
"permissionDecision": "deny",
|
||||
"permissionDecisionReason": reason,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
def _validate_workspace_path(
|
||||
tool_name: str, tool_input: dict[str, Any], sdk_cwd: str | None
|
||||
) -> dict[str, Any]:
|
||||
"""Validate that a workspace-scoped tool only accesses allowed paths.
|
||||
|
||||
Allowed directories:
|
||||
- The SDK working directory (``/tmp/copilot-<session>/``)
|
||||
- The SDK tool-results directory (``~/.claude/projects/…/tool-results/``)
|
||||
"""
|
||||
path = tool_input.get("file_path") or tool_input.get("path") or ""
|
||||
if not path:
|
||||
# Glob/Grep without a path default to cwd which is already sandboxed
|
||||
return {}
|
||||
|
||||
# Resolve relative paths against sdk_cwd (the SDK sets cwd so the LLM
|
||||
# naturally uses relative paths like "test.txt" instead of absolute ones).
|
||||
if not os.path.isabs(path) and sdk_cwd:
|
||||
resolved = os.path.normpath(os.path.join(sdk_cwd, path))
|
||||
else:
|
||||
resolved = os.path.normpath(os.path.expanduser(path))
|
||||
|
||||
# Allow access within the SDK working directory
|
||||
if sdk_cwd:
|
||||
norm_cwd = os.path.normpath(sdk_cwd)
|
||||
if resolved.startswith(norm_cwd + os.sep) or resolved == norm_cwd:
|
||||
return {}
|
||||
|
||||
# Allow access to ~/.claude/projects/*/tool-results/ (big tool results)
|
||||
claude_dir = os.path.normpath(os.path.expanduser("~/.claude/projects"))
|
||||
if resolved.startswith(claude_dir + os.sep) and "tool-results" in resolved:
|
||||
return {}
|
||||
|
||||
logger.warning(
|
||||
f"Blocked {tool_name} outside workspace: {path} (resolved={resolved})"
|
||||
)
|
||||
workspace_hint = f" Allowed workspace: {sdk_cwd}" if sdk_cwd else ""
|
||||
return _deny(
|
||||
f"[SECURITY] Tool '{tool_name}' can only access files within the workspace "
|
||||
f"directory.{workspace_hint} "
|
||||
"This is enforced by the platform and cannot be bypassed."
|
||||
)
|
||||
|
||||
|
||||
def _validate_tool_access(
|
||||
tool_name: str, tool_input: dict[str, Any], sdk_cwd: str | None = None
|
||||
) -> dict[str, Any]:
|
||||
"""Validate that a tool call is allowed.
|
||||
|
||||
Returns:
|
||||
Empty dict to allow, or dict with hookSpecificOutput to deny
|
||||
"""
|
||||
# Block forbidden tools
|
||||
if tool_name in BLOCKED_TOOLS:
|
||||
logger.warning(f"Blocked tool access attempt: {tool_name}")
|
||||
return _deny(
|
||||
f"[SECURITY] Tool '{tool_name}' is blocked for security. "
|
||||
"This is enforced by the platform and cannot be bypassed. "
|
||||
"Use the CoPilot-specific MCP tools instead."
|
||||
)
|
||||
|
||||
# Workspace-scoped tools: allowed only within the SDK workspace directory
|
||||
if tool_name in WORKSPACE_SCOPED_TOOLS:
|
||||
return _validate_workspace_path(tool_name, tool_input, sdk_cwd)
|
||||
|
||||
# Check for dangerous patterns in tool input
|
||||
# Use json.dumps for predictable format (str() produces Python repr)
|
||||
input_str = json.dumps(tool_input) if tool_input else ""
|
||||
|
||||
for pattern in DANGEROUS_PATTERNS:
|
||||
if re.search(pattern, input_str, re.IGNORECASE):
|
||||
logger.warning(
|
||||
f"Blocked dangerous pattern in tool input: {pattern} in {tool_name}"
|
||||
)
|
||||
return _deny(
|
||||
"[SECURITY] Input contains a blocked pattern. "
|
||||
"This is enforced by the platform and cannot be bypassed."
|
||||
)
|
||||
|
||||
return {}
|
||||
|
||||
|
||||
def _validate_user_isolation(
|
||||
tool_name: str, tool_input: dict[str, Any], user_id: str | None
|
||||
) -> dict[str, Any]:
|
||||
"""Validate that tool calls respect user isolation."""
|
||||
# For workspace file tools, ensure path doesn't escape
|
||||
if "workspace" in tool_name.lower():
|
||||
path = tool_input.get("path", "") or tool_input.get("file_path", "")
|
||||
if path:
|
||||
# Check for path traversal
|
||||
if ".." in path or path.startswith("/"):
|
||||
logger.warning(
|
||||
f"Blocked path traversal attempt: {path} by user {user_id}"
|
||||
)
|
||||
return {
|
||||
"hookSpecificOutput": {
|
||||
"hookEventName": "PreToolUse",
|
||||
"permissionDecision": "deny",
|
||||
"permissionDecisionReason": "Path traversal not allowed",
|
||||
}
|
||||
}
|
||||
|
||||
return {}
|
||||
|
||||
|
||||
def create_security_hooks(
|
||||
user_id: str | None,
|
||||
sdk_cwd: str | None = None,
|
||||
max_subtasks: int = 3,
|
||||
) -> dict[str, Any]:
|
||||
"""Create the security hooks configuration for Claude Agent SDK.
|
||||
|
||||
Includes security validation and observability hooks:
|
||||
- PreToolUse: Security validation before tool execution
|
||||
- PostToolUse: Log successful tool executions
|
||||
- PostToolUseFailure: Log and handle failed tool executions
|
||||
- PreCompact: Log context compaction events (SDK handles compaction automatically)
|
||||
|
||||
Args:
|
||||
user_id: Current user ID for isolation validation
|
||||
sdk_cwd: SDK working directory for workspace-scoped tool validation
|
||||
max_subtasks: Maximum Task (sub-agent) spawns allowed per session
|
||||
|
||||
Returns:
|
||||
Hooks configuration dict for ClaudeAgentOptions
|
||||
"""
|
||||
try:
|
||||
from claude_agent_sdk import HookMatcher
|
||||
from claude_agent_sdk.types import HookContext, HookInput, SyncHookJSONOutput
|
||||
|
||||
# Per-session counter for Task sub-agent spawns
|
||||
task_spawn_count = 0
|
||||
|
||||
async def pre_tool_use_hook(
|
||||
input_data: HookInput,
|
||||
tool_use_id: str | None,
|
||||
context: HookContext,
|
||||
) -> SyncHookJSONOutput:
|
||||
"""Combined pre-tool-use validation hook."""
|
||||
nonlocal task_spawn_count
|
||||
_ = context # unused but required by signature
|
||||
tool_name = cast(str, input_data.get("tool_name", ""))
|
||||
tool_input = cast(dict[str, Any], input_data.get("tool_input", {}))
|
||||
|
||||
# Rate-limit Task (sub-agent) spawns per session
|
||||
if tool_name == "Task":
|
||||
task_spawn_count += 1
|
||||
if task_spawn_count > max_subtasks:
|
||||
logger.warning(
|
||||
f"[SDK] Task limit reached ({max_subtasks}), user={user_id}"
|
||||
)
|
||||
return cast(
|
||||
SyncHookJSONOutput,
|
||||
_deny(
|
||||
f"Maximum {max_subtasks} sub-tasks per session. "
|
||||
"Please continue in the main conversation."
|
||||
),
|
||||
)
|
||||
|
||||
# Strip MCP prefix for consistent validation
|
||||
is_copilot_tool = tool_name.startswith(MCP_TOOL_PREFIX)
|
||||
clean_name = tool_name.removeprefix(MCP_TOOL_PREFIX)
|
||||
|
||||
# Only block non-CoPilot tools; our MCP-registered tools
|
||||
# (including Read for oversized results) are already sandboxed.
|
||||
if not is_copilot_tool:
|
||||
result = _validate_tool_access(clean_name, tool_input, sdk_cwd)
|
||||
if result:
|
||||
return cast(SyncHookJSONOutput, result)
|
||||
|
||||
# Validate user isolation
|
||||
result = _validate_user_isolation(clean_name, tool_input, user_id)
|
||||
if result:
|
||||
return cast(SyncHookJSONOutput, result)
|
||||
|
||||
logger.debug(f"[SDK] Tool start: {tool_name}, user={user_id}")
|
||||
return cast(SyncHookJSONOutput, {})
|
||||
|
||||
async def post_tool_use_hook(
|
||||
input_data: HookInput,
|
||||
tool_use_id: str | None,
|
||||
context: HookContext,
|
||||
) -> SyncHookJSONOutput:
|
||||
"""Log successful tool executions for observability."""
|
||||
_ = context
|
||||
tool_name = cast(str, input_data.get("tool_name", ""))
|
||||
logger.debug(f"[SDK] Tool success: {tool_name}, tool_use_id={tool_use_id}")
|
||||
return cast(SyncHookJSONOutput, {})
|
||||
|
||||
async def post_tool_failure_hook(
|
||||
input_data: HookInput,
|
||||
tool_use_id: str | None,
|
||||
context: HookContext,
|
||||
) -> SyncHookJSONOutput:
|
||||
"""Log failed tool executions for debugging."""
|
||||
_ = context
|
||||
tool_name = cast(str, input_data.get("tool_name", ""))
|
||||
error = input_data.get("error", "Unknown error")
|
||||
logger.warning(
|
||||
f"[SDK] Tool failed: {tool_name}, error={error}, "
|
||||
f"user={user_id}, tool_use_id={tool_use_id}"
|
||||
)
|
||||
return cast(SyncHookJSONOutput, {})
|
||||
|
||||
async def pre_compact_hook(
|
||||
input_data: HookInput,
|
||||
tool_use_id: str | None,
|
||||
context: HookContext,
|
||||
) -> SyncHookJSONOutput:
|
||||
"""Log when SDK triggers context compaction.
|
||||
|
||||
The SDK automatically compacts conversation history when it grows too large.
|
||||
This hook provides visibility into when compaction happens.
|
||||
"""
|
||||
_ = context, tool_use_id
|
||||
trigger = input_data.get("trigger", "auto")
|
||||
logger.info(
|
||||
f"[SDK] Context compaction triggered: {trigger}, user={user_id}"
|
||||
)
|
||||
return cast(SyncHookJSONOutput, {})
|
||||
|
||||
return {
|
||||
"PreToolUse": [HookMatcher(matcher="*", hooks=[pre_tool_use_hook])],
|
||||
"PostToolUse": [HookMatcher(matcher="*", hooks=[post_tool_use_hook])],
|
||||
"PostToolUseFailure": [
|
||||
HookMatcher(matcher="*", hooks=[post_tool_failure_hook])
|
||||
],
|
||||
"PreCompact": [HookMatcher(matcher="*", hooks=[pre_compact_hook])],
|
||||
}
|
||||
except ImportError:
|
||||
# Fallback for when SDK isn't available - return empty hooks
|
||||
logger.warning("claude-agent-sdk not available, security hooks disabled")
|
||||
return {}
|
||||
@@ -1,165 +0,0 @@
|
||||
"""Unit tests for SDK security hooks."""
|
||||
|
||||
import os
|
||||
|
||||
from .security_hooks import _validate_tool_access, _validate_user_isolation
|
||||
|
||||
SDK_CWD = "/tmp/copilot-abc123"
|
||||
|
||||
|
||||
def _is_denied(result: dict) -> bool:
|
||||
hook = result.get("hookSpecificOutput", {})
|
||||
return hook.get("permissionDecision") == "deny"
|
||||
|
||||
|
||||
# -- Blocked tools -----------------------------------------------------------
|
||||
|
||||
|
||||
def test_blocked_tools_denied():
|
||||
for tool in ("bash", "shell", "exec", "terminal", "command"):
|
||||
result = _validate_tool_access(tool, {})
|
||||
assert _is_denied(result), f"{tool} should be blocked"
|
||||
|
||||
|
||||
def test_unknown_tool_allowed():
|
||||
result = _validate_tool_access("SomeCustomTool", {})
|
||||
assert result == {}
|
||||
|
||||
|
||||
# -- Workspace-scoped tools --------------------------------------------------
|
||||
|
||||
|
||||
def test_read_within_workspace_allowed():
|
||||
result = _validate_tool_access(
|
||||
"Read", {"file_path": f"{SDK_CWD}/file.txt"}, sdk_cwd=SDK_CWD
|
||||
)
|
||||
assert result == {}
|
||||
|
||||
|
||||
def test_write_within_workspace_allowed():
|
||||
result = _validate_tool_access(
|
||||
"Write", {"file_path": f"{SDK_CWD}/output.json"}, sdk_cwd=SDK_CWD
|
||||
)
|
||||
assert result == {}
|
||||
|
||||
|
||||
def test_edit_within_workspace_allowed():
|
||||
result = _validate_tool_access(
|
||||
"Edit", {"file_path": f"{SDK_CWD}/src/main.py"}, sdk_cwd=SDK_CWD
|
||||
)
|
||||
assert result == {}
|
||||
|
||||
|
||||
def test_glob_within_workspace_allowed():
|
||||
result = _validate_tool_access("Glob", {"path": f"{SDK_CWD}/src"}, sdk_cwd=SDK_CWD)
|
||||
assert result == {}
|
||||
|
||||
|
||||
def test_grep_within_workspace_allowed():
|
||||
result = _validate_tool_access("Grep", {"path": f"{SDK_CWD}/src"}, sdk_cwd=SDK_CWD)
|
||||
assert result == {}
|
||||
|
||||
|
||||
def test_read_outside_workspace_denied():
|
||||
result = _validate_tool_access(
|
||||
"Read", {"file_path": "/etc/passwd"}, sdk_cwd=SDK_CWD
|
||||
)
|
||||
assert _is_denied(result)
|
||||
|
||||
|
||||
def test_write_outside_workspace_denied():
|
||||
result = _validate_tool_access(
|
||||
"Write", {"file_path": "/home/user/secrets.txt"}, sdk_cwd=SDK_CWD
|
||||
)
|
||||
assert _is_denied(result)
|
||||
|
||||
|
||||
def test_traversal_attack_denied():
|
||||
result = _validate_tool_access(
|
||||
"Read",
|
||||
{"file_path": f"{SDK_CWD}/../../etc/passwd"},
|
||||
sdk_cwd=SDK_CWD,
|
||||
)
|
||||
assert _is_denied(result)
|
||||
|
||||
|
||||
def test_no_path_allowed():
|
||||
"""Glob/Grep without a path argument defaults to cwd — should pass."""
|
||||
result = _validate_tool_access("Glob", {}, sdk_cwd=SDK_CWD)
|
||||
assert result == {}
|
||||
|
||||
|
||||
def test_read_no_cwd_denies_absolute():
|
||||
"""If no sdk_cwd is set, absolute paths are denied."""
|
||||
result = _validate_tool_access("Read", {"file_path": "/tmp/anything"})
|
||||
assert _is_denied(result)
|
||||
|
||||
|
||||
# -- Tool-results directory --------------------------------------------------
|
||||
|
||||
|
||||
def test_read_tool_results_allowed():
|
||||
home = os.path.expanduser("~")
|
||||
path = f"{home}/.claude/projects/-tmp-copilot-abc123/tool-results/12345.txt"
|
||||
result = _validate_tool_access("Read", {"file_path": path}, sdk_cwd=SDK_CWD)
|
||||
assert result == {}
|
||||
|
||||
|
||||
def test_read_claude_projects_without_tool_results_denied():
|
||||
home = os.path.expanduser("~")
|
||||
path = f"{home}/.claude/projects/-tmp-copilot-abc123/settings.json"
|
||||
result = _validate_tool_access("Read", {"file_path": path}, sdk_cwd=SDK_CWD)
|
||||
assert _is_denied(result)
|
||||
|
||||
|
||||
# -- Built-in Bash is blocked (use bash_exec MCP tool instead) ---------------
|
||||
|
||||
|
||||
def test_bash_builtin_always_blocked():
|
||||
"""SDK built-in Bash is blocked — bash_exec MCP tool with bubblewrap is used instead."""
|
||||
result = _validate_tool_access("Bash", {"command": "echo hello"}, sdk_cwd=SDK_CWD)
|
||||
assert _is_denied(result)
|
||||
|
||||
|
||||
# -- Dangerous patterns ------------------------------------------------------
|
||||
|
||||
|
||||
def test_dangerous_pattern_blocked():
|
||||
result = _validate_tool_access("SomeTool", {"cmd": "sudo rm -rf /"})
|
||||
assert _is_denied(result)
|
||||
|
||||
|
||||
def test_subprocess_pattern_blocked():
|
||||
result = _validate_tool_access("SomeTool", {"code": "subprocess.run(...)"})
|
||||
assert _is_denied(result)
|
||||
|
||||
|
||||
# -- User isolation ----------------------------------------------------------
|
||||
|
||||
|
||||
def test_workspace_path_traversal_blocked():
|
||||
result = _validate_user_isolation(
|
||||
"workspace_read", {"path": "../../../etc/shadow"}, user_id="user-1"
|
||||
)
|
||||
assert _is_denied(result)
|
||||
|
||||
|
||||
def test_workspace_absolute_path_blocked():
|
||||
result = _validate_user_isolation(
|
||||
"workspace_read", {"path": "/etc/passwd"}, user_id="user-1"
|
||||
)
|
||||
assert _is_denied(result)
|
||||
|
||||
|
||||
def test_workspace_normal_path_allowed():
|
||||
result = _validate_user_isolation(
|
||||
"workspace_read", {"path": "src/main.py"}, user_id="user-1"
|
||||
)
|
||||
assert result == {}
|
||||
|
||||
|
||||
def test_non_workspace_tool_passes_isolation():
|
||||
result = _validate_user_isolation(
|
||||
"find_agent", {"query": "email"}, user_id="user-1"
|
||||
)
|
||||
assert result == {}
|
||||
@@ -1,668 +0,0 @@
|
||||
"""Claude Agent SDK service layer for CoPilot chat completions."""
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import uuid
|
||||
from collections.abc import AsyncGenerator
|
||||
from typing import Any
|
||||
|
||||
from backend.util.exceptions import NotFoundError
|
||||
|
||||
from .. import stream_registry
|
||||
from ..config import ChatConfig
|
||||
from ..model import (
|
||||
ChatMessage,
|
||||
ChatSession,
|
||||
get_chat_session,
|
||||
update_session_title,
|
||||
upsert_chat_session,
|
||||
)
|
||||
from ..response_model import (
|
||||
StreamBaseResponse,
|
||||
StreamError,
|
||||
StreamFinish,
|
||||
StreamStart,
|
||||
StreamTextDelta,
|
||||
StreamToolInputAvailable,
|
||||
StreamToolOutputAvailable,
|
||||
)
|
||||
from ..service import (
|
||||
_build_system_prompt,
|
||||
_execute_long_running_tool_with_streaming,
|
||||
_generate_session_title,
|
||||
)
|
||||
from ..tools.models import OperationPendingResponse, OperationStartedResponse
|
||||
from ..tools.sandbox import WORKSPACE_PREFIX, make_session_path
|
||||
from ..tracking import track_user_message
|
||||
from .response_adapter import SDKResponseAdapter
|
||||
from .security_hooks import create_security_hooks
|
||||
from .tool_adapter import (
|
||||
COPILOT_TOOL_NAMES,
|
||||
LongRunningCallback,
|
||||
create_copilot_mcp_server,
|
||||
set_execution_context,
|
||||
)
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
config = ChatConfig()
|
||||
|
||||
# Set to hold background tasks to prevent garbage collection
|
||||
_background_tasks: set[asyncio.Task[Any]] = set()
|
||||
|
||||
|
||||
_SDK_CWD_PREFIX = WORKSPACE_PREFIX
|
||||
|
||||
# Appended to the system prompt to inform the agent about available tools.
|
||||
# The SDK built-in Bash is NOT available — use mcp__copilot__bash_exec instead,
|
||||
# which has kernel-level network isolation (unshare --net).
|
||||
_SDK_TOOL_SUPPLEMENT = """
|
||||
|
||||
## Tool notes
|
||||
|
||||
- The SDK built-in Bash tool is NOT available. Use the `bash_exec` MCP tool
|
||||
for shell commands — it runs in a network-isolated sandbox.
|
||||
- **Shared workspace**: The SDK Read/Write tools and `bash_exec` share the
|
||||
same working directory. Files created by one are readable by the other.
|
||||
These files are **ephemeral** — they exist only for the current session.
|
||||
- **Persistent storage**: Use `write_workspace_file` / `read_workspace_file`
|
||||
for files that should persist across sessions (stored in cloud storage).
|
||||
- Long-running tools (create_agent, edit_agent, etc.) are handled
|
||||
asynchronously. You will receive an immediate response; the actual result
|
||||
is delivered to the user via a background stream.
|
||||
"""
|
||||
|
||||
|
||||
def _build_long_running_callback(user_id: str | None) -> LongRunningCallback:
|
||||
"""Build a callback that delegates long-running tools to the non-SDK infrastructure.
|
||||
|
||||
Long-running tools (create_agent, edit_agent, etc.) are delegated to the
|
||||
existing background infrastructure: stream_registry (Redis Streams),
|
||||
database persistence, and SSE reconnection. This means results survive
|
||||
page refreshes / pod restarts, and the frontend shows the proper loading
|
||||
widget with progress updates.
|
||||
|
||||
The returned callback matches the ``LongRunningCallback`` signature:
|
||||
``(tool_name, args, session) -> MCP response dict``.
|
||||
"""
|
||||
|
||||
async def _callback(
|
||||
tool_name: str, args: dict[str, Any], session: ChatSession
|
||||
) -> dict[str, Any]:
|
||||
operation_id = str(uuid.uuid4())
|
||||
task_id = str(uuid.uuid4())
|
||||
tool_call_id = f"sdk-{uuid.uuid4().hex[:12]}"
|
||||
session_id = session.session_id
|
||||
|
||||
# --- Build user-friendly messages (matches non-SDK service) ---
|
||||
if tool_name == "create_agent":
|
||||
desc = args.get("description", "")
|
||||
desc_preview = (desc[:100] + "...") if len(desc) > 100 else desc
|
||||
pending_msg = (
|
||||
f"Creating your agent: {desc_preview}"
|
||||
if desc_preview
|
||||
else "Creating agent... This may take a few minutes."
|
||||
)
|
||||
started_msg = (
|
||||
"Agent creation started. You can close this tab - "
|
||||
"check your library in a few minutes."
|
||||
)
|
||||
elif tool_name == "edit_agent":
|
||||
changes = args.get("changes", "")
|
||||
changes_preview = (changes[:100] + "...") if len(changes) > 100 else changes
|
||||
pending_msg = (
|
||||
f"Editing agent: {changes_preview}"
|
||||
if changes_preview
|
||||
else "Editing agent... This may take a few minutes."
|
||||
)
|
||||
started_msg = (
|
||||
"Agent edit started. You can close this tab - "
|
||||
"check your library in a few minutes."
|
||||
)
|
||||
else:
|
||||
pending_msg = f"Running {tool_name}... This may take a few minutes."
|
||||
started_msg = (
|
||||
f"{tool_name} started. You can close this tab - "
|
||||
"check back in a few minutes."
|
||||
)
|
||||
|
||||
# --- Register task in Redis for SSE reconnection ---
|
||||
await stream_registry.create_task(
|
||||
task_id=task_id,
|
||||
session_id=session_id,
|
||||
user_id=user_id,
|
||||
tool_call_id=tool_call_id,
|
||||
tool_name=tool_name,
|
||||
operation_id=operation_id,
|
||||
)
|
||||
|
||||
# --- Save OperationPendingResponse to chat history ---
|
||||
pending_message = ChatMessage(
|
||||
role="tool",
|
||||
content=OperationPendingResponse(
|
||||
message=pending_msg,
|
||||
operation_id=operation_id,
|
||||
tool_name=tool_name,
|
||||
).model_dump_json(),
|
||||
tool_call_id=tool_call_id,
|
||||
)
|
||||
session.messages.append(pending_message)
|
||||
await upsert_chat_session(session)
|
||||
|
||||
# --- Spawn background task (reuses non-SDK infrastructure) ---
|
||||
bg_task = asyncio.create_task(
|
||||
_execute_long_running_tool_with_streaming(
|
||||
tool_name=tool_name,
|
||||
parameters=args,
|
||||
tool_call_id=tool_call_id,
|
||||
operation_id=operation_id,
|
||||
task_id=task_id,
|
||||
session_id=session_id,
|
||||
user_id=user_id,
|
||||
)
|
||||
)
|
||||
_background_tasks.add(bg_task)
|
||||
bg_task.add_done_callback(_background_tasks.discard)
|
||||
await stream_registry.set_task_asyncio_task(task_id, bg_task)
|
||||
|
||||
logger.info(
|
||||
f"[SDK] Long-running tool {tool_name} delegated to background "
|
||||
f"(operation_id={operation_id}, task_id={task_id})"
|
||||
)
|
||||
|
||||
# --- Return OperationStartedResponse as MCP tool result ---
|
||||
# This flows through SDK → response adapter → frontend, triggering
|
||||
# the loading widget with SSE reconnection support.
|
||||
started_json = OperationStartedResponse(
|
||||
message=started_msg,
|
||||
operation_id=operation_id,
|
||||
tool_name=tool_name,
|
||||
task_id=task_id,
|
||||
).model_dump_json()
|
||||
|
||||
return {
|
||||
"content": [{"type": "text", "text": started_json}],
|
||||
"isError": False,
|
||||
}
|
||||
|
||||
return _callback
|
||||
|
||||
|
||||
def _resolve_sdk_model() -> str | None:
|
||||
"""Resolve the model name for the Claude Agent SDK CLI.
|
||||
|
||||
Uses ``config.claude_agent_model`` if set, otherwise derives from
|
||||
``config.model`` by stripping the OpenRouter provider prefix (e.g.,
|
||||
``"anthropic/claude-opus-4.6"`` → ``"claude-opus-4.6"``).
|
||||
"""
|
||||
if config.claude_agent_model:
|
||||
return config.claude_agent_model
|
||||
model = config.model
|
||||
if "/" in model:
|
||||
return model.split("/", 1)[1]
|
||||
return model
|
||||
|
||||
|
||||
def _build_sdk_env() -> dict[str, str]:
|
||||
"""Build env vars for the SDK CLI process.
|
||||
|
||||
Routes API calls through OpenRouter (or a custom base_url) using
|
||||
the same ``config.api_key`` / ``config.base_url`` as the non-SDK path.
|
||||
This gives per-call token and cost tracking on the OpenRouter dashboard.
|
||||
|
||||
Only overrides ``ANTHROPIC_API_KEY`` when a valid proxy URL and auth
|
||||
token are both present — otherwise returns an empty dict so the SDK
|
||||
falls back to its default credentials.
|
||||
"""
|
||||
env: dict[str, str] = {}
|
||||
if config.api_key and config.base_url:
|
||||
# Strip /v1 suffix — SDK expects the base URL without a version path
|
||||
base = config.base_url.rstrip("/")
|
||||
if base.endswith("/v1"):
|
||||
base = base[:-3]
|
||||
if not base or not base.startswith("http"):
|
||||
# Invalid base_url — don't override SDK defaults
|
||||
return env
|
||||
env["ANTHROPIC_BASE_URL"] = base
|
||||
env["ANTHROPIC_AUTH_TOKEN"] = config.api_key
|
||||
# Must be explicitly empty so the CLI uses AUTH_TOKEN instead
|
||||
env["ANTHROPIC_API_KEY"] = ""
|
||||
return env
|
||||
|
||||
|
||||
def _make_sdk_cwd(session_id: str) -> str:
|
||||
"""Create a safe, session-specific working directory path.
|
||||
|
||||
Delegates to :func:`~backend.api.features.chat.tools.sandbox.make_session_path`
|
||||
(single source of truth for path sanitization) and adds a defence-in-depth
|
||||
assertion.
|
||||
"""
|
||||
cwd = make_session_path(session_id)
|
||||
# Defence-in-depth: normpath + startswith is a CodeQL-recognised sanitizer
|
||||
cwd = os.path.normpath(cwd)
|
||||
if not cwd.startswith(_SDK_CWD_PREFIX):
|
||||
raise ValueError(f"SDK cwd escaped prefix: {cwd}")
|
||||
return cwd
|
||||
|
||||
|
||||
def _cleanup_sdk_tool_results(cwd: str) -> None:
|
||||
"""Remove SDK tool-result files for a specific session working directory.
|
||||
|
||||
The SDK creates tool-result files under ~/.claude/projects/<encoded-cwd>/tool-results/.
|
||||
We clean only the specific cwd's results to avoid race conditions between
|
||||
concurrent sessions.
|
||||
|
||||
Security: cwd MUST be created by _make_sdk_cwd() which sanitizes session_id.
|
||||
"""
|
||||
import shutil
|
||||
|
||||
# Security check 1: Validate cwd is under the expected prefix
|
||||
normalized = os.path.normpath(cwd)
|
||||
if not normalized.startswith(_SDK_CWD_PREFIX):
|
||||
logger.warning(f"[SDK] Rejecting cleanup for invalid path: {cwd}")
|
||||
return
|
||||
|
||||
# Security check 2: Ensure no path traversal in the normalized path
|
||||
if ".." in normalized:
|
||||
logger.warning(f"[SDK] Rejecting cleanup for traversal attempt: {cwd}")
|
||||
return
|
||||
|
||||
# SDK encodes the cwd path by replacing '/' with '-'
|
||||
encoded_cwd = normalized.replace("/", "-")
|
||||
|
||||
# Construct the project directory path (known-safe home expansion)
|
||||
claude_projects = os.path.expanduser("~/.claude/projects")
|
||||
project_dir = os.path.join(claude_projects, encoded_cwd)
|
||||
|
||||
# Security check 3: Validate project_dir is under ~/.claude/projects
|
||||
project_dir = os.path.normpath(project_dir)
|
||||
if not project_dir.startswith(claude_projects):
|
||||
logger.warning(
|
||||
f"[SDK] Rejecting cleanup for escaped project path: {project_dir}"
|
||||
)
|
||||
return
|
||||
|
||||
results_dir = os.path.join(project_dir, "tool-results")
|
||||
if os.path.isdir(results_dir):
|
||||
for filename in os.listdir(results_dir):
|
||||
file_path = os.path.join(results_dir, filename)
|
||||
try:
|
||||
if os.path.isfile(file_path):
|
||||
os.remove(file_path)
|
||||
except OSError:
|
||||
pass
|
||||
|
||||
# Also clean up the temp cwd directory itself
|
||||
try:
|
||||
shutil.rmtree(normalized, ignore_errors=True)
|
||||
except OSError:
|
||||
pass
|
||||
|
||||
|
||||
async def _compress_conversation_history(
|
||||
session: ChatSession,
|
||||
) -> list[ChatMessage]:
|
||||
"""Compress prior conversation messages if they exceed the token threshold.
|
||||
|
||||
Uses the shared compress_context() from prompt.py which supports:
|
||||
- LLM summarization of old messages (keeps recent ones intact)
|
||||
- Progressive content truncation as fallback
|
||||
- Middle-out deletion as last resort
|
||||
|
||||
Returns the compressed prior messages (everything except the current message).
|
||||
"""
|
||||
prior = session.messages[:-1]
|
||||
if len(prior) < 2:
|
||||
return prior
|
||||
|
||||
from backend.util.prompt import compress_context
|
||||
|
||||
# Convert ChatMessages to dicts for compress_context
|
||||
messages_dict = []
|
||||
for msg in prior:
|
||||
msg_dict: dict[str, Any] = {"role": msg.role}
|
||||
if msg.content:
|
||||
msg_dict["content"] = msg.content
|
||||
if msg.tool_calls:
|
||||
msg_dict["tool_calls"] = msg.tool_calls
|
||||
if msg.tool_call_id:
|
||||
msg_dict["tool_call_id"] = msg.tool_call_id
|
||||
messages_dict.append(msg_dict)
|
||||
|
||||
try:
|
||||
import openai
|
||||
|
||||
async with openai.AsyncOpenAI(
|
||||
api_key=config.api_key, base_url=config.base_url, timeout=30.0
|
||||
) as client:
|
||||
result = await compress_context(
|
||||
messages=messages_dict,
|
||||
model=config.model,
|
||||
client=client,
|
||||
)
|
||||
except Exception as e:
|
||||
logger.warning(f"[SDK] Context compression with LLM failed: {e}")
|
||||
# Fall back to truncation-only (no LLM summarization)
|
||||
result = await compress_context(
|
||||
messages=messages_dict,
|
||||
model=config.model,
|
||||
client=None,
|
||||
)
|
||||
|
||||
if result.was_compacted:
|
||||
logger.info(
|
||||
f"[SDK] Context compacted: {result.original_token_count} -> "
|
||||
f"{result.token_count} tokens "
|
||||
f"({result.messages_summarized} summarized, "
|
||||
f"{result.messages_dropped} dropped)"
|
||||
)
|
||||
# Convert compressed dicts back to ChatMessages
|
||||
return [
|
||||
ChatMessage(
|
||||
role=m["role"],
|
||||
content=m.get("content"),
|
||||
tool_calls=m.get("tool_calls"),
|
||||
tool_call_id=m.get("tool_call_id"),
|
||||
)
|
||||
for m in result.messages
|
||||
]
|
||||
|
||||
return prior
|
||||
|
||||
|
||||
def _format_conversation_context(messages: list[ChatMessage]) -> str | None:
|
||||
"""Format conversation messages into a context prefix for the user message.
|
||||
|
||||
Returns a string like:
|
||||
<conversation_history>
|
||||
User: hello
|
||||
You responded: Hi! How can I help?
|
||||
</conversation_history>
|
||||
|
||||
Returns None if there are no messages to format.
|
||||
"""
|
||||
if not messages:
|
||||
return None
|
||||
|
||||
lines: list[str] = []
|
||||
for msg in messages:
|
||||
if not msg.content:
|
||||
continue
|
||||
if msg.role == "user":
|
||||
lines.append(f"User: {msg.content}")
|
||||
elif msg.role == "assistant":
|
||||
lines.append(f"You responded: {msg.content}")
|
||||
# Skip tool messages — they're internal details
|
||||
|
||||
if not lines:
|
||||
return None
|
||||
|
||||
return "<conversation_history>\n" + "\n".join(lines) + "\n</conversation_history>"
|
||||
|
||||
|
||||
async def stream_chat_completion_sdk(
|
||||
session_id: str,
|
||||
message: str | None = None,
|
||||
tool_call_response: str | None = None, # noqa: ARG001
|
||||
is_user_message: bool = True,
|
||||
user_id: str | None = None,
|
||||
retry_count: int = 0, # noqa: ARG001
|
||||
session: ChatSession | None = None,
|
||||
context: dict[str, str] | None = None, # noqa: ARG001
|
||||
) -> AsyncGenerator[StreamBaseResponse, None]:
|
||||
"""Stream chat completion using Claude Agent SDK.
|
||||
|
||||
Drop-in replacement for stream_chat_completion with improved reliability.
|
||||
"""
|
||||
|
||||
if session is None:
|
||||
session = await get_chat_session(session_id, user_id)
|
||||
|
||||
if not session:
|
||||
raise NotFoundError(
|
||||
f"Session {session_id} not found. Please create a new session first."
|
||||
)
|
||||
|
||||
if message:
|
||||
session.messages.append(
|
||||
ChatMessage(
|
||||
role="user" if is_user_message else "assistant", content=message
|
||||
)
|
||||
)
|
||||
if is_user_message:
|
||||
track_user_message(
|
||||
user_id=user_id, session_id=session_id, message_length=len(message)
|
||||
)
|
||||
|
||||
session = await upsert_chat_session(session)
|
||||
|
||||
# Generate title for new sessions (first user message)
|
||||
if is_user_message and not session.title:
|
||||
user_messages = [m for m in session.messages if m.role == "user"]
|
||||
if len(user_messages) == 1:
|
||||
first_message = user_messages[0].content or message or ""
|
||||
if first_message:
|
||||
task = asyncio.create_task(
|
||||
_update_title_async(session_id, first_message, user_id)
|
||||
)
|
||||
_background_tasks.add(task)
|
||||
task.add_done_callback(_background_tasks.discard)
|
||||
|
||||
# Build system prompt (reuses non-SDK path with Langfuse support)
|
||||
has_history = len(session.messages) > 1
|
||||
system_prompt, _ = await _build_system_prompt(
|
||||
user_id, has_conversation_history=has_history
|
||||
)
|
||||
system_prompt += _SDK_TOOL_SUPPLEMENT
|
||||
message_id = str(uuid.uuid4())
|
||||
task_id = str(uuid.uuid4())
|
||||
|
||||
yield StreamStart(messageId=message_id, taskId=task_id)
|
||||
|
||||
stream_completed = False
|
||||
# Initialise sdk_cwd before the try so the finally can reference it
|
||||
# even if _make_sdk_cwd raises (in that case it stays as "").
|
||||
sdk_cwd = ""
|
||||
|
||||
try:
|
||||
# Use a session-specific temp dir to avoid cleanup race conditions
|
||||
# between concurrent sessions.
|
||||
sdk_cwd = _make_sdk_cwd(session_id)
|
||||
os.makedirs(sdk_cwd, exist_ok=True)
|
||||
|
||||
set_execution_context(
|
||||
user_id,
|
||||
session,
|
||||
long_running_callback=_build_long_running_callback(user_id),
|
||||
)
|
||||
try:
|
||||
from claude_agent_sdk import ClaudeAgentOptions, ClaudeSDKClient
|
||||
|
||||
# Fail fast when no API credentials are available at all
|
||||
sdk_env = _build_sdk_env()
|
||||
if not sdk_env and not os.environ.get("ANTHROPIC_API_KEY"):
|
||||
raise RuntimeError(
|
||||
"No API key configured. Set OPEN_ROUTER_API_KEY "
|
||||
"(or CHAT_API_KEY) for OpenRouter routing, "
|
||||
"or ANTHROPIC_API_KEY for direct Anthropic access."
|
||||
)
|
||||
|
||||
mcp_server = create_copilot_mcp_server()
|
||||
|
||||
sdk_model = _resolve_sdk_model()
|
||||
|
||||
security_hooks = create_security_hooks(
|
||||
user_id,
|
||||
sdk_cwd=sdk_cwd,
|
||||
max_subtasks=config.claude_agent_max_subtasks,
|
||||
)
|
||||
|
||||
options = ClaudeAgentOptions(
|
||||
system_prompt=system_prompt,
|
||||
mcp_servers={"copilot": mcp_server}, # type: ignore[arg-type]
|
||||
allowed_tools=COPILOT_TOOL_NAMES,
|
||||
hooks=security_hooks, # type: ignore[arg-type]
|
||||
cwd=sdk_cwd,
|
||||
max_buffer_size=config.claude_agent_max_buffer_size,
|
||||
# Only pass model/env when OpenRouter is configured
|
||||
**({"model": sdk_model, "env": sdk_env} if sdk_env else {}),
|
||||
)
|
||||
|
||||
adapter = SDKResponseAdapter(message_id=message_id)
|
||||
adapter.set_task_id(task_id)
|
||||
|
||||
async with ClaudeSDKClient(options=options) as client:
|
||||
current_message = message or ""
|
||||
if not current_message and session.messages:
|
||||
last_user = [m for m in session.messages if m.role == "user"]
|
||||
if last_user:
|
||||
current_message = last_user[-1].content or ""
|
||||
|
||||
if not current_message.strip():
|
||||
yield StreamError(
|
||||
errorText="Message cannot be empty.",
|
||||
code="empty_prompt",
|
||||
)
|
||||
yield StreamFinish()
|
||||
return
|
||||
|
||||
# Build query with conversation history context.
|
||||
# Compress history first to handle long conversations.
|
||||
query_message = current_message
|
||||
if len(session.messages) > 1:
|
||||
compressed = await _compress_conversation_history(session)
|
||||
history_context = _format_conversation_context(compressed)
|
||||
if history_context:
|
||||
query_message = (
|
||||
f"{history_context}\n\n"
|
||||
f"Now, the user says:\n{current_message}"
|
||||
)
|
||||
|
||||
logger.info(
|
||||
f"[SDK] Sending query: {current_message[:80]!r}"
|
||||
f" ({len(session.messages)} msgs in session)"
|
||||
)
|
||||
await client.query(query_message, session_id=session_id)
|
||||
|
||||
assistant_response = ChatMessage(role="assistant", content="")
|
||||
accumulated_tool_calls: list[dict[str, Any]] = []
|
||||
has_appended_assistant = False
|
||||
has_tool_results = False
|
||||
|
||||
async for sdk_msg in client.receive_messages():
|
||||
logger.debug(
|
||||
f"[SDK] Received: {type(sdk_msg).__name__} "
|
||||
f"{getattr(sdk_msg, 'subtype', '')}"
|
||||
)
|
||||
for response in adapter.convert_message(sdk_msg):
|
||||
if isinstance(response, StreamStart):
|
||||
continue
|
||||
|
||||
yield response
|
||||
|
||||
if isinstance(response, StreamTextDelta):
|
||||
delta = response.delta or ""
|
||||
# After tool results, start a new assistant
|
||||
# message for the post-tool text.
|
||||
if has_tool_results and has_appended_assistant:
|
||||
assistant_response = ChatMessage(
|
||||
role="assistant", content=delta
|
||||
)
|
||||
accumulated_tool_calls = []
|
||||
has_appended_assistant = False
|
||||
has_tool_results = False
|
||||
session.messages.append(assistant_response)
|
||||
has_appended_assistant = True
|
||||
else:
|
||||
assistant_response.content = (
|
||||
assistant_response.content or ""
|
||||
) + delta
|
||||
if not has_appended_assistant:
|
||||
session.messages.append(assistant_response)
|
||||
has_appended_assistant = True
|
||||
|
||||
elif isinstance(response, StreamToolInputAvailable):
|
||||
accumulated_tool_calls.append(
|
||||
{
|
||||
"id": response.toolCallId,
|
||||
"type": "function",
|
||||
"function": {
|
||||
"name": response.toolName,
|
||||
"arguments": json.dumps(response.input or {}),
|
||||
},
|
||||
}
|
||||
)
|
||||
assistant_response.tool_calls = accumulated_tool_calls
|
||||
if not has_appended_assistant:
|
||||
session.messages.append(assistant_response)
|
||||
has_appended_assistant = True
|
||||
|
||||
elif isinstance(response, StreamToolOutputAvailable):
|
||||
session.messages.append(
|
||||
ChatMessage(
|
||||
role="tool",
|
||||
content=(
|
||||
response.output
|
||||
if isinstance(response.output, str)
|
||||
else str(response.output)
|
||||
),
|
||||
tool_call_id=response.toolCallId,
|
||||
)
|
||||
)
|
||||
has_tool_results = True
|
||||
|
||||
elif isinstance(response, StreamFinish):
|
||||
stream_completed = True
|
||||
|
||||
if stream_completed:
|
||||
break
|
||||
|
||||
if (
|
||||
assistant_response.content or assistant_response.tool_calls
|
||||
) and not has_appended_assistant:
|
||||
session.messages.append(assistant_response)
|
||||
|
||||
except ImportError:
|
||||
raise RuntimeError(
|
||||
"claude-agent-sdk is not installed. "
|
||||
"Disable SDK mode (CHAT_USE_CLAUDE_AGENT_SDK=false) "
|
||||
"to use the OpenAI-compatible fallback."
|
||||
)
|
||||
|
||||
await upsert_chat_session(session)
|
||||
logger.debug(
|
||||
f"[SDK] Session {session_id} saved with {len(session.messages)} messages"
|
||||
)
|
||||
if not stream_completed:
|
||||
yield StreamFinish()
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"[SDK] Error: {e}", exc_info=True)
|
||||
try:
|
||||
await upsert_chat_session(session)
|
||||
except Exception as save_err:
|
||||
logger.error(f"[SDK] Failed to save session on error: {save_err}")
|
||||
yield StreamError(
|
||||
errorText="An error occurred. Please try again.",
|
||||
code="sdk_error",
|
||||
)
|
||||
yield StreamFinish()
|
||||
finally:
|
||||
if sdk_cwd:
|
||||
_cleanup_sdk_tool_results(sdk_cwd)
|
||||
|
||||
|
||||
async def _update_title_async(
|
||||
session_id: str, message: str, user_id: str | None = None
|
||||
) -> None:
|
||||
"""Background task to update session title."""
|
||||
try:
|
||||
title = await _generate_session_title(
|
||||
message, user_id=user_id, session_id=session_id
|
||||
)
|
||||
if title:
|
||||
await update_session_title(session_id, title)
|
||||
logger.debug(f"[SDK] Generated title for {session_id}: {title}")
|
||||
except Exception as e:
|
||||
logger.warning(f"[SDK] Failed to update session title: {e}")
|
||||
@@ -1,320 +0,0 @@
|
||||
"""Tool adapter for wrapping existing CoPilot tools as Claude Agent SDK MCP tools.
|
||||
|
||||
This module provides the adapter layer that converts existing BaseTool implementations
|
||||
into in-process MCP tools that can be used with the Claude Agent SDK.
|
||||
|
||||
Long-running tools (``is_long_running=True``) are delegated to the non-SDK
|
||||
background infrastructure (stream_registry, Redis persistence, SSE reconnection)
|
||||
via a callback provided by the service layer. This avoids wasteful SDK polling
|
||||
and makes results survive page refreshes.
|
||||
"""
|
||||
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import uuid
|
||||
from collections.abc import Awaitable, Callable
|
||||
from contextvars import ContextVar
|
||||
from typing import Any
|
||||
|
||||
from backend.api.features.chat.model import ChatSession
|
||||
from backend.api.features.chat.tools import TOOL_REGISTRY
|
||||
from backend.api.features.chat.tools.base import BaseTool
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Allowed base directory for the Read tool (SDK saves oversized tool results here).
|
||||
# Restricted to ~/.claude/projects/ and further validated to require "tool-results"
|
||||
# in the path — prevents reading settings, credentials, or other sensitive files.
|
||||
_SDK_PROJECTS_DIR = os.path.expanduser("~/.claude/projects/")
|
||||
|
||||
# MCP server naming - the SDK prefixes tool names as "mcp__{server_name}__{tool}"
|
||||
MCP_SERVER_NAME = "copilot"
|
||||
MCP_TOOL_PREFIX = f"mcp__{MCP_SERVER_NAME}__"
|
||||
|
||||
# Context variables to pass user/session info to tool execution
|
||||
_current_user_id: ContextVar[str | None] = ContextVar("current_user_id", default=None)
|
||||
_current_session: ContextVar[ChatSession | None] = ContextVar(
|
||||
"current_session", default=None
|
||||
)
|
||||
# Stash for MCP tool outputs before the SDK potentially truncates them.
|
||||
# Keyed by tool_name → full output string. Consumed (popped) by the
|
||||
# response adapter when it builds StreamToolOutputAvailable.
|
||||
_pending_tool_outputs: ContextVar[dict[str, str]] = ContextVar(
|
||||
"pending_tool_outputs", default=None # type: ignore[arg-type]
|
||||
)
|
||||
|
||||
# Callback type for delegating long-running tools to the non-SDK infrastructure.
|
||||
# Args: (tool_name, arguments, session) → MCP-formatted response dict.
|
||||
LongRunningCallback = Callable[
|
||||
[str, dict[str, Any], ChatSession], Awaitable[dict[str, Any]]
|
||||
]
|
||||
|
||||
# ContextVar so the service layer can inject the callback per-request.
|
||||
_long_running_callback: ContextVar[LongRunningCallback | None] = ContextVar(
|
||||
"long_running_callback", default=None
|
||||
)
|
||||
|
||||
|
||||
def set_execution_context(
|
||||
user_id: str | None,
|
||||
session: ChatSession,
|
||||
long_running_callback: LongRunningCallback | None = None,
|
||||
) -> None:
|
||||
"""Set the execution context for tool calls.
|
||||
|
||||
This must be called before streaming begins to ensure tools have access
|
||||
to user_id and session information.
|
||||
|
||||
Args:
|
||||
user_id: Current user's ID.
|
||||
session: Current chat session.
|
||||
long_running_callback: Optional callback to delegate long-running tools
|
||||
to the non-SDK background infrastructure (stream_registry + Redis).
|
||||
"""
|
||||
_current_user_id.set(user_id)
|
||||
_current_session.set(session)
|
||||
_pending_tool_outputs.set({})
|
||||
_long_running_callback.set(long_running_callback)
|
||||
|
||||
|
||||
def get_execution_context() -> tuple[str | None, ChatSession | None]:
|
||||
"""Get the current execution context."""
|
||||
return (
|
||||
_current_user_id.get(),
|
||||
_current_session.get(),
|
||||
)
|
||||
|
||||
|
||||
def pop_pending_tool_output(tool_name: str) -> str | None:
|
||||
"""Pop and return the stashed full output for *tool_name*.
|
||||
|
||||
The SDK CLI may truncate large tool results (writing them to disk and
|
||||
replacing the content with a file reference). This stash keeps the
|
||||
original MCP output so the response adapter can forward it to the
|
||||
frontend for proper widget rendering.
|
||||
|
||||
Returns ``None`` if nothing was stashed for *tool_name*.
|
||||
"""
|
||||
pending = _pending_tool_outputs.get(None)
|
||||
if pending is None:
|
||||
return None
|
||||
return pending.pop(tool_name, None)
|
||||
|
||||
|
||||
async def _execute_tool_sync(
|
||||
base_tool: BaseTool,
|
||||
user_id: str | None,
|
||||
session: ChatSession,
|
||||
args: dict[str, Any],
|
||||
) -> dict[str, Any]:
|
||||
"""Execute a tool synchronously and return MCP-formatted response."""
|
||||
effective_id = f"sdk-{uuid.uuid4().hex[:12]}"
|
||||
result = await base_tool.execute(
|
||||
user_id=user_id,
|
||||
session=session,
|
||||
tool_call_id=effective_id,
|
||||
**args,
|
||||
)
|
||||
|
||||
text = (
|
||||
result.output if isinstance(result.output, str) else json.dumps(result.output)
|
||||
)
|
||||
|
||||
# Stash the full output before the SDK potentially truncates it.
|
||||
pending = _pending_tool_outputs.get(None)
|
||||
if pending is not None:
|
||||
pending[base_tool.name] = text
|
||||
|
||||
return {
|
||||
"content": [{"type": "text", "text": text}],
|
||||
"isError": not result.success,
|
||||
}
|
||||
|
||||
|
||||
def _mcp_error(message: str) -> dict[str, Any]:
|
||||
return {
|
||||
"content": [
|
||||
{"type": "text", "text": json.dumps({"error": message, "type": "error"})}
|
||||
],
|
||||
"isError": True,
|
||||
}
|
||||
|
||||
|
||||
def create_tool_handler(base_tool: BaseTool):
|
||||
"""Create an async handler function for a BaseTool.
|
||||
|
||||
This wraps the existing BaseTool._execute method to be compatible
|
||||
with the Claude Agent SDK MCP tool format.
|
||||
|
||||
Long-running tools (``is_long_running=True``) are delegated to the
|
||||
non-SDK background infrastructure via a callback set in the execution
|
||||
context. The callback persists the operation in Redis (stream_registry)
|
||||
so results survive page refreshes and pod restarts.
|
||||
"""
|
||||
|
||||
async def tool_handler(args: dict[str, Any]) -> dict[str, Any]:
|
||||
"""Execute the wrapped tool and return MCP-formatted response."""
|
||||
user_id, session = get_execution_context()
|
||||
|
||||
if session is None:
|
||||
return _mcp_error("No session context available")
|
||||
|
||||
# --- Long-running: delegate to non-SDK background infrastructure ---
|
||||
if base_tool.is_long_running:
|
||||
callback = _long_running_callback.get(None)
|
||||
if callback:
|
||||
try:
|
||||
return await callback(base_tool.name, args, session)
|
||||
except Exception as e:
|
||||
logger.error(
|
||||
f"Long-running callback failed for {base_tool.name}: {e}",
|
||||
exc_info=True,
|
||||
)
|
||||
return _mcp_error(f"Failed to start {base_tool.name}: {e}")
|
||||
# No callback — fall through to synchronous execution
|
||||
logger.warning(
|
||||
f"[SDK] No long-running callback for {base_tool.name}, "
|
||||
f"executing synchronously (may block)"
|
||||
)
|
||||
|
||||
# --- Normal (fast) tool: execute synchronously ---
|
||||
try:
|
||||
return await _execute_tool_sync(base_tool, user_id, session, args)
|
||||
except Exception as e:
|
||||
logger.error(f"Error executing tool {base_tool.name}: {e}", exc_info=True)
|
||||
return _mcp_error(f"Failed to execute {base_tool.name}: {e}")
|
||||
|
||||
return tool_handler
|
||||
|
||||
|
||||
def _build_input_schema(base_tool: BaseTool) -> dict[str, Any]:
|
||||
"""Build a JSON Schema input schema for a tool."""
|
||||
return {
|
||||
"type": "object",
|
||||
"properties": base_tool.parameters.get("properties", {}),
|
||||
"required": base_tool.parameters.get("required", []),
|
||||
}
|
||||
|
||||
|
||||
async def _read_file_handler(args: dict[str, Any]) -> dict[str, Any]:
|
||||
"""Read a file with optional offset/limit. Restricted to SDK working directory.
|
||||
|
||||
After reading, the file is deleted to prevent accumulation in long-running pods.
|
||||
"""
|
||||
file_path = args.get("file_path", "")
|
||||
offset = args.get("offset", 0)
|
||||
limit = args.get("limit", 2000)
|
||||
|
||||
# Security: only allow reads under ~/.claude/projects/**/tool-results/
|
||||
real_path = os.path.realpath(file_path)
|
||||
if not real_path.startswith(_SDK_PROJECTS_DIR) or "tool-results" not in real_path:
|
||||
return {
|
||||
"content": [{"type": "text", "text": f"Access denied: {file_path}"}],
|
||||
"isError": True,
|
||||
}
|
||||
|
||||
try:
|
||||
with open(real_path) as f:
|
||||
lines = f.readlines()
|
||||
selected = lines[offset : offset + limit]
|
||||
content = "".join(selected)
|
||||
return {"content": [{"type": "text", "text": content}], "isError": False}
|
||||
except FileNotFoundError:
|
||||
return {
|
||||
"content": [{"type": "text", "text": f"File not found: {file_path}"}],
|
||||
"isError": True,
|
||||
}
|
||||
except Exception as e:
|
||||
return {
|
||||
"content": [{"type": "text", "text": f"Error reading file: {e}"}],
|
||||
"isError": True,
|
||||
}
|
||||
|
||||
|
||||
_READ_TOOL_NAME = "Read"
|
||||
_READ_TOOL_DESCRIPTION = (
|
||||
"Read a file from the local filesystem. "
|
||||
"Use offset and limit to read specific line ranges for large files."
|
||||
)
|
||||
_READ_TOOL_SCHEMA = {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"file_path": {
|
||||
"type": "string",
|
||||
"description": "The absolute path to the file to read",
|
||||
},
|
||||
"offset": {
|
||||
"type": "integer",
|
||||
"description": "Line number to start reading from (0-indexed). Default: 0",
|
||||
},
|
||||
"limit": {
|
||||
"type": "integer",
|
||||
"description": "Number of lines to read. Default: 2000",
|
||||
},
|
||||
},
|
||||
"required": ["file_path"],
|
||||
}
|
||||
|
||||
|
||||
# Create the MCP server configuration
|
||||
def create_copilot_mcp_server():
|
||||
"""Create an in-process MCP server configuration for CoPilot tools.
|
||||
|
||||
This can be passed to ClaudeAgentOptions.mcp_servers.
|
||||
|
||||
Note: The actual SDK MCP server creation depends on the claude-agent-sdk
|
||||
package being available. This function returns the configuration that
|
||||
can be used with the SDK.
|
||||
"""
|
||||
try:
|
||||
from claude_agent_sdk import create_sdk_mcp_server, tool
|
||||
|
||||
# Create decorated tool functions
|
||||
sdk_tools = []
|
||||
|
||||
for tool_name, base_tool in TOOL_REGISTRY.items():
|
||||
handler = create_tool_handler(base_tool)
|
||||
decorated = tool(
|
||||
tool_name,
|
||||
base_tool.description,
|
||||
_build_input_schema(base_tool),
|
||||
)(handler)
|
||||
sdk_tools.append(decorated)
|
||||
|
||||
# Add the Read tool so the SDK can read back oversized tool results
|
||||
read_tool = tool(
|
||||
_READ_TOOL_NAME,
|
||||
_READ_TOOL_DESCRIPTION,
|
||||
_READ_TOOL_SCHEMA,
|
||||
)(_read_file_handler)
|
||||
sdk_tools.append(read_tool)
|
||||
|
||||
server = create_sdk_mcp_server(
|
||||
name=MCP_SERVER_NAME,
|
||||
version="1.0.0",
|
||||
tools=sdk_tools,
|
||||
)
|
||||
|
||||
return server
|
||||
|
||||
except ImportError:
|
||||
# Let ImportError propagate so service.py handles the fallback
|
||||
raise
|
||||
|
||||
|
||||
# SDK built-in tools allowed within the workspace directory.
|
||||
# Security hooks validate that file paths stay within sdk_cwd.
|
||||
# Bash is NOT included — use the sandboxed MCP bash_exec tool instead,
|
||||
# which provides kernel-level network isolation via unshare --net.
|
||||
# Task allows spawning sub-agents (rate-limited by security hooks).
|
||||
_SDK_BUILTIN_TOOLS = ["Read", "Write", "Edit", "Glob", "Grep", "Task"]
|
||||
|
||||
# List of tool names for allowed_tools configuration
|
||||
# Include MCP tools, the MCP Read tool for oversized results,
|
||||
# and SDK built-in file tools for workspace operations.
|
||||
COPILOT_TOOL_NAMES = [
|
||||
*[f"{MCP_TOOL_PREFIX}{name}" for name in TOOL_REGISTRY.keys()],
|
||||
f"{MCP_TOOL_PREFIX}{_READ_TOOL_NAME}",
|
||||
*_SDK_BUILTIN_TOOLS,
|
||||
]
|
||||
@@ -245,16 +245,12 @@ async def _get_system_prompt_template(context: str) -> str:
|
||||
return DEFAULT_SYSTEM_PROMPT.format(users_information=context)
|
||||
|
||||
|
||||
async def _build_system_prompt(
|
||||
user_id: str | None, has_conversation_history: bool = False
|
||||
) -> tuple[str, Any]:
|
||||
async def _build_system_prompt(user_id: str | None) -> tuple[str, Any]:
|
||||
"""Build the full system prompt including business understanding if available.
|
||||
|
||||
Args:
|
||||
user_id: The user ID for fetching business understanding.
|
||||
has_conversation_history: Whether there's existing conversation history.
|
||||
If True, we don't tell the model to greet/introduce (since they're
|
||||
already in a conversation).
|
||||
user_id: The user ID for fetching business understanding
|
||||
If "default" and this is the user's first session, will use "onboarding" instead.
|
||||
|
||||
Returns:
|
||||
Tuple of (compiled prompt string, business understanding object)
|
||||
@@ -270,8 +266,6 @@ async def _build_system_prompt(
|
||||
|
||||
if understanding:
|
||||
context = format_understanding_for_prompt(understanding)
|
||||
elif has_conversation_history:
|
||||
context = "No prior understanding saved yet. Continue the existing conversation naturally."
|
||||
else:
|
||||
context = "This is the first time you are meeting the user. Greet them and introduce them to the platform"
|
||||
|
||||
@@ -380,6 +374,7 @@ async def stream_chat_completion(
|
||||
|
||||
Raises:
|
||||
NotFoundError: If session_id is invalid
|
||||
ValueError: If max_context_messages is exceeded
|
||||
|
||||
"""
|
||||
completion_start = time.monotonic()
|
||||
@@ -464,9 +459,8 @@ async def stream_chat_completion(
|
||||
|
||||
# Generate title for new sessions on first user message (non-blocking)
|
||||
# Check: is_user_message, no title yet, and this is the first user message
|
||||
user_messages = [m for m in session.messages if m.role == "user"]
|
||||
first_user_msg = message or (user_messages[0].content if user_messages else None)
|
||||
if is_user_message and first_user_msg and not session.title:
|
||||
if is_user_message and message and not session.title:
|
||||
user_messages = [m for m in session.messages if m.role == "user"]
|
||||
if len(user_messages) == 1:
|
||||
# First user message - generate title in background
|
||||
import asyncio
|
||||
@@ -474,7 +468,7 @@ async def stream_chat_completion(
|
||||
# Capture only the values we need (not the session object) to avoid
|
||||
# stale data issues when the main flow modifies the session
|
||||
captured_session_id = session_id
|
||||
captured_message = first_user_msg
|
||||
captured_message = message
|
||||
captured_user_id = user_id
|
||||
|
||||
async def _update_title():
|
||||
@@ -1243,7 +1237,7 @@ async def _stream_chat_chunks(
|
||||
|
||||
total_time = (time_module.perf_counter() - stream_chunks_start) * 1000
|
||||
logger.info(
|
||||
f"[TIMING] _stream_chat_chunks COMPLETED in {total_time / 1000:.1f}s; "
|
||||
f"[TIMING] _stream_chat_chunks COMPLETED in {total_time/1000:.1f}s; "
|
||||
f"session={session.session_id}, user={session.user_id}",
|
||||
extra={"json_fields": {**log_meta, "total_time_ms": total_time}},
|
||||
)
|
||||
|
||||
@@ -814,28 +814,6 @@ async def get_active_task_for_session(
|
||||
if task_user_id and user_id != task_user_id:
|
||||
continue
|
||||
|
||||
# Auto-expire stale tasks that exceeded stream_timeout
|
||||
created_at_str = meta.get("created_at", "")
|
||||
if created_at_str:
|
||||
try:
|
||||
created_at = datetime.fromisoformat(created_at_str)
|
||||
age_seconds = (
|
||||
datetime.now(timezone.utc) - created_at
|
||||
).total_seconds()
|
||||
if age_seconds > config.stream_timeout:
|
||||
logger.warning(
|
||||
f"[TASK_LOOKUP] Auto-expiring stale task {task_id[:8]}... "
|
||||
f"(age={age_seconds:.0f}s > timeout={config.stream_timeout}s)"
|
||||
)
|
||||
await mark_task_completed(task_id, "failed")
|
||||
continue
|
||||
except (ValueError, TypeError):
|
||||
pass
|
||||
|
||||
logger.info(
|
||||
f"[TASK_LOOKUP] Found running task {task_id[:8]}... for session {session_id[:8]}..."
|
||||
)
|
||||
|
||||
# Get the last message ID from Redis Stream
|
||||
stream_key = _get_task_stream_key(task_id)
|
||||
last_id = "0-0"
|
||||
|
||||
@@ -9,8 +9,6 @@ from backend.api.features.chat.tracking import track_tool_called
|
||||
from .add_understanding import AddUnderstandingTool
|
||||
from .agent_output import AgentOutputTool
|
||||
from .base import BaseTool
|
||||
from .bash_exec import BashExecTool
|
||||
from .check_operation_status import CheckOperationStatusTool
|
||||
from .create_agent import CreateAgentTool
|
||||
from .customize_agent import CustomizeAgentTool
|
||||
from .edit_agent import EditAgentTool
|
||||
@@ -21,7 +19,6 @@ from .get_doc_page import GetDocPageTool
|
||||
from .run_agent import RunAgentTool
|
||||
from .run_block import RunBlockTool
|
||||
from .search_docs import SearchDocsTool
|
||||
from .web_fetch import WebFetchTool
|
||||
from .workspace_files import (
|
||||
DeleteWorkspaceFileTool,
|
||||
ListWorkspaceFilesTool,
|
||||
@@ -46,14 +43,9 @@ TOOL_REGISTRY: dict[str, BaseTool] = {
|
||||
"run_agent": RunAgentTool(),
|
||||
"run_block": RunBlockTool(),
|
||||
"view_agent_output": AgentOutputTool(),
|
||||
"check_operation_status": CheckOperationStatusTool(),
|
||||
"search_docs": SearchDocsTool(),
|
||||
"get_doc_page": GetDocPageTool(),
|
||||
# Web fetch for safe URL retrieval
|
||||
"web_fetch": WebFetchTool(),
|
||||
# Sandboxed code execution (bubblewrap)
|
||||
"bash_exec": BashExecTool(),
|
||||
# Persistent workspace tools (cloud storage, survives across sessions)
|
||||
# Workspace tools for CoPilot file operations
|
||||
"list_workspace_files": ListWorkspaceFilesTool(),
|
||||
"read_workspace_file": ReadWorkspaceFileTool(),
|
||||
"write_workspace_file": WriteWorkspaceFileTool(),
|
||||
|
||||
@@ -1,131 +0,0 @@
|
||||
"""Bash execution tool — run shell commands in a bubblewrap sandbox.
|
||||
|
||||
Full Bash scripting is allowed (loops, conditionals, pipes, functions, etc.).
|
||||
Safety comes from OS-level isolation (bubblewrap): only system dirs visible
|
||||
read-only, writable workspace only, clean env, no network.
|
||||
|
||||
Requires bubblewrap (``bwrap``) — the tool is disabled when bwrap is not
|
||||
available (e.g. macOS development).
|
||||
"""
|
||||
|
||||
import logging
|
||||
from typing import Any
|
||||
|
||||
from backend.api.features.chat.model import ChatSession
|
||||
from backend.api.features.chat.tools.base import BaseTool
|
||||
from backend.api.features.chat.tools.models import (
|
||||
BashExecResponse,
|
||||
ErrorResponse,
|
||||
ToolResponseBase,
|
||||
)
|
||||
from backend.api.features.chat.tools.sandbox import (
|
||||
get_workspace_dir,
|
||||
has_full_sandbox,
|
||||
run_sandboxed,
|
||||
)
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class BashExecTool(BaseTool):
|
||||
"""Execute Bash commands in a bubblewrap sandbox."""
|
||||
|
||||
@property
|
||||
def name(self) -> str:
|
||||
return "bash_exec"
|
||||
|
||||
@property
|
||||
def description(self) -> str:
|
||||
if not has_full_sandbox():
|
||||
return (
|
||||
"Bash execution is DISABLED — bubblewrap sandbox is not "
|
||||
"available on this platform. Do not call this tool."
|
||||
)
|
||||
return (
|
||||
"Execute a Bash command or script in a bubblewrap sandbox. "
|
||||
"Full Bash scripting is supported (loops, conditionals, pipes, "
|
||||
"functions, etc.). "
|
||||
"The sandbox shares the same working directory as the SDK Read/Write "
|
||||
"tools — files created by either are accessible to both. "
|
||||
"SECURITY: Only system directories (/usr, /bin, /lib, /etc) are "
|
||||
"visible read-only, the per-session workspace is the only writable "
|
||||
"path, environment variables are wiped (no secrets), all network "
|
||||
"access is blocked at the kernel level, and resource limits are "
|
||||
"enforced (max 64 processes, 512MB memory, 50MB file size). "
|
||||
"Application code, configs, and other directories are NOT accessible. "
|
||||
"To fetch web content, use the web_fetch tool instead. "
|
||||
"Execution is killed after the timeout (default 30s, max 120s). "
|
||||
"Returns stdout and stderr. "
|
||||
"Useful for file manipulation, data processing with Unix tools "
|
||||
"(grep, awk, sed, jq, etc.), and running shell scripts."
|
||||
)
|
||||
|
||||
@property
|
||||
def parameters(self) -> dict[str, Any]:
|
||||
return {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"command": {
|
||||
"type": "string",
|
||||
"description": "Bash command or script to execute.",
|
||||
},
|
||||
"timeout": {
|
||||
"type": "integer",
|
||||
"description": (
|
||||
"Max execution time in seconds (default 30, max 120)."
|
||||
),
|
||||
"default": 30,
|
||||
},
|
||||
},
|
||||
"required": ["command"],
|
||||
}
|
||||
|
||||
@property
|
||||
def requires_auth(self) -> bool:
|
||||
return False
|
||||
|
||||
async def _execute(
|
||||
self,
|
||||
user_id: str | None,
|
||||
session: ChatSession,
|
||||
**kwargs: Any,
|
||||
) -> ToolResponseBase:
|
||||
session_id = session.session_id if session else None
|
||||
|
||||
if not has_full_sandbox():
|
||||
return ErrorResponse(
|
||||
message="bash_exec requires bubblewrap sandbox (Linux only).",
|
||||
error="sandbox_unavailable",
|
||||
session_id=session_id,
|
||||
)
|
||||
|
||||
command: str = (kwargs.get("command") or "").strip()
|
||||
timeout: int = kwargs.get("timeout", 30)
|
||||
|
||||
if not command:
|
||||
return ErrorResponse(
|
||||
message="No command provided.",
|
||||
error="empty_command",
|
||||
session_id=session_id,
|
||||
)
|
||||
|
||||
workspace = get_workspace_dir(session_id or "default")
|
||||
|
||||
stdout, stderr, exit_code, timed_out = await run_sandboxed(
|
||||
command=["bash", "-c", command],
|
||||
cwd=workspace,
|
||||
timeout=timeout,
|
||||
)
|
||||
|
||||
return BashExecResponse(
|
||||
message=(
|
||||
"Execution timed out"
|
||||
if timed_out
|
||||
else f"Command executed (exit {exit_code})"
|
||||
),
|
||||
stdout=stdout,
|
||||
stderr=stderr,
|
||||
exit_code=exit_code,
|
||||
timed_out=timed_out,
|
||||
session_id=session_id,
|
||||
)
|
||||
@@ -1,127 +0,0 @@
|
||||
"""CheckOperationStatusTool — query the status of a long-running operation."""
|
||||
|
||||
import logging
|
||||
from typing import Any
|
||||
|
||||
from backend.api.features.chat.model import ChatSession
|
||||
from backend.api.features.chat.tools.base import BaseTool
|
||||
from backend.api.features.chat.tools.models import (
|
||||
ErrorResponse,
|
||||
ResponseType,
|
||||
ToolResponseBase,
|
||||
)
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class OperationStatusResponse(ToolResponseBase):
|
||||
"""Response for check_operation_status tool."""
|
||||
|
||||
type: ResponseType = ResponseType.OPERATION_STATUS
|
||||
task_id: str
|
||||
operation_id: str
|
||||
status: str # "running", "completed", "failed"
|
||||
tool_name: str | None = None
|
||||
message: str = ""
|
||||
|
||||
|
||||
class CheckOperationStatusTool(BaseTool):
|
||||
"""Check the status of a long-running operation (create_agent, edit_agent, etc.).
|
||||
|
||||
The CoPilot uses this tool to report back to the user whether an
|
||||
operation that was started earlier has completed, failed, or is still
|
||||
running.
|
||||
"""
|
||||
|
||||
@property
|
||||
def name(self) -> str:
|
||||
return "check_operation_status"
|
||||
|
||||
@property
|
||||
def description(self) -> str:
|
||||
return (
|
||||
"Check the current status of a long-running operation such as "
|
||||
"create_agent or edit_agent. Accepts either an operation_id or "
|
||||
"task_id from a previous operation_started response. "
|
||||
"Returns the current status: running, completed, or failed."
|
||||
)
|
||||
|
||||
@property
|
||||
def parameters(self) -> dict[str, Any]:
|
||||
return {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"operation_id": {
|
||||
"type": "string",
|
||||
"description": (
|
||||
"The operation_id from an operation_started response."
|
||||
),
|
||||
},
|
||||
"task_id": {
|
||||
"type": "string",
|
||||
"description": (
|
||||
"The task_id from an operation_started response. "
|
||||
"Used as fallback if operation_id is not provided."
|
||||
),
|
||||
},
|
||||
},
|
||||
"required": [],
|
||||
}
|
||||
|
||||
@property
|
||||
def requires_auth(self) -> bool:
|
||||
return False
|
||||
|
||||
async def _execute(
|
||||
self,
|
||||
user_id: str | None,
|
||||
session: ChatSession,
|
||||
**kwargs,
|
||||
) -> ToolResponseBase:
|
||||
from backend.api.features.chat import stream_registry
|
||||
|
||||
operation_id: str = kwargs.get("operation_id", "").strip()
|
||||
task_id: str = kwargs.get("task_id", "").strip()
|
||||
|
||||
if not operation_id and not task_id:
|
||||
return ErrorResponse(
|
||||
message="Please provide an operation_id or task_id.",
|
||||
error="missing_parameter",
|
||||
)
|
||||
|
||||
task = None
|
||||
if operation_id:
|
||||
task = await stream_registry.find_task_by_operation_id(operation_id)
|
||||
if task is None and task_id:
|
||||
task = await stream_registry.get_task(task_id)
|
||||
|
||||
if task is None:
|
||||
# Task not in Redis — it may have already expired (TTL).
|
||||
# Check conversation history for the result instead.
|
||||
return ErrorResponse(
|
||||
message=(
|
||||
"Operation not found — it may have already completed and "
|
||||
"expired from the status tracker. Check the conversation "
|
||||
"history for the result."
|
||||
),
|
||||
error="not_found",
|
||||
)
|
||||
|
||||
status_messages = {
|
||||
"running": (
|
||||
f"The {task.tool_name or 'operation'} is still running. "
|
||||
"Please wait for it to complete."
|
||||
),
|
||||
"completed": (
|
||||
f"The {task.tool_name or 'operation'} has completed successfully."
|
||||
),
|
||||
"failed": f"The {task.tool_name or 'operation'} has failed.",
|
||||
}
|
||||
|
||||
return OperationStatusResponse(
|
||||
task_id=task.task_id,
|
||||
operation_id=task.operation_id,
|
||||
status=task.status,
|
||||
tool_name=task.tool_name,
|
||||
message=status_messages.get(task.status, f"Status: {task.status}"),
|
||||
)
|
||||
@@ -13,8 +13,7 @@ from backend.api.features.chat.tools.models import (
|
||||
NoResultsResponse,
|
||||
)
|
||||
from backend.api.features.store.hybrid_search import unified_hybrid_search
|
||||
from backend.blocks import get_block
|
||||
from backend.blocks._base import BlockType
|
||||
from backend.data.block import BlockType, get_block
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@@ -10,7 +10,7 @@ from backend.api.features.chat.tools.find_block import (
|
||||
FindBlockTool,
|
||||
)
|
||||
from backend.api.features.chat.tools.models import BlockListResponse
|
||||
from backend.blocks._base import BlockType
|
||||
from backend.data.block import BlockType
|
||||
|
||||
from ._test_data import make_session
|
||||
|
||||
|
||||
@@ -40,12 +40,6 @@ class ResponseType(str, Enum):
|
||||
OPERATION_IN_PROGRESS = "operation_in_progress"
|
||||
# Input validation
|
||||
INPUT_VALIDATION_ERROR = "input_validation_error"
|
||||
# Web fetch
|
||||
WEB_FETCH = "web_fetch"
|
||||
# Code execution
|
||||
BASH_EXEC = "bash_exec"
|
||||
# Operation status check
|
||||
OPERATION_STATUS = "operation_status"
|
||||
|
||||
|
||||
# Base response model
|
||||
@@ -341,17 +335,11 @@ class BlockInfoSummary(BaseModel):
|
||||
name: str
|
||||
description: str
|
||||
categories: list[str]
|
||||
input_schema: dict[str, Any] = Field(
|
||||
default_factory=dict,
|
||||
description="Full JSON schema for block inputs",
|
||||
)
|
||||
output_schema: dict[str, Any] = Field(
|
||||
default_factory=dict,
|
||||
description="Full JSON schema for block outputs",
|
||||
)
|
||||
input_schema: dict[str, Any]
|
||||
output_schema: dict[str, Any]
|
||||
required_inputs: list[BlockInputFieldInfo] = Field(
|
||||
default_factory=list,
|
||||
description="List of input fields for this block",
|
||||
description="List of required input fields for this block",
|
||||
)
|
||||
|
||||
|
||||
@@ -364,7 +352,7 @@ class BlockListResponse(ToolResponseBase):
|
||||
query: str
|
||||
usage_hint: str = Field(
|
||||
default="To execute a block, call run_block with block_id set to the block's "
|
||||
"'id' field and input_data containing the fields listed in required_inputs."
|
||||
"'id' field and input_data containing the required fields from input_schema."
|
||||
)
|
||||
|
||||
|
||||
@@ -433,24 +421,3 @@ class AsyncProcessingResponse(ToolResponseBase):
|
||||
status: str = "accepted" # Must be "accepted" for detection
|
||||
operation_id: str | None = None
|
||||
task_id: str | None = None
|
||||
|
||||
|
||||
class WebFetchResponse(ToolResponseBase):
|
||||
"""Response for web_fetch tool."""
|
||||
|
||||
type: ResponseType = ResponseType.WEB_FETCH
|
||||
url: str
|
||||
status_code: int
|
||||
content_type: str
|
||||
content: str
|
||||
truncated: bool = False
|
||||
|
||||
|
||||
class BashExecResponse(ToolResponseBase):
|
||||
"""Response for bash_exec tool."""
|
||||
|
||||
type: ResponseType = ResponseType.BASH_EXEC
|
||||
stdout: str
|
||||
stderr: str
|
||||
exit_code: int
|
||||
timed_out: bool = False
|
||||
|
||||
@@ -12,8 +12,7 @@ from backend.api.features.chat.tools.find_block import (
|
||||
COPILOT_EXCLUDED_BLOCK_IDS,
|
||||
COPILOT_EXCLUDED_BLOCK_TYPES,
|
||||
)
|
||||
from backend.blocks import get_block
|
||||
from backend.blocks._base import AnyBlockSchema
|
||||
from backend.data.block import AnyBlockSchema, get_block
|
||||
from backend.data.execution import ExecutionContext
|
||||
from backend.data.model import CredentialsFieldInfo, CredentialsMetaInput
|
||||
from backend.data.workspace import get_or_create_workspace
|
||||
|
||||
@@ -6,7 +6,7 @@ import pytest
|
||||
|
||||
from backend.api.features.chat.tools.models import ErrorResponse
|
||||
from backend.api.features.chat.tools.run_block import RunBlockTool
|
||||
from backend.blocks._base import BlockType
|
||||
from backend.data.block import BlockType
|
||||
|
||||
from ._test_data import make_session
|
||||
|
||||
|
||||
@@ -1,267 +0,0 @@
|
||||
"""Sandbox execution utilities for code execution tools.
|
||||
|
||||
Provides filesystem + network isolated command execution using **bubblewrap**
|
||||
(``bwrap``): whitelist-only filesystem (only system dirs visible read-only),
|
||||
writable workspace only, clean environment, network blocked.
|
||||
|
||||
Tools that call :func:`run_sandboxed` must first check :func:`has_full_sandbox`
|
||||
and refuse to run if bubblewrap is not available.
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
import os
|
||||
import platform
|
||||
import shutil
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Output limits — prevent blowing up LLM context
|
||||
_MAX_OUTPUT_CHARS = 50_000
|
||||
_DEFAULT_TIMEOUT = 30
|
||||
_MAX_TIMEOUT = 120
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Sandbox capability detection (cached at first call)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
_BWRAP_AVAILABLE: bool | None = None
|
||||
|
||||
|
||||
def has_full_sandbox() -> bool:
|
||||
"""Return True if bubblewrap is available (filesystem + network isolation).
|
||||
|
||||
On non-Linux platforms (macOS), always returns False.
|
||||
"""
|
||||
global _BWRAP_AVAILABLE
|
||||
if _BWRAP_AVAILABLE is None:
|
||||
_BWRAP_AVAILABLE = (
|
||||
platform.system() == "Linux" and shutil.which("bwrap") is not None
|
||||
)
|
||||
return _BWRAP_AVAILABLE
|
||||
|
||||
|
||||
WORKSPACE_PREFIX = "/tmp/copilot-"
|
||||
|
||||
|
||||
def make_session_path(session_id: str) -> str:
|
||||
"""Build a sanitized, session-specific path under :data:`WORKSPACE_PREFIX`.
|
||||
|
||||
Shared by both the SDK working-directory setup and the sandbox tools so
|
||||
they always resolve to the same directory for a given session.
|
||||
|
||||
Steps:
|
||||
1. Strip all characters except ``[A-Za-z0-9-]``.
|
||||
2. Construct ``/tmp/copilot-<safe_id>``.
|
||||
3. Validate via ``os.path.normpath`` + ``startswith`` (CodeQL-recognised
|
||||
sanitizer) to prevent path traversal.
|
||||
|
||||
Raises:
|
||||
ValueError: If the resulting path escapes the prefix.
|
||||
"""
|
||||
import re
|
||||
|
||||
safe_id = re.sub(r"[^A-Za-z0-9-]", "", session_id)
|
||||
if not safe_id:
|
||||
safe_id = "default"
|
||||
path = os.path.normpath(f"{WORKSPACE_PREFIX}{safe_id}")
|
||||
if not path.startswith(WORKSPACE_PREFIX):
|
||||
raise ValueError(f"Session path escaped prefix: {path}")
|
||||
return path
|
||||
|
||||
|
||||
def get_workspace_dir(session_id: str) -> str:
|
||||
"""Get or create the workspace directory for a session.
|
||||
|
||||
Uses :func:`make_session_path` — the same path the SDK uses — so that
|
||||
bash_exec shares the workspace with the SDK file tools.
|
||||
"""
|
||||
workspace = make_session_path(session_id)
|
||||
os.makedirs(workspace, exist_ok=True)
|
||||
return workspace
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Bubblewrap command builder
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
# System directories mounted read-only inside the sandbox.
|
||||
# ONLY these are visible — /app, /root, /home, /opt, /var etc. are NOT accessible.
|
||||
_SYSTEM_RO_BINDS = [
|
||||
"/usr", # binaries, libraries, Python interpreter
|
||||
"/etc", # system config: ld.so, locale, passwd, alternatives
|
||||
]
|
||||
|
||||
# Compat paths: symlinks to /usr/* on modern Debian, real dirs on older systems.
|
||||
# On Debian 13 these are symlinks (e.g. /bin -> usr/bin). bwrap --ro-bind
|
||||
# can't create a symlink target, so we detect and use --symlink instead.
|
||||
# /lib64 is critical: the ELF dynamic linker lives at /lib64/ld-linux-x86-64.so.2.
|
||||
_COMPAT_PATHS = [
|
||||
("/bin", "usr/bin"), # -> /usr/bin on Debian 13
|
||||
("/sbin", "usr/sbin"), # -> /usr/sbin on Debian 13
|
||||
("/lib", "usr/lib"), # -> /usr/lib on Debian 13
|
||||
("/lib64", "usr/lib64"), # 64-bit libraries / ELF interpreter
|
||||
]
|
||||
|
||||
# Resource limits to prevent fork bombs, memory exhaustion, and disk abuse.
|
||||
# Applied via ulimit inside the sandbox before exec'ing the user command.
|
||||
_RESOURCE_LIMITS = (
|
||||
"ulimit -u 64" # max 64 processes (prevents fork bombs)
|
||||
" -v 524288" # 512 MB virtual memory
|
||||
" -f 51200" # 50 MB max file size (1024-byte blocks)
|
||||
" -n 256" # 256 open file descriptors
|
||||
" 2>/dev/null"
|
||||
)
|
||||
|
||||
|
||||
def _build_bwrap_command(
|
||||
command: list[str], cwd: str, env: dict[str, str]
|
||||
) -> list[str]:
|
||||
"""Build a bubblewrap command with strict filesystem + network isolation.
|
||||
|
||||
Security model:
|
||||
- **Whitelist-only filesystem**: only system directories (``/usr``, ``/etc``,
|
||||
``/bin``, ``/lib``) are mounted read-only. Application code (``/app``),
|
||||
home directories, ``/var``, ``/opt``, etc. are NOT accessible at all.
|
||||
- **Writable workspace only**: the per-session workspace is the sole
|
||||
writable path.
|
||||
- **Clean environment**: ``--clearenv`` wipes all inherited env vars.
|
||||
Only the explicitly-passed safe env vars are set inside the sandbox.
|
||||
- **Network isolation**: ``--unshare-net`` blocks all network access.
|
||||
- **Resource limits**: ulimit caps on processes (64), memory (512MB),
|
||||
file size (50MB), and open FDs (256) to prevent fork bombs and abuse.
|
||||
- **New session**: prevents terminal control escape.
|
||||
- **Die with parent**: prevents orphaned sandbox processes.
|
||||
"""
|
||||
cmd = [
|
||||
"bwrap",
|
||||
# Create a new user namespace so bwrap can set up sandboxing
|
||||
# inside unprivileged Docker containers (no CAP_SYS_ADMIN needed).
|
||||
"--unshare-user",
|
||||
# Wipe all inherited environment variables (API keys, secrets, etc.)
|
||||
"--clearenv",
|
||||
]
|
||||
|
||||
# Set only the safe env vars inside the sandbox
|
||||
for key, value in env.items():
|
||||
cmd.extend(["--setenv", key, value])
|
||||
|
||||
# System directories: read-only
|
||||
for path in _SYSTEM_RO_BINDS:
|
||||
cmd.extend(["--ro-bind", path, path])
|
||||
|
||||
# Compat paths: use --symlink when host path is a symlink (Debian 13),
|
||||
# --ro-bind when it's a real directory (older distros).
|
||||
for path, symlink_target in _COMPAT_PATHS:
|
||||
if os.path.islink(path):
|
||||
cmd.extend(["--symlink", symlink_target, path])
|
||||
elif os.path.exists(path):
|
||||
cmd.extend(["--ro-bind", path, path])
|
||||
|
||||
# Wrap the user command with resource limits:
|
||||
# sh -c 'ulimit ...; exec "$@"' -- <original command>
|
||||
# `exec "$@"` replaces the shell so there's no extra process overhead,
|
||||
# and properly handles arguments with spaces.
|
||||
limited_command = [
|
||||
"sh",
|
||||
"-c",
|
||||
f'{_RESOURCE_LIMITS}; exec "$@"',
|
||||
"--",
|
||||
*command,
|
||||
]
|
||||
|
||||
cmd.extend(
|
||||
[
|
||||
# Fresh virtual filesystems
|
||||
"--dev",
|
||||
"/dev",
|
||||
"--proc",
|
||||
"/proc",
|
||||
"--tmpfs",
|
||||
"/tmp",
|
||||
# Workspace bind AFTER --tmpfs /tmp so it's visible through the tmpfs.
|
||||
# (workspace lives under /tmp/copilot-<session>)
|
||||
"--bind",
|
||||
cwd,
|
||||
cwd,
|
||||
# Isolation
|
||||
"--unshare-net",
|
||||
"--die-with-parent",
|
||||
"--new-session",
|
||||
"--chdir",
|
||||
cwd,
|
||||
"--",
|
||||
*limited_command,
|
||||
]
|
||||
)
|
||||
|
||||
return cmd
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Public API
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
async def run_sandboxed(
|
||||
command: list[str],
|
||||
cwd: str,
|
||||
timeout: int = _DEFAULT_TIMEOUT,
|
||||
env: dict[str, str] | None = None,
|
||||
) -> tuple[str, str, int, bool]:
|
||||
"""Run a command inside a bubblewrap sandbox.
|
||||
|
||||
Callers **must** check :func:`has_full_sandbox` before calling this
|
||||
function. If bubblewrap is not available, this function raises
|
||||
:class:`RuntimeError` rather than running unsandboxed.
|
||||
|
||||
Returns:
|
||||
(stdout, stderr, exit_code, timed_out)
|
||||
"""
|
||||
if not has_full_sandbox():
|
||||
raise RuntimeError(
|
||||
"run_sandboxed() requires bubblewrap but bwrap is not available. "
|
||||
"Callers must check has_full_sandbox() before calling this function."
|
||||
)
|
||||
|
||||
timeout = min(max(timeout, 1), _MAX_TIMEOUT)
|
||||
|
||||
safe_env = {
|
||||
"PATH": "/usr/local/bin:/usr/bin:/bin",
|
||||
"HOME": cwd,
|
||||
"TMPDIR": cwd,
|
||||
"LANG": "en_US.UTF-8",
|
||||
"PYTHONDONTWRITEBYTECODE": "1",
|
||||
"PYTHONIOENCODING": "utf-8",
|
||||
}
|
||||
if env:
|
||||
safe_env.update(env)
|
||||
|
||||
full_command = _build_bwrap_command(command, cwd, safe_env)
|
||||
|
||||
try:
|
||||
proc = await asyncio.create_subprocess_exec(
|
||||
*full_command,
|
||||
stdout=asyncio.subprocess.PIPE,
|
||||
stderr=asyncio.subprocess.PIPE,
|
||||
cwd=cwd,
|
||||
env=safe_env,
|
||||
)
|
||||
|
||||
try:
|
||||
stdout_bytes, stderr_bytes = await asyncio.wait_for(
|
||||
proc.communicate(), timeout=timeout
|
||||
)
|
||||
stdout = stdout_bytes.decode("utf-8", errors="replace")[:_MAX_OUTPUT_CHARS]
|
||||
stderr = stderr_bytes.decode("utf-8", errors="replace")[:_MAX_OUTPUT_CHARS]
|
||||
return stdout, stderr, proc.returncode or 0, False
|
||||
except asyncio.TimeoutError:
|
||||
proc.kill()
|
||||
await proc.communicate()
|
||||
return "", f"Execution timed out after {timeout}s", -1, True
|
||||
|
||||
except RuntimeError:
|
||||
raise
|
||||
except Exception as e:
|
||||
return "", f"Sandbox error: {e}", -1, False
|
||||
@@ -1,156 +0,0 @@
|
||||
"""Web fetch tool — safely retrieve public web page content."""
|
||||
|
||||
import logging
|
||||
from typing import Any
|
||||
|
||||
import aiohttp
|
||||
import html2text
|
||||
|
||||
from backend.api.features.chat.model import ChatSession
|
||||
from backend.api.features.chat.tools.base import BaseTool
|
||||
from backend.api.features.chat.tools.models import (
|
||||
ErrorResponse,
|
||||
ToolResponseBase,
|
||||
WebFetchResponse,
|
||||
)
|
||||
from backend.util.request import Requests
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Limits
|
||||
_MAX_CONTENT_BYTES = 102_400 # 100 KB download cap
|
||||
_MAX_OUTPUT_CHARS = 50_000 # 50K char truncation for LLM context
|
||||
_REQUEST_TIMEOUT = aiohttp.ClientTimeout(total=15)
|
||||
|
||||
# Content types we'll read as text
|
||||
_TEXT_CONTENT_TYPES = {
|
||||
"text/html",
|
||||
"text/plain",
|
||||
"text/xml",
|
||||
"text/csv",
|
||||
"text/markdown",
|
||||
"application/json",
|
||||
"application/xml",
|
||||
"application/xhtml+xml",
|
||||
"application/rss+xml",
|
||||
"application/atom+xml",
|
||||
}
|
||||
|
||||
|
||||
def _is_text_content(content_type: str) -> bool:
|
||||
base = content_type.split(";")[0].strip().lower()
|
||||
return base in _TEXT_CONTENT_TYPES or base.startswith("text/")
|
||||
|
||||
|
||||
def _html_to_text(html: str) -> str:
|
||||
h = html2text.HTML2Text()
|
||||
h.ignore_links = False
|
||||
h.ignore_images = True
|
||||
h.body_width = 0
|
||||
return h.handle(html)
|
||||
|
||||
|
||||
class WebFetchTool(BaseTool):
|
||||
"""Safely fetch content from a public URL using SSRF-protected HTTP."""
|
||||
|
||||
@property
|
||||
def name(self) -> str:
|
||||
return "web_fetch"
|
||||
|
||||
@property
|
||||
def description(self) -> str:
|
||||
return (
|
||||
"Fetch the content of a public web page by URL. "
|
||||
"Returns readable text extracted from HTML by default. "
|
||||
"Useful for reading documentation, articles, and API responses. "
|
||||
"Only supports HTTP/HTTPS GET requests to public URLs "
|
||||
"(private/internal network addresses are blocked)."
|
||||
)
|
||||
|
||||
@property
|
||||
def parameters(self) -> dict[str, Any]:
|
||||
return {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"url": {
|
||||
"type": "string",
|
||||
"description": "The public HTTP/HTTPS URL to fetch.",
|
||||
},
|
||||
"extract_text": {
|
||||
"type": "boolean",
|
||||
"description": (
|
||||
"If true (default), extract readable text from HTML. "
|
||||
"If false, return raw content."
|
||||
),
|
||||
"default": True,
|
||||
},
|
||||
},
|
||||
"required": ["url"],
|
||||
}
|
||||
|
||||
@property
|
||||
def requires_auth(self) -> bool:
|
||||
return False
|
||||
|
||||
async def _execute(
|
||||
self,
|
||||
user_id: str | None,
|
||||
session: ChatSession,
|
||||
**kwargs: Any,
|
||||
) -> ToolResponseBase:
|
||||
url: str = (kwargs.get("url") or "").strip()
|
||||
extract_text: bool = kwargs.get("extract_text", True)
|
||||
session_id = session.session_id if session else None
|
||||
|
||||
if not url:
|
||||
return ErrorResponse(
|
||||
message="Please provide a URL to fetch.",
|
||||
error="missing_url",
|
||||
session_id=session_id,
|
||||
)
|
||||
|
||||
try:
|
||||
client = Requests(raise_for_status=False, retry_max_attempts=1)
|
||||
response = await client.get(url, timeout=_REQUEST_TIMEOUT)
|
||||
except ValueError as e:
|
||||
# validate_url raises ValueError for SSRF / blocked IPs
|
||||
return ErrorResponse(
|
||||
message=f"URL blocked: {e}",
|
||||
error="url_blocked",
|
||||
session_id=session_id,
|
||||
)
|
||||
except Exception as e:
|
||||
logger.warning(f"[web_fetch] Request failed for {url}: {e}")
|
||||
return ErrorResponse(
|
||||
message=f"Failed to fetch URL: {e}",
|
||||
error="fetch_failed",
|
||||
session_id=session_id,
|
||||
)
|
||||
|
||||
content_type = response.headers.get("content-type", "")
|
||||
if not _is_text_content(content_type):
|
||||
return ErrorResponse(
|
||||
message=f"Non-text content type: {content_type.split(';')[0]}",
|
||||
error="unsupported_content_type",
|
||||
session_id=session_id,
|
||||
)
|
||||
|
||||
raw = response.content[:_MAX_CONTENT_BYTES]
|
||||
text = raw.decode("utf-8", errors="replace")
|
||||
|
||||
if extract_text and "html" in content_type.lower():
|
||||
text = _html_to_text(text)
|
||||
|
||||
truncated = len(text) > _MAX_OUTPUT_CHARS
|
||||
if truncated:
|
||||
text = text[:_MAX_OUTPUT_CHARS]
|
||||
|
||||
return WebFetchResponse(
|
||||
message=f"Fetched {url}" + (" (truncated)" if truncated else ""),
|
||||
url=response.url,
|
||||
status_code=response.status,
|
||||
content_type=content_type.split(";")[0].strip(),
|
||||
content=text,
|
||||
truncated=truncated,
|
||||
session_id=session_id,
|
||||
)
|
||||
@@ -88,9 +88,7 @@ class ListWorkspaceFilesTool(BaseTool):
|
||||
@property
|
||||
def description(self) -> str:
|
||||
return (
|
||||
"List files in the user's persistent workspace (cloud storage). "
|
||||
"These files survive across sessions. "
|
||||
"For ephemeral session files, use the SDK Read/Glob tools instead. "
|
||||
"List files in the user's workspace. "
|
||||
"Returns file names, paths, sizes, and metadata. "
|
||||
"Optionally filter by path prefix."
|
||||
)
|
||||
@@ -206,9 +204,7 @@ class ReadWorkspaceFileTool(BaseTool):
|
||||
@property
|
||||
def description(self) -> str:
|
||||
return (
|
||||
"Read a file from the user's persistent workspace (cloud storage). "
|
||||
"These files survive across sessions. "
|
||||
"For ephemeral session files, use the SDK Read tool instead. "
|
||||
"Read a file from the user's workspace. "
|
||||
"Specify either file_id or path to identify the file. "
|
||||
"For small text files, returns content directly. "
|
||||
"For large or binary files, returns metadata and a download URL. "
|
||||
@@ -382,9 +378,7 @@ class WriteWorkspaceFileTool(BaseTool):
|
||||
@property
|
||||
def description(self) -> str:
|
||||
return (
|
||||
"Write or create a file in the user's persistent workspace (cloud storage). "
|
||||
"These files survive across sessions. "
|
||||
"For ephemeral session files, use the SDK Write tool instead. "
|
||||
"Write or create a file in the user's workspace. "
|
||||
"Provide the content as a base64-encoded string. "
|
||||
f"Maximum file size is {Config().max_file_size_mb}MB. "
|
||||
"Files are saved to the current session's folder by default. "
|
||||
@@ -529,7 +523,7 @@ class DeleteWorkspaceFileTool(BaseTool):
|
||||
@property
|
||||
def description(self) -> str:
|
||||
return (
|
||||
"Delete a file from the user's persistent workspace (cloud storage). "
|
||||
"Delete a file from the user's workspace. "
|
||||
"Specify either file_id or path to identify the file. "
|
||||
"Paths are scoped to the current session by default. "
|
||||
"Use /sessions/<session_id>/... for cross-session access."
|
||||
|
||||
@@ -12,11 +12,12 @@ import backend.api.features.store.image_gen as store_image_gen
|
||||
import backend.api.features.store.media as store_media
|
||||
import backend.data.graph as graph_db
|
||||
import backend.data.integrations as integrations_db
|
||||
from backend.data.block import BlockInput
|
||||
from backend.data.db import transaction
|
||||
from backend.data.execution import get_graph_execution
|
||||
from backend.data.graph import GraphSettings
|
||||
from backend.data.includes import AGENT_PRESET_INCLUDE, library_agent_include
|
||||
from backend.data.model import CredentialsMetaInput, GraphInput
|
||||
from backend.data.model import CredentialsMetaInput
|
||||
from backend.integrations.creds_manager import IntegrationCredentialsManager
|
||||
from backend.integrations.webhooks.graph_lifecycle_hooks import (
|
||||
on_graph_activate,
|
||||
@@ -1129,7 +1130,7 @@ async def create_preset_from_graph_execution(
|
||||
async def update_preset(
|
||||
user_id: str,
|
||||
preset_id: str,
|
||||
inputs: Optional[GraphInput] = None,
|
||||
inputs: Optional[BlockInput] = None,
|
||||
credentials: Optional[dict[str, CredentialsMetaInput]] = None,
|
||||
name: Optional[str] = None,
|
||||
description: Optional[str] = None,
|
||||
|
||||
@@ -6,12 +6,9 @@ import prisma.enums
|
||||
import prisma.models
|
||||
import pydantic
|
||||
|
||||
from backend.data.block import BlockInput
|
||||
from backend.data.graph import GraphModel, GraphSettings, GraphTriggerInfo
|
||||
from backend.data.model import (
|
||||
CredentialsMetaInput,
|
||||
GraphInput,
|
||||
is_credentials_field_name,
|
||||
)
|
||||
from backend.data.model import CredentialsMetaInput, is_credentials_field_name
|
||||
from backend.util.json import loads as json_loads
|
||||
from backend.util.models import Pagination
|
||||
|
||||
@@ -326,7 +323,7 @@ class LibraryAgentPresetCreatable(pydantic.BaseModel):
|
||||
graph_id: str
|
||||
graph_version: int
|
||||
|
||||
inputs: GraphInput
|
||||
inputs: BlockInput
|
||||
credentials: dict[str, CredentialsMetaInput]
|
||||
|
||||
name: str
|
||||
@@ -355,7 +352,7 @@ class LibraryAgentPresetUpdatable(pydantic.BaseModel):
|
||||
Request model used when updating a preset for a library agent.
|
||||
"""
|
||||
|
||||
inputs: Optional[GraphInput] = None
|
||||
inputs: Optional[BlockInput] = None
|
||||
credentials: Optional[dict[str, CredentialsMetaInput]] = None
|
||||
|
||||
name: Optional[str] = None
|
||||
@@ -398,7 +395,7 @@ class LibraryAgentPreset(LibraryAgentPresetCreatable):
|
||||
"Webhook must be included in AgentPreset query when webhookId is set"
|
||||
)
|
||||
|
||||
input_data: GraphInput = {}
|
||||
input_data: BlockInput = {}
|
||||
input_credentials: dict[str, CredentialsMetaInput] = {}
|
||||
|
||||
for preset_input in preset.InputPresets:
|
||||
|
||||
@@ -5,8 +5,8 @@ from typing import Optional
|
||||
import aiohttp
|
||||
from fastapi import HTTPException
|
||||
|
||||
from backend.blocks import get_block
|
||||
from backend.data import graph as graph_db
|
||||
from backend.data.block import get_block
|
||||
from backend.util.settings import Settings
|
||||
|
||||
from .models import ApiResponse, ChatRequest, GraphData
|
||||
|
||||
@@ -152,7 +152,7 @@ class BlockHandler(ContentHandler):
|
||||
|
||||
async def get_missing_items(self, batch_size: int) -> list[ContentItem]:
|
||||
"""Fetch blocks without embeddings."""
|
||||
from backend.blocks import get_blocks
|
||||
from backend.data.block import get_blocks
|
||||
|
||||
# Get all available blocks
|
||||
all_blocks = get_blocks()
|
||||
@@ -249,7 +249,7 @@ class BlockHandler(ContentHandler):
|
||||
|
||||
async def get_stats(self) -> dict[str, int]:
|
||||
"""Get statistics about block embedding coverage."""
|
||||
from backend.blocks import get_blocks
|
||||
from backend.data.block import get_blocks
|
||||
|
||||
all_blocks = get_blocks()
|
||||
|
||||
|
||||
@@ -93,7 +93,7 @@ async def test_block_handler_get_missing_items(mocker):
|
||||
mock_existing = []
|
||||
|
||||
with patch(
|
||||
"backend.blocks.get_blocks",
|
||||
"backend.data.block.get_blocks",
|
||||
return_value=mock_blocks,
|
||||
):
|
||||
with patch(
|
||||
@@ -135,7 +135,7 @@ async def test_block_handler_get_stats(mocker):
|
||||
mock_embedded = [{"count": 2}]
|
||||
|
||||
with patch(
|
||||
"backend.blocks.get_blocks",
|
||||
"backend.data.block.get_blocks",
|
||||
return_value=mock_blocks,
|
||||
):
|
||||
with patch(
|
||||
@@ -327,7 +327,7 @@ async def test_block_handler_handles_missing_attributes():
|
||||
mock_blocks = {"block-minimal": mock_block_class}
|
||||
|
||||
with patch(
|
||||
"backend.blocks.get_blocks",
|
||||
"backend.data.block.get_blocks",
|
||||
return_value=mock_blocks,
|
||||
):
|
||||
with patch(
|
||||
@@ -360,7 +360,7 @@ async def test_block_handler_skips_failed_blocks():
|
||||
mock_blocks = {"good-block": good_block, "bad-block": bad_block}
|
||||
|
||||
with patch(
|
||||
"backend.blocks.get_blocks",
|
||||
"backend.data.block.get_blocks",
|
||||
return_value=mock_blocks,
|
||||
):
|
||||
with patch(
|
||||
|
||||
@@ -662,7 +662,7 @@ async def cleanup_orphaned_embeddings() -> dict[str, Any]:
|
||||
)
|
||||
current_ids = {row["id"] for row in valid_agents}
|
||||
elif content_type == ContentType.BLOCK:
|
||||
from backend.blocks import get_blocks
|
||||
from backend.data.block import get_blocks
|
||||
|
||||
current_ids = set(get_blocks().keys())
|
||||
elif content_type == ContentType.DOCUMENTATION:
|
||||
|
||||
@@ -7,6 +7,15 @@ from replicate.client import Client as ReplicateClient
|
||||
from replicate.exceptions import ReplicateError
|
||||
from replicate.helpers import FileOutput
|
||||
|
||||
from backend.blocks.ideogram import (
|
||||
AspectRatio,
|
||||
ColorPalettePreset,
|
||||
IdeogramModelBlock,
|
||||
IdeogramModelName,
|
||||
MagicPromptOption,
|
||||
StyleType,
|
||||
UpscaleOption,
|
||||
)
|
||||
from backend.data.graph import GraphBaseMeta
|
||||
from backend.data.model import CredentialsMetaInput, ProviderName
|
||||
from backend.integrations.credentials_store import ideogram_credentials
|
||||
@@ -41,16 +50,6 @@ async def generate_agent_image_v2(graph: GraphBaseMeta | AgentGraph) -> io.Bytes
|
||||
if not ideogram_credentials.api_key:
|
||||
raise ValueError("Missing Ideogram API key")
|
||||
|
||||
from backend.blocks.ideogram import (
|
||||
AspectRatio,
|
||||
ColorPalettePreset,
|
||||
IdeogramModelBlock,
|
||||
IdeogramModelName,
|
||||
MagicPromptOption,
|
||||
StyleType,
|
||||
UpscaleOption,
|
||||
)
|
||||
|
||||
name = graph.name
|
||||
description = f"{name} ({graph.description})" if graph.description else name
|
||||
|
||||
|
||||
@@ -40,11 +40,10 @@ from backend.api.model import (
|
||||
UpdateTimezoneRequest,
|
||||
UploadFileResponse,
|
||||
)
|
||||
from backend.blocks import get_block, get_blocks
|
||||
from backend.data import execution as execution_db
|
||||
from backend.data import graph as graph_db
|
||||
from backend.data.auth import api_key as api_key_db
|
||||
from backend.data.block import BlockInput, CompletedBlockOutput
|
||||
from backend.data.block import BlockInput, CompletedBlockOutput, get_block, get_blocks
|
||||
from backend.data.credit import (
|
||||
AutoTopUpConfig,
|
||||
RefundRequest,
|
||||
|
||||
@@ -3,19 +3,22 @@ import logging
|
||||
import os
|
||||
import re
|
||||
from pathlib import Path
|
||||
from typing import Sequence, Type, TypeVar
|
||||
from typing import TYPE_CHECKING, TypeVar
|
||||
|
||||
from backend.blocks._base import AnyBlockSchema, BlockType
|
||||
from backend.util.cache import cached
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from backend.data.block import Block
|
||||
|
||||
T = TypeVar("T")
|
||||
|
||||
|
||||
@cached(ttl_seconds=3600)
|
||||
def load_all_blocks() -> dict[str, type["AnyBlockSchema"]]:
|
||||
from backend.blocks._base import Block
|
||||
def load_all_blocks() -> dict[str, type["Block"]]:
|
||||
from backend.data.block import Block
|
||||
from backend.util.settings import Config
|
||||
|
||||
# Check if example blocks should be loaded from settings
|
||||
@@ -47,8 +50,8 @@ def load_all_blocks() -> dict[str, type["AnyBlockSchema"]]:
|
||||
importlib.import_module(f".{module}", package=__name__)
|
||||
|
||||
# Load all Block instances from the available modules
|
||||
available_blocks: dict[str, type["AnyBlockSchema"]] = {}
|
||||
for block_cls in _all_subclasses(Block):
|
||||
available_blocks: dict[str, type["Block"]] = {}
|
||||
for block_cls in all_subclasses(Block):
|
||||
class_name = block_cls.__name__
|
||||
|
||||
if class_name.endswith("Base"):
|
||||
@@ -61,7 +64,7 @@ def load_all_blocks() -> dict[str, type["AnyBlockSchema"]]:
|
||||
"please name the class with 'Base' at the end"
|
||||
)
|
||||
|
||||
block = block_cls() # pyright: ignore[reportAbstractUsage]
|
||||
block = block_cls.create()
|
||||
|
||||
if not isinstance(block.id, str) or len(block.id) != 36:
|
||||
raise ValueError(
|
||||
@@ -102,7 +105,7 @@ def load_all_blocks() -> dict[str, type["AnyBlockSchema"]]:
|
||||
available_blocks[block.id] = block_cls
|
||||
|
||||
# Filter out blocks with incomplete auth configs, e.g. missing OAuth server secrets
|
||||
from ._utils import is_block_auth_configured
|
||||
from backend.data.block import is_block_auth_configured
|
||||
|
||||
filtered_blocks = {}
|
||||
for block_id, block_cls in available_blocks.items():
|
||||
@@ -112,48 +115,11 @@ def load_all_blocks() -> dict[str, type["AnyBlockSchema"]]:
|
||||
return filtered_blocks
|
||||
|
||||
|
||||
def _all_subclasses(cls: type[T]) -> list[type[T]]:
|
||||
__all__ = ["load_all_blocks"]
|
||||
|
||||
|
||||
def all_subclasses(cls: type[T]) -> list[type[T]]:
|
||||
subclasses = cls.__subclasses__()
|
||||
for subclass in subclasses:
|
||||
subclasses += _all_subclasses(subclass)
|
||||
subclasses += all_subclasses(subclass)
|
||||
return subclasses
|
||||
|
||||
|
||||
# ============== Block access helper functions ============== #
|
||||
|
||||
|
||||
def get_blocks() -> dict[str, Type["AnyBlockSchema"]]:
|
||||
return load_all_blocks()
|
||||
|
||||
|
||||
# Note on the return type annotation: https://github.com/microsoft/pyright/issues/10281
|
||||
def get_block(block_id: str) -> "AnyBlockSchema | None":
|
||||
cls = get_blocks().get(block_id)
|
||||
return cls() if cls else None
|
||||
|
||||
|
||||
@cached(ttl_seconds=3600)
|
||||
def get_webhook_block_ids() -> Sequence[str]:
|
||||
return [
|
||||
id
|
||||
for id, B in get_blocks().items()
|
||||
if B().block_type in (BlockType.WEBHOOK, BlockType.WEBHOOK_MANUAL)
|
||||
]
|
||||
|
||||
|
||||
@cached(ttl_seconds=3600)
|
||||
def get_io_block_ids() -> Sequence[str]:
|
||||
return [
|
||||
id
|
||||
for id, B in get_blocks().items()
|
||||
if B().block_type in (BlockType.INPUT, BlockType.OUTPUT)
|
||||
]
|
||||
|
||||
|
||||
@cached(ttl_seconds=3600)
|
||||
def get_human_in_the_loop_block_ids() -> Sequence[str]:
|
||||
return [
|
||||
id
|
||||
for id, B in get_blocks().items()
|
||||
if B().block_type == BlockType.HUMAN_IN_THE_LOOP
|
||||
]
|
||||
|
||||
@@ -1,739 +0,0 @@
|
||||
import inspect
|
||||
import logging
|
||||
from abc import ABC, abstractmethod
|
||||
from enum import Enum
|
||||
from typing import (
|
||||
TYPE_CHECKING,
|
||||
Any,
|
||||
Callable,
|
||||
ClassVar,
|
||||
Generic,
|
||||
Optional,
|
||||
Type,
|
||||
TypeAlias,
|
||||
TypeVar,
|
||||
cast,
|
||||
get_origin,
|
||||
)
|
||||
|
||||
import jsonref
|
||||
import jsonschema
|
||||
from pydantic import BaseModel
|
||||
|
||||
from backend.data.block import BlockInput, BlockOutput, BlockOutputEntry
|
||||
from backend.data.model import (
|
||||
Credentials,
|
||||
CredentialsFieldInfo,
|
||||
CredentialsMetaInput,
|
||||
SchemaField,
|
||||
is_credentials_field_name,
|
||||
)
|
||||
from backend.integrations.providers import ProviderName
|
||||
from backend.util import json
|
||||
from backend.util.exceptions import (
|
||||
BlockError,
|
||||
BlockExecutionError,
|
||||
BlockInputError,
|
||||
BlockOutputError,
|
||||
BlockUnknownError,
|
||||
)
|
||||
from backend.util.settings import Config
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from backend.data.execution import ExecutionContext
|
||||
from backend.data.model import ContributorDetails, NodeExecutionStats
|
||||
|
||||
from ..data.graph import Link
|
||||
|
||||
app_config = Config()
|
||||
|
||||
|
||||
BlockTestOutput = BlockOutputEntry | tuple[str, Callable[[Any], bool]]
|
||||
|
||||
|
||||
class BlockType(Enum):
|
||||
STANDARD = "Standard"
|
||||
INPUT = "Input"
|
||||
OUTPUT = "Output"
|
||||
NOTE = "Note"
|
||||
WEBHOOK = "Webhook"
|
||||
WEBHOOK_MANUAL = "Webhook (manual)"
|
||||
AGENT = "Agent"
|
||||
AI = "AI"
|
||||
AYRSHARE = "Ayrshare"
|
||||
HUMAN_IN_THE_LOOP = "Human In The Loop"
|
||||
|
||||
|
||||
class BlockCategory(Enum):
|
||||
AI = "Block that leverages AI to perform a task."
|
||||
SOCIAL = "Block that interacts with social media platforms."
|
||||
TEXT = "Block that processes text data."
|
||||
SEARCH = "Block that searches or extracts information from the internet."
|
||||
BASIC = "Block that performs basic operations."
|
||||
INPUT = "Block that interacts with input of the graph."
|
||||
OUTPUT = "Block that interacts with output of the graph."
|
||||
LOGIC = "Programming logic to control the flow of your agent"
|
||||
COMMUNICATION = "Block that interacts with communication platforms."
|
||||
DEVELOPER_TOOLS = "Developer tools such as GitHub blocks."
|
||||
DATA = "Block that interacts with structured data."
|
||||
HARDWARE = "Block that interacts with hardware."
|
||||
AGENT = "Block that interacts with other agents."
|
||||
CRM = "Block that interacts with CRM services."
|
||||
SAFETY = (
|
||||
"Block that provides AI safety mechanisms such as detecting harmful content"
|
||||
)
|
||||
PRODUCTIVITY = "Block that helps with productivity"
|
||||
ISSUE_TRACKING = "Block that helps with issue tracking"
|
||||
MULTIMEDIA = "Block that interacts with multimedia content"
|
||||
MARKETING = "Block that helps with marketing"
|
||||
|
||||
def dict(self) -> dict[str, str]:
|
||||
return {"category": self.name, "description": self.value}
|
||||
|
||||
|
||||
class BlockCostType(str, Enum):
|
||||
RUN = "run" # cost X credits per run
|
||||
BYTE = "byte" # cost X credits per byte
|
||||
SECOND = "second" # cost X credits per second
|
||||
|
||||
|
||||
class BlockCost(BaseModel):
|
||||
cost_amount: int
|
||||
cost_filter: BlockInput
|
||||
cost_type: BlockCostType
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
cost_amount: int,
|
||||
cost_type: BlockCostType = BlockCostType.RUN,
|
||||
cost_filter: Optional[BlockInput] = None,
|
||||
**data: Any,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
cost_amount=cost_amount,
|
||||
cost_filter=cost_filter or {},
|
||||
cost_type=cost_type,
|
||||
**data,
|
||||
)
|
||||
|
||||
|
||||
class BlockInfo(BaseModel):
|
||||
id: str
|
||||
name: str
|
||||
inputSchema: dict[str, Any]
|
||||
outputSchema: dict[str, Any]
|
||||
costs: list[BlockCost]
|
||||
description: str
|
||||
categories: list[dict[str, str]]
|
||||
contributors: list[dict[str, Any]]
|
||||
staticOutput: bool
|
||||
uiType: str
|
||||
|
||||
|
||||
class BlockSchema(BaseModel):
|
||||
cached_jsonschema: ClassVar[dict[str, Any]]
|
||||
|
||||
@classmethod
|
||||
def jsonschema(cls) -> dict[str, Any]:
|
||||
if cls.cached_jsonschema:
|
||||
return cls.cached_jsonschema
|
||||
|
||||
model = jsonref.replace_refs(cls.model_json_schema(), merge_props=True)
|
||||
|
||||
def ref_to_dict(obj):
|
||||
if isinstance(obj, dict):
|
||||
# OpenAPI <3.1 does not support sibling fields that has a $ref key
|
||||
# So sometimes, the schema has an "allOf"/"anyOf"/"oneOf" with 1 item.
|
||||
keys = {"allOf", "anyOf", "oneOf"}
|
||||
one_key = next((k for k in keys if k in obj and len(obj[k]) == 1), None)
|
||||
if one_key:
|
||||
obj.update(obj[one_key][0])
|
||||
|
||||
return {
|
||||
key: ref_to_dict(value)
|
||||
for key, value in obj.items()
|
||||
if not key.startswith("$") and key != one_key
|
||||
}
|
||||
elif isinstance(obj, list):
|
||||
return [ref_to_dict(item) for item in obj]
|
||||
|
||||
return obj
|
||||
|
||||
cls.cached_jsonschema = cast(dict[str, Any], ref_to_dict(model))
|
||||
|
||||
return cls.cached_jsonschema
|
||||
|
||||
@classmethod
|
||||
def validate_data(cls, data: BlockInput) -> str | None:
|
||||
return json.validate_with_jsonschema(
|
||||
schema=cls.jsonschema(),
|
||||
data={k: v for k, v in data.items() if v is not None},
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def get_mismatch_error(cls, data: BlockInput) -> str | None:
|
||||
return cls.validate_data(data)
|
||||
|
||||
@classmethod
|
||||
def get_field_schema(cls, field_name: str) -> dict[str, Any]:
|
||||
model_schema = cls.jsonschema().get("properties", {})
|
||||
if not model_schema:
|
||||
raise ValueError(f"Invalid model schema {cls}")
|
||||
|
||||
property_schema = model_schema.get(field_name)
|
||||
if not property_schema:
|
||||
raise ValueError(f"Invalid property name {field_name}")
|
||||
|
||||
return property_schema
|
||||
|
||||
@classmethod
|
||||
def validate_field(cls, field_name: str, data: BlockInput) -> str | None:
|
||||
"""
|
||||
Validate the data against a specific property (one of the input/output name).
|
||||
Returns the validation error message if the data does not match the schema.
|
||||
"""
|
||||
try:
|
||||
property_schema = cls.get_field_schema(field_name)
|
||||
jsonschema.validate(json.to_dict(data), property_schema)
|
||||
return None
|
||||
except jsonschema.ValidationError as e:
|
||||
return str(e)
|
||||
|
||||
@classmethod
|
||||
def get_fields(cls) -> set[str]:
|
||||
return set(cls.model_fields.keys())
|
||||
|
||||
@classmethod
|
||||
def get_required_fields(cls) -> set[str]:
|
||||
return {
|
||||
field
|
||||
for field, field_info in cls.model_fields.items()
|
||||
if field_info.is_required()
|
||||
}
|
||||
|
||||
@classmethod
|
||||
def __pydantic_init_subclass__(cls, **kwargs):
|
||||
"""Validates the schema definition. Rules:
|
||||
- Fields with annotation `CredentialsMetaInput` MUST be
|
||||
named `credentials` or `*_credentials`
|
||||
- Fields named `credentials` or `*_credentials` MUST be
|
||||
of type `CredentialsMetaInput`
|
||||
"""
|
||||
super().__pydantic_init_subclass__(**kwargs)
|
||||
|
||||
# Reset cached JSON schema to prevent inheriting it from parent class
|
||||
cls.cached_jsonschema = {}
|
||||
|
||||
credentials_fields = cls.get_credentials_fields()
|
||||
|
||||
for field_name in cls.get_fields():
|
||||
if is_credentials_field_name(field_name):
|
||||
if field_name not in credentials_fields:
|
||||
raise TypeError(
|
||||
f"Credentials field '{field_name}' on {cls.__qualname__} "
|
||||
f"is not of type {CredentialsMetaInput.__name__}"
|
||||
)
|
||||
|
||||
CredentialsMetaInput.validate_credentials_field_schema(
|
||||
cls.get_field_schema(field_name), field_name
|
||||
)
|
||||
|
||||
elif field_name in credentials_fields:
|
||||
raise KeyError(
|
||||
f"Credentials field '{field_name}' on {cls.__qualname__} "
|
||||
"has invalid name: must be 'credentials' or *_credentials"
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def get_credentials_fields(cls) -> dict[str, type[CredentialsMetaInput]]:
|
||||
return {
|
||||
field_name: info.annotation
|
||||
for field_name, info in cls.model_fields.items()
|
||||
if (
|
||||
inspect.isclass(info.annotation)
|
||||
and issubclass(
|
||||
get_origin(info.annotation) or info.annotation,
|
||||
CredentialsMetaInput,
|
||||
)
|
||||
)
|
||||
}
|
||||
|
||||
@classmethod
|
||||
def get_auto_credentials_fields(cls) -> dict[str, dict[str, Any]]:
|
||||
"""
|
||||
Get fields that have auto_credentials metadata (e.g., GoogleDriveFileInput).
|
||||
|
||||
Returns a dict mapping kwarg_name -> {field_name, auto_credentials_config}
|
||||
|
||||
Raises:
|
||||
ValueError: If multiple fields have the same kwarg_name, as this would
|
||||
cause silent overwriting and only the last field would be processed.
|
||||
"""
|
||||
result: dict[str, dict[str, Any]] = {}
|
||||
schema = cls.jsonschema()
|
||||
properties = schema.get("properties", {})
|
||||
|
||||
for field_name, field_schema in properties.items():
|
||||
auto_creds = field_schema.get("auto_credentials")
|
||||
if auto_creds:
|
||||
kwarg_name = auto_creds.get("kwarg_name", "credentials")
|
||||
if kwarg_name in result:
|
||||
raise ValueError(
|
||||
f"Duplicate auto_credentials kwarg_name '{kwarg_name}' "
|
||||
f"in fields '{result[kwarg_name]['field_name']}' and "
|
||||
f"'{field_name}' on {cls.__qualname__}"
|
||||
)
|
||||
result[kwarg_name] = {
|
||||
"field_name": field_name,
|
||||
"config": auto_creds,
|
||||
}
|
||||
return result
|
||||
|
||||
@classmethod
|
||||
def get_credentials_fields_info(cls) -> dict[str, CredentialsFieldInfo]:
|
||||
result = {}
|
||||
|
||||
# Regular credentials fields
|
||||
for field_name in cls.get_credentials_fields().keys():
|
||||
result[field_name] = CredentialsFieldInfo.model_validate(
|
||||
cls.get_field_schema(field_name), by_alias=True
|
||||
)
|
||||
|
||||
# Auto-generated credentials fields (from GoogleDriveFileInput etc.)
|
||||
for kwarg_name, info in cls.get_auto_credentials_fields().items():
|
||||
config = info["config"]
|
||||
# Build a schema-like dict that CredentialsFieldInfo can parse
|
||||
auto_schema = {
|
||||
"credentials_provider": [config.get("provider", "google")],
|
||||
"credentials_types": [config.get("type", "oauth2")],
|
||||
"credentials_scopes": config.get("scopes"),
|
||||
}
|
||||
result[kwarg_name] = CredentialsFieldInfo.model_validate(
|
||||
auto_schema, by_alias=True
|
||||
)
|
||||
|
||||
return result
|
||||
|
||||
@classmethod
|
||||
def get_input_defaults(cls, data: BlockInput) -> BlockInput:
|
||||
return data # Return as is, by default.
|
||||
|
||||
@classmethod
|
||||
def get_missing_links(cls, data: BlockInput, links: list["Link"]) -> set[str]:
|
||||
input_fields_from_nodes = {link.sink_name for link in links}
|
||||
return input_fields_from_nodes - set(data)
|
||||
|
||||
@classmethod
|
||||
def get_missing_input(cls, data: BlockInput) -> set[str]:
|
||||
return cls.get_required_fields() - set(data)
|
||||
|
||||
|
||||
class BlockSchemaInput(BlockSchema):
|
||||
"""
|
||||
Base schema class for block inputs.
|
||||
All block input schemas should extend this class for consistency.
|
||||
"""
|
||||
|
||||
pass
|
||||
|
||||
|
||||
class BlockSchemaOutput(BlockSchema):
|
||||
"""
|
||||
Base schema class for block outputs that includes a standard error field.
|
||||
All block output schemas should extend this class to ensure consistent error handling.
|
||||
"""
|
||||
|
||||
error: str = SchemaField(
|
||||
description="Error message if the operation failed", default=""
|
||||
)
|
||||
|
||||
|
||||
BlockSchemaInputType = TypeVar("BlockSchemaInputType", bound=BlockSchemaInput)
|
||||
BlockSchemaOutputType = TypeVar("BlockSchemaOutputType", bound=BlockSchemaOutput)
|
||||
|
||||
|
||||
class EmptyInputSchema(BlockSchemaInput):
|
||||
pass
|
||||
|
||||
|
||||
class EmptyOutputSchema(BlockSchemaOutput):
|
||||
pass
|
||||
|
||||
|
||||
# For backward compatibility - will be deprecated
|
||||
EmptySchema = EmptyOutputSchema
|
||||
|
||||
|
||||
# --8<-- [start:BlockWebhookConfig]
|
||||
class BlockManualWebhookConfig(BaseModel):
|
||||
"""
|
||||
Configuration model for webhook-triggered blocks on which
|
||||
the user has to manually set up the webhook at the provider.
|
||||
"""
|
||||
|
||||
provider: ProviderName
|
||||
"""The service provider that the webhook connects to"""
|
||||
|
||||
webhook_type: str
|
||||
"""
|
||||
Identifier for the webhook type. E.g. GitHub has repo and organization level hooks.
|
||||
|
||||
Only for use in the corresponding `WebhooksManager`.
|
||||
"""
|
||||
|
||||
event_filter_input: str = ""
|
||||
"""
|
||||
Name of the block's event filter input.
|
||||
Leave empty if the corresponding webhook doesn't have distinct event/payload types.
|
||||
"""
|
||||
|
||||
event_format: str = "{event}"
|
||||
"""
|
||||
Template string for the event(s) that a block instance subscribes to.
|
||||
Applied individually to each event selected in the event filter input.
|
||||
|
||||
Example: `"pull_request.{event}"` -> `"pull_request.opened"`
|
||||
"""
|
||||
|
||||
|
||||
class BlockWebhookConfig(BlockManualWebhookConfig):
|
||||
"""
|
||||
Configuration model for webhook-triggered blocks for which
|
||||
the webhook can be automatically set up through the provider's API.
|
||||
"""
|
||||
|
||||
resource_format: str
|
||||
"""
|
||||
Template string for the resource that a block instance subscribes to.
|
||||
Fields will be filled from the block's inputs (except `payload`).
|
||||
|
||||
Example: `f"{repo}/pull_requests"` (note: not how it's actually implemented)
|
||||
|
||||
Only for use in the corresponding `WebhooksManager`.
|
||||
"""
|
||||
# --8<-- [end:BlockWebhookConfig]
|
||||
|
||||
|
||||
class Block(ABC, Generic[BlockSchemaInputType, BlockSchemaOutputType]):
|
||||
def __init__(
|
||||
self,
|
||||
id: str = "",
|
||||
description: str = "",
|
||||
contributors: list["ContributorDetails"] = [],
|
||||
categories: set[BlockCategory] | None = None,
|
||||
input_schema: Type[BlockSchemaInputType] = EmptyInputSchema,
|
||||
output_schema: Type[BlockSchemaOutputType] = EmptyOutputSchema,
|
||||
test_input: BlockInput | list[BlockInput] | None = None,
|
||||
test_output: BlockTestOutput | list[BlockTestOutput] | None = None,
|
||||
test_mock: dict[str, Any] | None = None,
|
||||
test_credentials: Optional[Credentials | dict[str, Credentials]] = None,
|
||||
disabled: bool = False,
|
||||
static_output: bool = False,
|
||||
block_type: BlockType = BlockType.STANDARD,
|
||||
webhook_config: Optional[BlockWebhookConfig | BlockManualWebhookConfig] = None,
|
||||
is_sensitive_action: bool = False,
|
||||
):
|
||||
"""
|
||||
Initialize the block with the given schema.
|
||||
|
||||
Args:
|
||||
id: The unique identifier for the block, this value will be persisted in the
|
||||
DB. So it should be a unique and constant across the application run.
|
||||
Use the UUID format for the ID.
|
||||
description: The description of the block, explaining what the block does.
|
||||
contributors: The list of contributors who contributed to the block.
|
||||
input_schema: The schema, defined as a Pydantic model, for the input data.
|
||||
output_schema: The schema, defined as a Pydantic model, for the output data.
|
||||
test_input: The list or single sample input data for the block, for testing.
|
||||
test_output: The list or single expected output if the test_input is run.
|
||||
test_mock: function names on the block implementation to mock on test run.
|
||||
disabled: If the block is disabled, it will not be available for execution.
|
||||
static_output: Whether the output links of the block are static by default.
|
||||
"""
|
||||
from backend.data.model import NodeExecutionStats
|
||||
|
||||
self.id = id
|
||||
self.input_schema = input_schema
|
||||
self.output_schema = output_schema
|
||||
self.test_input = test_input
|
||||
self.test_output = test_output
|
||||
self.test_mock = test_mock
|
||||
self.test_credentials = test_credentials
|
||||
self.description = description
|
||||
self.categories = categories or set()
|
||||
self.contributors = contributors or set()
|
||||
self.disabled = disabled
|
||||
self.static_output = static_output
|
||||
self.block_type = block_type
|
||||
self.webhook_config = webhook_config
|
||||
self.is_sensitive_action = is_sensitive_action
|
||||
self.execution_stats: "NodeExecutionStats" = NodeExecutionStats()
|
||||
|
||||
if self.webhook_config:
|
||||
if isinstance(self.webhook_config, BlockWebhookConfig):
|
||||
# Enforce presence of credentials field on auto-setup webhook blocks
|
||||
if not (cred_fields := self.input_schema.get_credentials_fields()):
|
||||
raise TypeError(
|
||||
"credentials field is required on auto-setup webhook blocks"
|
||||
)
|
||||
# Disallow multiple credentials inputs on webhook blocks
|
||||
elif len(cred_fields) > 1:
|
||||
raise ValueError(
|
||||
"Multiple credentials inputs not supported on webhook blocks"
|
||||
)
|
||||
|
||||
self.block_type = BlockType.WEBHOOK
|
||||
else:
|
||||
self.block_type = BlockType.WEBHOOK_MANUAL
|
||||
|
||||
# Enforce shape of webhook event filter, if present
|
||||
if self.webhook_config.event_filter_input:
|
||||
event_filter_field = self.input_schema.model_fields[
|
||||
self.webhook_config.event_filter_input
|
||||
]
|
||||
if not (
|
||||
isinstance(event_filter_field.annotation, type)
|
||||
and issubclass(event_filter_field.annotation, BaseModel)
|
||||
and all(
|
||||
field.annotation is bool
|
||||
for field in event_filter_field.annotation.model_fields.values()
|
||||
)
|
||||
):
|
||||
raise NotImplementedError(
|
||||
f"{self.name} has an invalid webhook event selector: "
|
||||
"field must be a BaseModel and all its fields must be boolean"
|
||||
)
|
||||
|
||||
# Enforce presence of 'payload' input
|
||||
if "payload" not in self.input_schema.model_fields:
|
||||
raise TypeError(
|
||||
f"{self.name} is webhook-triggered but has no 'payload' input"
|
||||
)
|
||||
|
||||
# Disable webhook-triggered block if webhook functionality not available
|
||||
if not app_config.platform_base_url:
|
||||
self.disabled = True
|
||||
|
||||
@abstractmethod
|
||||
async def run(self, input_data: BlockSchemaInputType, **kwargs) -> BlockOutput:
|
||||
"""
|
||||
Run the block with the given input data.
|
||||
Args:
|
||||
input_data: The input data with the structure of input_schema.
|
||||
|
||||
Kwargs: Currently 14/02/2025 these include
|
||||
graph_id: The ID of the graph.
|
||||
node_id: The ID of the node.
|
||||
graph_exec_id: The ID of the graph execution.
|
||||
node_exec_id: The ID of the node execution.
|
||||
user_id: The ID of the user.
|
||||
|
||||
Returns:
|
||||
A Generator that yields (output_name, output_data).
|
||||
output_name: One of the output name defined in Block's output_schema.
|
||||
output_data: The data for the output_name, matching the defined schema.
|
||||
"""
|
||||
# --- satisfy the type checker, never executed -------------
|
||||
if False: # noqa: SIM115
|
||||
yield "name", "value" # pyright: ignore[reportMissingYield]
|
||||
raise NotImplementedError(f"{self.name} does not implement the run method.")
|
||||
|
||||
async def run_once(
|
||||
self, input_data: BlockSchemaInputType, output: str, **kwargs
|
||||
) -> Any:
|
||||
async for item in self.run(input_data, **kwargs):
|
||||
name, data = item
|
||||
if name == output:
|
||||
return data
|
||||
raise ValueError(f"{self.name} did not produce any output for {output}")
|
||||
|
||||
def merge_stats(self, stats: "NodeExecutionStats") -> "NodeExecutionStats":
|
||||
self.execution_stats += stats
|
||||
return self.execution_stats
|
||||
|
||||
@property
|
||||
def name(self):
|
||||
return self.__class__.__name__
|
||||
|
||||
def to_dict(self):
|
||||
return {
|
||||
"id": self.id,
|
||||
"name": self.name,
|
||||
"inputSchema": self.input_schema.jsonschema(),
|
||||
"outputSchema": self.output_schema.jsonschema(),
|
||||
"description": self.description,
|
||||
"categories": [category.dict() for category in self.categories],
|
||||
"contributors": [
|
||||
contributor.model_dump() for contributor in self.contributors
|
||||
],
|
||||
"staticOutput": self.static_output,
|
||||
"uiType": self.block_type.value,
|
||||
}
|
||||
|
||||
def get_info(self) -> BlockInfo:
|
||||
from backend.data.credit import get_block_cost
|
||||
|
||||
return BlockInfo(
|
||||
id=self.id,
|
||||
name=self.name,
|
||||
inputSchema=self.input_schema.jsonschema(),
|
||||
outputSchema=self.output_schema.jsonschema(),
|
||||
costs=get_block_cost(self),
|
||||
description=self.description,
|
||||
categories=[category.dict() for category in self.categories],
|
||||
contributors=[
|
||||
contributor.model_dump() for contributor in self.contributors
|
||||
],
|
||||
staticOutput=self.static_output,
|
||||
uiType=self.block_type.value,
|
||||
)
|
||||
|
||||
async def execute(self, input_data: BlockInput, **kwargs) -> BlockOutput:
|
||||
try:
|
||||
async for output_name, output_data in self._execute(input_data, **kwargs):
|
||||
yield output_name, output_data
|
||||
except Exception as ex:
|
||||
if isinstance(ex, BlockError):
|
||||
raise ex
|
||||
else:
|
||||
raise (
|
||||
BlockExecutionError
|
||||
if isinstance(ex, ValueError)
|
||||
else BlockUnknownError
|
||||
)(
|
||||
message=str(ex),
|
||||
block_name=self.name,
|
||||
block_id=self.id,
|
||||
) from ex
|
||||
|
||||
async def is_block_exec_need_review(
|
||||
self,
|
||||
input_data: BlockInput,
|
||||
*,
|
||||
user_id: str,
|
||||
node_id: str,
|
||||
node_exec_id: str,
|
||||
graph_exec_id: str,
|
||||
graph_id: str,
|
||||
graph_version: int,
|
||||
execution_context: "ExecutionContext",
|
||||
**kwargs,
|
||||
) -> tuple[bool, BlockInput]:
|
||||
"""
|
||||
Check if this block execution needs human review and handle the review process.
|
||||
|
||||
Returns:
|
||||
Tuple of (should_pause, input_data_to_use)
|
||||
- should_pause: True if execution should be paused for review
|
||||
- input_data_to_use: The input data to use (may be modified by reviewer)
|
||||
"""
|
||||
if not (
|
||||
self.is_sensitive_action and execution_context.sensitive_action_safe_mode
|
||||
):
|
||||
return False, input_data
|
||||
|
||||
from backend.blocks.helpers.review import HITLReviewHelper
|
||||
|
||||
# Handle the review request and get decision
|
||||
decision = await HITLReviewHelper.handle_review_decision(
|
||||
input_data=input_data,
|
||||
user_id=user_id,
|
||||
node_id=node_id,
|
||||
node_exec_id=node_exec_id,
|
||||
graph_exec_id=graph_exec_id,
|
||||
graph_id=graph_id,
|
||||
graph_version=graph_version,
|
||||
block_name=self.name,
|
||||
editable=True,
|
||||
)
|
||||
|
||||
if decision is None:
|
||||
# We're awaiting review - pause execution
|
||||
return True, input_data
|
||||
|
||||
if not decision.should_proceed:
|
||||
# Review was rejected, raise an error to stop execution
|
||||
raise BlockExecutionError(
|
||||
message=f"Block execution rejected by reviewer: {decision.message}",
|
||||
block_name=self.name,
|
||||
block_id=self.id,
|
||||
)
|
||||
|
||||
# Review was approved - use the potentially modified data
|
||||
# ReviewResult.data must be a dict for block inputs
|
||||
reviewed_data = decision.review_result.data
|
||||
if not isinstance(reviewed_data, dict):
|
||||
raise BlockExecutionError(
|
||||
message=f"Review data must be a dict for block input, got {type(reviewed_data).__name__}",
|
||||
block_name=self.name,
|
||||
block_id=self.id,
|
||||
)
|
||||
return False, reviewed_data
|
||||
|
||||
async def _execute(self, input_data: BlockInput, **kwargs) -> BlockOutput:
|
||||
# Check for review requirement only if running within a graph execution context
|
||||
# Direct block execution (e.g., from chat) skips the review process
|
||||
has_graph_context = all(
|
||||
key in kwargs
|
||||
for key in (
|
||||
"node_exec_id",
|
||||
"graph_exec_id",
|
||||
"graph_id",
|
||||
"execution_context",
|
||||
)
|
||||
)
|
||||
if has_graph_context:
|
||||
should_pause, input_data = await self.is_block_exec_need_review(
|
||||
input_data, **kwargs
|
||||
)
|
||||
if should_pause:
|
||||
return
|
||||
|
||||
# Validate the input data (original or reviewer-modified) once
|
||||
if error := self.input_schema.validate_data(input_data):
|
||||
raise BlockInputError(
|
||||
message=f"Unable to execute block with invalid input data: {error}",
|
||||
block_name=self.name,
|
||||
block_id=self.id,
|
||||
)
|
||||
|
||||
# Use the validated input data
|
||||
async for output_name, output_data in self.run(
|
||||
self.input_schema(**{k: v for k, v in input_data.items() if v is not None}),
|
||||
**kwargs,
|
||||
):
|
||||
if output_name == "error":
|
||||
raise BlockExecutionError(
|
||||
message=output_data, block_name=self.name, block_id=self.id
|
||||
)
|
||||
if self.block_type == BlockType.STANDARD and (
|
||||
error := self.output_schema.validate_field(output_name, output_data)
|
||||
):
|
||||
raise BlockOutputError(
|
||||
message=f"Block produced an invalid output data: {error}",
|
||||
block_name=self.name,
|
||||
block_id=self.id,
|
||||
)
|
||||
yield output_name, output_data
|
||||
|
||||
def is_triggered_by_event_type(
|
||||
self, trigger_config: dict[str, Any], event_type: str
|
||||
) -> bool:
|
||||
if not self.webhook_config:
|
||||
raise TypeError("This method can't be used on non-trigger blocks")
|
||||
if not self.webhook_config.event_filter_input:
|
||||
return True
|
||||
event_filter = trigger_config.get(self.webhook_config.event_filter_input)
|
||||
if not event_filter:
|
||||
raise ValueError("Event filter is not configured on trigger")
|
||||
return event_type in [
|
||||
self.webhook_config.event_format.format(event=k)
|
||||
for k in event_filter
|
||||
if event_filter[k] is True
|
||||
]
|
||||
|
||||
|
||||
# Type alias for any block with standard input/output schemas
|
||||
AnyBlockSchema: TypeAlias = Block[BlockSchemaInput, BlockSchemaOutput]
|
||||
@@ -1,122 +0,0 @@
|
||||
import logging
|
||||
import os
|
||||
|
||||
from backend.integrations.providers import ProviderName
|
||||
|
||||
from ._base import AnyBlockSchema
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def is_block_auth_configured(
|
||||
block_cls: type[AnyBlockSchema],
|
||||
) -> bool:
|
||||
"""
|
||||
Check if a block has a valid authentication method configured at runtime.
|
||||
|
||||
For example if a block is an OAuth-only block and there env vars are not set,
|
||||
do not show it in the UI.
|
||||
|
||||
"""
|
||||
from backend.sdk.registry import AutoRegistry
|
||||
|
||||
# Create an instance to access input_schema
|
||||
try:
|
||||
block = block_cls()
|
||||
except Exception as e:
|
||||
# If we can't create a block instance, assume it's not OAuth-only
|
||||
logger.error(f"Error creating block instance for {block_cls.__name__}: {e}")
|
||||
return True
|
||||
logger.debug(
|
||||
f"Checking if block {block_cls.__name__} has a valid provider configured"
|
||||
)
|
||||
|
||||
# Get all credential inputs from input schema
|
||||
credential_inputs = block.input_schema.get_credentials_fields_info()
|
||||
required_inputs = block.input_schema.get_required_fields()
|
||||
if not credential_inputs:
|
||||
logger.debug(
|
||||
f"Block {block_cls.__name__} has no credential inputs - Treating as valid"
|
||||
)
|
||||
return True
|
||||
|
||||
# Check credential inputs
|
||||
if len(required_inputs.intersection(credential_inputs.keys())) == 0:
|
||||
logger.debug(
|
||||
f"Block {block_cls.__name__} has only optional credential inputs"
|
||||
" - will work without credentials configured"
|
||||
)
|
||||
|
||||
# Check if the credential inputs for this block are correctly configured
|
||||
for field_name, field_info in credential_inputs.items():
|
||||
provider_names = field_info.provider
|
||||
if not provider_names:
|
||||
logger.warning(
|
||||
f"Block {block_cls.__name__} "
|
||||
f"has credential input '{field_name}' with no provider options"
|
||||
" - Disabling"
|
||||
)
|
||||
return False
|
||||
|
||||
# If a field has multiple possible providers, each one needs to be usable to
|
||||
# prevent breaking the UX
|
||||
for _provider_name in provider_names:
|
||||
provider_name = _provider_name.value
|
||||
if provider_name in ProviderName.__members__.values():
|
||||
logger.debug(
|
||||
f"Block {block_cls.__name__} credential input '{field_name}' "
|
||||
f"provider '{provider_name}' is part of the legacy provider system"
|
||||
" - Treating as valid"
|
||||
)
|
||||
break
|
||||
|
||||
provider = AutoRegistry.get_provider(provider_name)
|
||||
if not provider:
|
||||
logger.warning(
|
||||
f"Block {block_cls.__name__} credential input '{field_name}' "
|
||||
f"refers to unknown provider '{provider_name}' - Disabling"
|
||||
)
|
||||
return False
|
||||
|
||||
# Check the provider's supported auth types
|
||||
if field_info.supported_types != provider.supported_auth_types:
|
||||
logger.warning(
|
||||
f"Block {block_cls.__name__} credential input '{field_name}' "
|
||||
f"has mismatched supported auth types (field <> Provider): "
|
||||
f"{field_info.supported_types} != {provider.supported_auth_types}"
|
||||
)
|
||||
|
||||
if not (supported_auth_types := provider.supported_auth_types):
|
||||
# No auth methods are been configured for this provider
|
||||
logger.warning(
|
||||
f"Block {block_cls.__name__} credential input '{field_name}' "
|
||||
f"provider '{provider_name}' "
|
||||
"has no authentication methods configured - Disabling"
|
||||
)
|
||||
return False
|
||||
|
||||
# Check if provider supports OAuth
|
||||
if "oauth2" in supported_auth_types:
|
||||
# Check if OAuth environment variables are set
|
||||
if (oauth_config := provider.oauth_config) and bool(
|
||||
os.getenv(oauth_config.client_id_env_var)
|
||||
and os.getenv(oauth_config.client_secret_env_var)
|
||||
):
|
||||
logger.debug(
|
||||
f"Block {block_cls.__name__} credential input '{field_name}' "
|
||||
f"provider '{provider_name}' is configured for OAuth"
|
||||
)
|
||||
else:
|
||||
logger.error(
|
||||
f"Block {block_cls.__name__} credential input '{field_name}' "
|
||||
f"provider '{provider_name}' "
|
||||
"is missing OAuth client ID or secret - Disabling"
|
||||
)
|
||||
return False
|
||||
|
||||
logger.debug(
|
||||
f"Block {block_cls.__name__} credential input '{field_name}' is valid; "
|
||||
f"supported credential types: {', '.join(field_info.supported_types)}"
|
||||
)
|
||||
|
||||
return True
|
||||
@@ -1,7 +1,7 @@
|
||||
import logging
|
||||
from typing import TYPE_CHECKING, Any, Optional
|
||||
from typing import Any, Optional
|
||||
|
||||
from backend.blocks._base import (
|
||||
from backend.data.block import (
|
||||
Block,
|
||||
BlockCategory,
|
||||
BlockInput,
|
||||
@@ -9,15 +9,13 @@ from backend.blocks._base import (
|
||||
BlockSchema,
|
||||
BlockSchemaInput,
|
||||
BlockType,
|
||||
get_block,
|
||||
)
|
||||
from backend.data.execution import ExecutionContext, ExecutionStatus, NodesInputMasks
|
||||
from backend.data.model import NodeExecutionStats, SchemaField
|
||||
from backend.util.json import validate_with_jsonschema
|
||||
from backend.util.retry import func_retry
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from backend.executor.utils import LogMetadata
|
||||
|
||||
_logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@@ -126,10 +124,9 @@ class AgentExecutorBlock(Block):
|
||||
graph_version: int,
|
||||
graph_exec_id: str,
|
||||
user_id: str,
|
||||
logger: "LogMetadata",
|
||||
logger,
|
||||
) -> BlockOutput:
|
||||
|
||||
from backend.blocks import get_block
|
||||
from backend.data.execution import ExecutionEventType
|
||||
from backend.executor import utils as execution_utils
|
||||
|
||||
@@ -201,7 +198,7 @@ class AgentExecutorBlock(Block):
|
||||
self,
|
||||
graph_exec_id: str,
|
||||
user_id: str,
|
||||
logger: "LogMetadata",
|
||||
logger,
|
||||
) -> None:
|
||||
from backend.executor import utils as execution_utils
|
||||
|
||||
|
||||
@@ -1,11 +1,5 @@
|
||||
from typing import Any
|
||||
|
||||
from backend.blocks._base import (
|
||||
BlockCategory,
|
||||
BlockOutput,
|
||||
BlockSchemaInput,
|
||||
BlockSchemaOutput,
|
||||
)
|
||||
from backend.blocks.llm import (
|
||||
DEFAULT_LLM_MODEL,
|
||||
TEST_CREDENTIALS,
|
||||
@@ -17,6 +11,12 @@ from backend.blocks.llm import (
|
||||
LLMResponse,
|
||||
llm_call,
|
||||
)
|
||||
from backend.data.block import (
|
||||
BlockCategory,
|
||||
BlockOutput,
|
||||
BlockSchemaInput,
|
||||
BlockSchemaOutput,
|
||||
)
|
||||
from backend.data.model import APIKeyCredentials, NodeExecutionStats, SchemaField
|
||||
|
||||
|
||||
|
||||
@@ -6,7 +6,7 @@ from pydantic import SecretStr
|
||||
from replicate.client import Client as ReplicateClient
|
||||
from replicate.helpers import FileOutput
|
||||
|
||||
from backend.blocks._base import (
|
||||
from backend.data.block import (
|
||||
Block,
|
||||
BlockCategory,
|
||||
BlockOutput,
|
||||
|
||||
@@ -5,12 +5,7 @@ from pydantic import SecretStr
|
||||
from replicate.client import Client as ReplicateClient
|
||||
from replicate.helpers import FileOutput
|
||||
|
||||
from backend.blocks._base import (
|
||||
Block,
|
||||
BlockCategory,
|
||||
BlockSchemaInput,
|
||||
BlockSchemaOutput,
|
||||
)
|
||||
from backend.data.block import Block, BlockCategory, BlockSchemaInput, BlockSchemaOutput
|
||||
from backend.data.execution import ExecutionContext
|
||||
from backend.data.model import (
|
||||
APIKeyCredentials,
|
||||
|
||||
@@ -6,7 +6,7 @@ from typing import Literal
|
||||
from pydantic import SecretStr
|
||||
from replicate.client import Client as ReplicateClient
|
||||
|
||||
from backend.blocks._base import (
|
||||
from backend.data.block import (
|
||||
Block,
|
||||
BlockCategory,
|
||||
BlockOutput,
|
||||
|
||||
@@ -6,7 +6,7 @@ from typing import Literal
|
||||
|
||||
from pydantic import SecretStr
|
||||
|
||||
from backend.blocks._base import (
|
||||
from backend.data.block import (
|
||||
Block,
|
||||
BlockCategory,
|
||||
BlockOutput,
|
||||
|
||||
@@ -1,10 +1,3 @@
|
||||
from backend.blocks._base import (
|
||||
Block,
|
||||
BlockCategory,
|
||||
BlockOutput,
|
||||
BlockSchemaInput,
|
||||
BlockSchemaOutput,
|
||||
)
|
||||
from backend.blocks.apollo._api import ApolloClient
|
||||
from backend.blocks.apollo._auth import (
|
||||
TEST_CREDENTIALS,
|
||||
@@ -17,6 +10,13 @@ from backend.blocks.apollo.models import (
|
||||
PrimaryPhone,
|
||||
SearchOrganizationsRequest,
|
||||
)
|
||||
from backend.data.block import (
|
||||
Block,
|
||||
BlockCategory,
|
||||
BlockOutput,
|
||||
BlockSchemaInput,
|
||||
BlockSchemaOutput,
|
||||
)
|
||||
from backend.data.model import CredentialsField, SchemaField
|
||||
|
||||
|
||||
|
||||
@@ -1,12 +1,5 @@
|
||||
import asyncio
|
||||
|
||||
from backend.blocks._base import (
|
||||
Block,
|
||||
BlockCategory,
|
||||
BlockOutput,
|
||||
BlockSchemaInput,
|
||||
BlockSchemaOutput,
|
||||
)
|
||||
from backend.blocks.apollo._api import ApolloClient
|
||||
from backend.blocks.apollo._auth import (
|
||||
TEST_CREDENTIALS,
|
||||
@@ -21,6 +14,13 @@ from backend.blocks.apollo.models import (
|
||||
SearchPeopleRequest,
|
||||
SenorityLevels,
|
||||
)
|
||||
from backend.data.block import (
|
||||
Block,
|
||||
BlockCategory,
|
||||
BlockOutput,
|
||||
BlockSchemaInput,
|
||||
BlockSchemaOutput,
|
||||
)
|
||||
from backend.data.model import CredentialsField, SchemaField
|
||||
|
||||
|
||||
|
||||
@@ -1,10 +1,3 @@
|
||||
from backend.blocks._base import (
|
||||
Block,
|
||||
BlockCategory,
|
||||
BlockOutput,
|
||||
BlockSchemaInput,
|
||||
BlockSchemaOutput,
|
||||
)
|
||||
from backend.blocks.apollo._api import ApolloClient
|
||||
from backend.blocks.apollo._auth import (
|
||||
TEST_CREDENTIALS,
|
||||
@@ -13,6 +6,13 @@ from backend.blocks.apollo._auth import (
|
||||
ApolloCredentialsInput,
|
||||
)
|
||||
from backend.blocks.apollo.models import Contact, EnrichPersonRequest
|
||||
from backend.data.block import (
|
||||
Block,
|
||||
BlockCategory,
|
||||
BlockOutput,
|
||||
BlockSchemaInput,
|
||||
BlockSchemaOutput,
|
||||
)
|
||||
from backend.data.model import CredentialsField, SchemaField
|
||||
|
||||
|
||||
|
||||
@@ -3,7 +3,7 @@ from typing import Optional
|
||||
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
from backend.blocks._base import BlockSchemaInput
|
||||
from backend.data.block import BlockSchemaInput
|
||||
from backend.data.model import SchemaField, UserIntegrations
|
||||
from backend.integrations.ayrshare import AyrshareClient
|
||||
from backend.util.clients import get_database_manager_async_client
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
import enum
|
||||
from typing import Any
|
||||
|
||||
from backend.blocks._base import (
|
||||
from backend.data.block import (
|
||||
Block,
|
||||
BlockCategory,
|
||||
BlockOutput,
|
||||
|
||||
@@ -2,7 +2,7 @@ import os
|
||||
import re
|
||||
from typing import Type
|
||||
|
||||
from backend.blocks._base import (
|
||||
from backend.data.block import (
|
||||
Block,
|
||||
BlockCategory,
|
||||
BlockOutput,
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
from enum import Enum
|
||||
from typing import Any
|
||||
|
||||
from backend.blocks._base import (
|
||||
from backend.data.block import (
|
||||
Block,
|
||||
BlockCategory,
|
||||
BlockOutput,
|
||||
|
||||
@@ -1,12 +1,12 @@
|
||||
import json
|
||||
import shlex
|
||||
import uuid
|
||||
from typing import TYPE_CHECKING, Literal, Optional
|
||||
from typing import Literal, Optional
|
||||
|
||||
from e2b import AsyncSandbox as BaseAsyncSandbox
|
||||
from pydantic import SecretStr
|
||||
from pydantic import BaseModel, SecretStr
|
||||
|
||||
from backend.blocks._base import (
|
||||
from backend.data.block import (
|
||||
Block,
|
||||
BlockCategory,
|
||||
BlockOutput,
|
||||
@@ -20,13 +20,6 @@ from backend.data.model import (
|
||||
SchemaField,
|
||||
)
|
||||
from backend.integrations.providers import ProviderName
|
||||
from backend.util.sandbox_files import (
|
||||
SandboxFileOutput,
|
||||
extract_and_store_sandbox_files,
|
||||
)
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from backend.executor.utils import ExecutionContext
|
||||
|
||||
|
||||
class ClaudeCodeExecutionError(Exception):
|
||||
@@ -181,15 +174,22 @@ class ClaudeCodeBlock(Block):
|
||||
advanced=True,
|
||||
)
|
||||
|
||||
class FileOutput(BaseModel):
|
||||
"""A file extracted from the sandbox."""
|
||||
|
||||
path: str
|
||||
relative_path: str # Path relative to working directory (for GitHub, etc.)
|
||||
name: str
|
||||
content: str
|
||||
|
||||
class Output(BlockSchemaOutput):
|
||||
response: str = SchemaField(
|
||||
description="The output/response from Claude Code execution"
|
||||
)
|
||||
files: list[SandboxFileOutput] = SchemaField(
|
||||
files: list["ClaudeCodeBlock.FileOutput"] = SchemaField(
|
||||
description=(
|
||||
"List of text files created/modified by Claude Code during this execution. "
|
||||
"Each file has 'path', 'relative_path', 'name', 'content', and 'workspace_ref' fields. "
|
||||
"workspace_ref contains a workspace:// URI if the file was stored to workspace."
|
||||
"Each file has 'path', 'relative_path', 'name', and 'content' fields."
|
||||
)
|
||||
)
|
||||
conversation_history: str = SchemaField(
|
||||
@@ -252,7 +252,6 @@ class ClaudeCodeBlock(Block):
|
||||
"relative_path": "index.html",
|
||||
"name": "index.html",
|
||||
"content": "<html>Hello World</html>",
|
||||
"workspace_ref": None,
|
||||
}
|
||||
],
|
||||
),
|
||||
@@ -268,12 +267,11 @@ class ClaudeCodeBlock(Block):
|
||||
"execute_claude_code": lambda *args, **kwargs: (
|
||||
"Created index.html with hello world content", # response
|
||||
[
|
||||
SandboxFileOutput(
|
||||
ClaudeCodeBlock.FileOutput(
|
||||
path="/home/user/index.html",
|
||||
relative_path="index.html",
|
||||
name="index.html",
|
||||
content="<html>Hello World</html>",
|
||||
workspace_ref=None,
|
||||
)
|
||||
], # files
|
||||
"User: Create a hello world HTML file\n"
|
||||
@@ -296,8 +294,7 @@ class ClaudeCodeBlock(Block):
|
||||
existing_sandbox_id: str,
|
||||
conversation_history: str,
|
||||
dispose_sandbox: bool,
|
||||
execution_context: "ExecutionContext",
|
||||
) -> tuple[str, list[SandboxFileOutput], str, str, str]:
|
||||
) -> tuple[str, list["ClaudeCodeBlock.FileOutput"], str, str, str]:
|
||||
"""
|
||||
Execute Claude Code in an E2B sandbox.
|
||||
|
||||
@@ -452,18 +449,14 @@ class ClaudeCodeBlock(Block):
|
||||
else:
|
||||
new_conversation_history = turn_entry
|
||||
|
||||
# Extract files created/modified during this run and store to workspace
|
||||
sandbox_files = await extract_and_store_sandbox_files(
|
||||
sandbox=sandbox,
|
||||
working_directory=working_directory,
|
||||
execution_context=execution_context,
|
||||
since_timestamp=start_timestamp,
|
||||
text_only=True,
|
||||
# Extract files created/modified during this run
|
||||
files = await self._extract_files(
|
||||
sandbox, working_directory, start_timestamp
|
||||
)
|
||||
|
||||
return (
|
||||
response,
|
||||
sandbox_files, # Already SandboxFileOutput objects
|
||||
files,
|
||||
new_conversation_history,
|
||||
current_session_id,
|
||||
sandbox_id,
|
||||
@@ -478,6 +471,140 @@ class ClaudeCodeBlock(Block):
|
||||
if dispose_sandbox and sandbox:
|
||||
await sandbox.kill()
|
||||
|
||||
async def _extract_files(
|
||||
self,
|
||||
sandbox: BaseAsyncSandbox,
|
||||
working_directory: str,
|
||||
since_timestamp: str | None = None,
|
||||
) -> list["ClaudeCodeBlock.FileOutput"]:
|
||||
"""
|
||||
Extract text files created/modified during this Claude Code execution.
|
||||
|
||||
Args:
|
||||
sandbox: The E2B sandbox instance
|
||||
working_directory: Directory to search for files
|
||||
since_timestamp: ISO timestamp - only return files modified after this time
|
||||
|
||||
Returns:
|
||||
List of FileOutput objects with path, relative_path, name, and content
|
||||
"""
|
||||
files: list[ClaudeCodeBlock.FileOutput] = []
|
||||
|
||||
# Text file extensions we can safely read as text
|
||||
text_extensions = {
|
||||
".txt",
|
||||
".md",
|
||||
".html",
|
||||
".htm",
|
||||
".css",
|
||||
".js",
|
||||
".ts",
|
||||
".jsx",
|
||||
".tsx",
|
||||
".json",
|
||||
".xml",
|
||||
".yaml",
|
||||
".yml",
|
||||
".toml",
|
||||
".ini",
|
||||
".cfg",
|
||||
".conf",
|
||||
".py",
|
||||
".rb",
|
||||
".php",
|
||||
".java",
|
||||
".c",
|
||||
".cpp",
|
||||
".h",
|
||||
".hpp",
|
||||
".cs",
|
||||
".go",
|
||||
".rs",
|
||||
".swift",
|
||||
".kt",
|
||||
".scala",
|
||||
".sh",
|
||||
".bash",
|
||||
".zsh",
|
||||
".sql",
|
||||
".graphql",
|
||||
".env",
|
||||
".gitignore",
|
||||
".dockerfile",
|
||||
"Dockerfile",
|
||||
".vue",
|
||||
".svelte",
|
||||
".astro",
|
||||
".mdx",
|
||||
".rst",
|
||||
".tex",
|
||||
".csv",
|
||||
".log",
|
||||
}
|
||||
|
||||
try:
|
||||
# List files recursively using find command
|
||||
# Exclude node_modules and .git directories, but allow hidden files
|
||||
# like .env and .gitignore (they're filtered by text_extensions later)
|
||||
# Filter by timestamp to only get files created/modified during this run
|
||||
safe_working_dir = shlex.quote(working_directory)
|
||||
timestamp_filter = ""
|
||||
if since_timestamp:
|
||||
timestamp_filter = f"-newermt {shlex.quote(since_timestamp)} "
|
||||
find_result = await sandbox.commands.run(
|
||||
f"find {safe_working_dir} -type f "
|
||||
f"{timestamp_filter}"
|
||||
f"-not -path '*/node_modules/*' "
|
||||
f"-not -path '*/.git/*' "
|
||||
f"2>/dev/null"
|
||||
)
|
||||
|
||||
if find_result.stdout:
|
||||
for file_path in find_result.stdout.strip().split("\n"):
|
||||
if not file_path:
|
||||
continue
|
||||
|
||||
# Check if it's a text file we can read
|
||||
is_text = any(
|
||||
file_path.endswith(ext) for ext in text_extensions
|
||||
) or file_path.endswith("Dockerfile")
|
||||
|
||||
if is_text:
|
||||
try:
|
||||
content = await sandbox.files.read(file_path)
|
||||
# Handle bytes or string
|
||||
if isinstance(content, bytes):
|
||||
content = content.decode("utf-8", errors="replace")
|
||||
|
||||
# Extract filename from path
|
||||
file_name = file_path.split("/")[-1]
|
||||
|
||||
# Calculate relative path by stripping working directory
|
||||
relative_path = file_path
|
||||
if file_path.startswith(working_directory):
|
||||
relative_path = file_path[len(working_directory) :]
|
||||
# Remove leading slash if present
|
||||
if relative_path.startswith("/"):
|
||||
relative_path = relative_path[1:]
|
||||
|
||||
files.append(
|
||||
ClaudeCodeBlock.FileOutput(
|
||||
path=file_path,
|
||||
relative_path=relative_path,
|
||||
name=file_name,
|
||||
content=content,
|
||||
)
|
||||
)
|
||||
except Exception:
|
||||
# Skip files that can't be read
|
||||
pass
|
||||
|
||||
except Exception:
|
||||
# If file extraction fails, return empty results
|
||||
pass
|
||||
|
||||
return files
|
||||
|
||||
def _escape_prompt(self, prompt: str) -> str:
|
||||
"""Escape the prompt for safe shell execution."""
|
||||
# Use single quotes and escape any single quotes in the prompt
|
||||
@@ -490,7 +617,6 @@ class ClaudeCodeBlock(Block):
|
||||
*,
|
||||
e2b_credentials: APIKeyCredentials,
|
||||
anthropic_credentials: APIKeyCredentials,
|
||||
execution_context: "ExecutionContext",
|
||||
**kwargs,
|
||||
) -> BlockOutput:
|
||||
try:
|
||||
@@ -511,7 +637,6 @@ class ClaudeCodeBlock(Block):
|
||||
existing_sandbox_id=input_data.sandbox_id,
|
||||
conversation_history=input_data.conversation_history,
|
||||
dispose_sandbox=input_data.dispose_sandbox,
|
||||
execution_context=execution_context,
|
||||
)
|
||||
|
||||
yield "response", response
|
||||
|
||||
@@ -1,12 +1,12 @@
|
||||
from enum import Enum
|
||||
from typing import TYPE_CHECKING, Any, Literal, Optional
|
||||
from typing import Any, Literal, Optional
|
||||
|
||||
from e2b_code_interpreter import AsyncSandbox
|
||||
from e2b_code_interpreter import Result as E2BExecutionResult
|
||||
from e2b_code_interpreter.charts import Chart as E2BExecutionResultChart
|
||||
from pydantic import BaseModel, Field, JsonValue, SecretStr
|
||||
|
||||
from backend.blocks._base import (
|
||||
from backend.data.block import (
|
||||
Block,
|
||||
BlockCategory,
|
||||
BlockOutput,
|
||||
@@ -20,13 +20,6 @@ from backend.data.model import (
|
||||
SchemaField,
|
||||
)
|
||||
from backend.integrations.providers import ProviderName
|
||||
from backend.util.sandbox_files import (
|
||||
SandboxFileOutput,
|
||||
extract_and_store_sandbox_files,
|
||||
)
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from backend.executor.utils import ExecutionContext
|
||||
|
||||
TEST_CREDENTIALS = APIKeyCredentials(
|
||||
id="01234567-89ab-cdef-0123-456789abcdef",
|
||||
@@ -92,9 +85,6 @@ class CodeExecutionResult(MainCodeExecutionResult):
|
||||
class BaseE2BExecutorMixin:
|
||||
"""Shared implementation methods for E2B executor blocks."""
|
||||
|
||||
# Default working directory in E2B sandboxes
|
||||
WORKING_DIR = "/home/user"
|
||||
|
||||
async def execute_code(
|
||||
self,
|
||||
api_key: str,
|
||||
@@ -105,21 +95,14 @@ class BaseE2BExecutorMixin:
|
||||
timeout: Optional[int] = None,
|
||||
sandbox_id: Optional[str] = None,
|
||||
dispose_sandbox: bool = False,
|
||||
execution_context: Optional["ExecutionContext"] = None,
|
||||
extract_files: bool = False,
|
||||
):
|
||||
"""
|
||||
Unified code execution method that handles all three use cases:
|
||||
1. Create new sandbox and execute (ExecuteCodeBlock)
|
||||
2. Create new sandbox, execute, and return sandbox_id (InstantiateCodeSandboxBlock)
|
||||
3. Connect to existing sandbox and execute (ExecuteCodeStepBlock)
|
||||
|
||||
Args:
|
||||
extract_files: If True and execution_context provided, extract files
|
||||
created/modified during execution and store to workspace.
|
||||
""" # noqa
|
||||
sandbox = None
|
||||
files: list[SandboxFileOutput] = []
|
||||
try:
|
||||
if sandbox_id:
|
||||
# Connect to existing sandbox (ExecuteCodeStepBlock case)
|
||||
@@ -135,12 +118,6 @@ class BaseE2BExecutorMixin:
|
||||
for cmd in setup_commands:
|
||||
await sandbox.commands.run(cmd)
|
||||
|
||||
# Capture timestamp before execution to scope file extraction
|
||||
start_timestamp = None
|
||||
if extract_files:
|
||||
ts_result = await sandbox.commands.run("date -u +%Y-%m-%dT%H:%M:%S")
|
||||
start_timestamp = ts_result.stdout.strip() if ts_result.stdout else None
|
||||
|
||||
# Execute the code
|
||||
execution = await sandbox.run_code(
|
||||
code,
|
||||
@@ -156,24 +133,7 @@ class BaseE2BExecutorMixin:
|
||||
stdout_logs = "".join(execution.logs.stdout)
|
||||
stderr_logs = "".join(execution.logs.stderr)
|
||||
|
||||
# Extract files created/modified during this execution
|
||||
if extract_files and execution_context:
|
||||
files = await extract_and_store_sandbox_files(
|
||||
sandbox=sandbox,
|
||||
working_directory=self.WORKING_DIR,
|
||||
execution_context=execution_context,
|
||||
since_timestamp=start_timestamp,
|
||||
text_only=False, # Include binary files too
|
||||
)
|
||||
|
||||
return (
|
||||
results,
|
||||
text_output,
|
||||
stdout_logs,
|
||||
stderr_logs,
|
||||
sandbox.sandbox_id,
|
||||
files,
|
||||
)
|
||||
return results, text_output, stdout_logs, stderr_logs, sandbox.sandbox_id
|
||||
finally:
|
||||
# Dispose of sandbox if requested to reduce usage costs
|
||||
if dispose_sandbox and sandbox:
|
||||
@@ -278,12 +238,6 @@ class ExecuteCodeBlock(Block, BaseE2BExecutorMixin):
|
||||
description="Standard output logs from execution"
|
||||
)
|
||||
stderr_logs: str = SchemaField(description="Standard error logs from execution")
|
||||
files: list[SandboxFileOutput] = SchemaField(
|
||||
description=(
|
||||
"Files created or modified during execution. "
|
||||
"Each file has path, name, content, and workspace_ref (if stored)."
|
||||
),
|
||||
)
|
||||
|
||||
def __init__(self):
|
||||
super().__init__(
|
||||
@@ -305,30 +259,23 @@ class ExecuteCodeBlock(Block, BaseE2BExecutorMixin):
|
||||
("results", []),
|
||||
("response", "Hello World"),
|
||||
("stdout_logs", "Hello World\n"),
|
||||
("files", []),
|
||||
],
|
||||
test_mock={
|
||||
"execute_code": lambda api_key, code, language, template_id, setup_commands, timeout, dispose_sandbox, execution_context, extract_files: ( # noqa
|
||||
"execute_code": lambda api_key, code, language, template_id, setup_commands, timeout, dispose_sandbox: ( # noqa
|
||||
[], # results
|
||||
"Hello World", # text_output
|
||||
"Hello World\n", # stdout_logs
|
||||
"", # stderr_logs
|
||||
"sandbox_id", # sandbox_id
|
||||
[], # files
|
||||
),
|
||||
},
|
||||
)
|
||||
|
||||
async def run(
|
||||
self,
|
||||
input_data: Input,
|
||||
*,
|
||||
credentials: APIKeyCredentials,
|
||||
execution_context: "ExecutionContext",
|
||||
**kwargs,
|
||||
self, input_data: Input, *, credentials: APIKeyCredentials, **kwargs
|
||||
) -> BlockOutput:
|
||||
try:
|
||||
results, text_output, stdout, stderr, _, files = await self.execute_code(
|
||||
results, text_output, stdout, stderr, _ = await self.execute_code(
|
||||
api_key=credentials.api_key.get_secret_value(),
|
||||
code=input_data.code,
|
||||
language=input_data.language,
|
||||
@@ -336,8 +283,6 @@ class ExecuteCodeBlock(Block, BaseE2BExecutorMixin):
|
||||
setup_commands=input_data.setup_commands,
|
||||
timeout=input_data.timeout,
|
||||
dispose_sandbox=input_data.dispose_sandbox,
|
||||
execution_context=execution_context,
|
||||
extract_files=True,
|
||||
)
|
||||
|
||||
# Determine result object shape & filter out empty formats
|
||||
@@ -351,8 +296,6 @@ class ExecuteCodeBlock(Block, BaseE2BExecutorMixin):
|
||||
yield "stdout_logs", stdout
|
||||
if stderr:
|
||||
yield "stderr_logs", stderr
|
||||
# Always yield files (empty list if none)
|
||||
yield "files", [f.model_dump() for f in files]
|
||||
except Exception as e:
|
||||
yield "error", str(e)
|
||||
|
||||
@@ -450,7 +393,6 @@ class InstantiateCodeSandboxBlock(Block, BaseE2BExecutorMixin):
|
||||
"Hello World\n", # stdout_logs
|
||||
"", # stderr_logs
|
||||
"sandbox_id", # sandbox_id
|
||||
[], # files
|
||||
),
|
||||
},
|
||||
)
|
||||
@@ -459,7 +401,7 @@ class InstantiateCodeSandboxBlock(Block, BaseE2BExecutorMixin):
|
||||
self, input_data: Input, *, credentials: APIKeyCredentials, **kwargs
|
||||
) -> BlockOutput:
|
||||
try:
|
||||
_, text_output, stdout, stderr, sandbox_id, _ = await self.execute_code(
|
||||
_, text_output, stdout, stderr, sandbox_id = await self.execute_code(
|
||||
api_key=credentials.api_key.get_secret_value(),
|
||||
code=input_data.setup_code,
|
||||
language=input_data.language,
|
||||
@@ -558,7 +500,6 @@ class ExecuteCodeStepBlock(Block, BaseE2BExecutorMixin):
|
||||
"Hello World\n", # stdout_logs
|
||||
"", # stderr_logs
|
||||
sandbox_id, # sandbox_id
|
||||
[], # files
|
||||
),
|
||||
},
|
||||
)
|
||||
@@ -567,7 +508,7 @@ class ExecuteCodeStepBlock(Block, BaseE2BExecutorMixin):
|
||||
self, input_data: Input, *, credentials: APIKeyCredentials, **kwargs
|
||||
) -> BlockOutput:
|
||||
try:
|
||||
results, text_output, stdout, stderr, _, _ = await self.execute_code(
|
||||
results, text_output, stdout, stderr, _ = await self.execute_code(
|
||||
api_key=credentials.api_key.get_secret_value(),
|
||||
code=input_data.step_code,
|
||||
language=input_data.language,
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
import re
|
||||
|
||||
from backend.blocks._base import (
|
||||
from backend.data.block import (
|
||||
Block,
|
||||
BlockCategory,
|
||||
BlockOutput,
|
||||
|
||||
@@ -6,7 +6,7 @@ from openai import AsyncOpenAI
|
||||
from openai.types.responses import Response as OpenAIResponse
|
||||
from pydantic import SecretStr
|
||||
|
||||
from backend.blocks._base import (
|
||||
from backend.data.block import (
|
||||
Block,
|
||||
BlockCategory,
|
||||
BlockOutput,
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
from pydantic import BaseModel
|
||||
|
||||
from backend.blocks._base import (
|
||||
from backend.data.block import (
|
||||
Block,
|
||||
BlockCategory,
|
||||
BlockManualWebhookConfig,
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
from backend.blocks._base import (
|
||||
from backend.data.block import (
|
||||
Block,
|
||||
BlockCategory,
|
||||
BlockOutput,
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
from typing import Any, List
|
||||
|
||||
from backend.blocks._base import (
|
||||
from backend.data.block import (
|
||||
Block,
|
||||
BlockCategory,
|
||||
BlockOutput,
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
import codecs
|
||||
|
||||
from backend.blocks._base import (
|
||||
from backend.data.block import (
|
||||
Block,
|
||||
BlockCategory,
|
||||
BlockOutput,
|
||||
|
||||
@@ -8,7 +8,7 @@ from typing import Any, Literal, cast
|
||||
import discord
|
||||
from pydantic import SecretStr
|
||||
|
||||
from backend.blocks._base import (
|
||||
from backend.data.block import (
|
||||
Block,
|
||||
BlockCategory,
|
||||
BlockOutput,
|
||||
|
||||
@@ -2,7 +2,7 @@
|
||||
Discord OAuth-based blocks.
|
||||
"""
|
||||
|
||||
from backend.blocks._base import (
|
||||
from backend.data.block import (
|
||||
Block,
|
||||
BlockCategory,
|
||||
BlockOutput,
|
||||
|
||||
@@ -7,7 +7,7 @@ from typing import Literal
|
||||
|
||||
from pydantic import BaseModel, ConfigDict, SecretStr
|
||||
|
||||
from backend.blocks._base import (
|
||||
from backend.data.block import (
|
||||
Block,
|
||||
BlockCategory,
|
||||
BlockOutput,
|
||||
|
||||
@@ -2,7 +2,7 @@
|
||||
|
||||
import codecs
|
||||
|
||||
from backend.blocks._base import (
|
||||
from backend.data.block import (
|
||||
Block,
|
||||
BlockCategory,
|
||||
BlockOutput,
|
||||
|
||||
@@ -8,7 +8,7 @@ which provides access to LinkedIn profile data and related information.
|
||||
import logging
|
||||
from typing import Optional
|
||||
|
||||
from backend.blocks._base import (
|
||||
from backend.data.block import (
|
||||
Block,
|
||||
BlockCategory,
|
||||
BlockOutput,
|
||||
|
||||
@@ -3,13 +3,6 @@ import logging
|
||||
from enum import Enum
|
||||
from typing import Any
|
||||
|
||||
from backend.blocks._base import (
|
||||
Block,
|
||||
BlockCategory,
|
||||
BlockOutput,
|
||||
BlockSchemaInput,
|
||||
BlockSchemaOutput,
|
||||
)
|
||||
from backend.blocks.fal._auth import (
|
||||
TEST_CREDENTIALS,
|
||||
TEST_CREDENTIALS_INPUT,
|
||||
@@ -17,6 +10,13 @@ from backend.blocks.fal._auth import (
|
||||
FalCredentialsField,
|
||||
FalCredentialsInput,
|
||||
)
|
||||
from backend.data.block import (
|
||||
Block,
|
||||
BlockCategory,
|
||||
BlockOutput,
|
||||
BlockSchemaInput,
|
||||
BlockSchemaOutput,
|
||||
)
|
||||
from backend.data.execution import ExecutionContext
|
||||
from backend.data.model import SchemaField
|
||||
from backend.util.file import store_media_file
|
||||
|
||||
@@ -5,7 +5,7 @@ from pydantic import SecretStr
|
||||
from replicate.client import Client as ReplicateClient
|
||||
from replicate.helpers import FileOutput
|
||||
|
||||
from backend.blocks._base import (
|
||||
from backend.data.block import (
|
||||
Block,
|
||||
BlockCategory,
|
||||
BlockOutput,
|
||||
|
||||
@@ -3,7 +3,7 @@ from typing import Optional
|
||||
|
||||
from pydantic import BaseModel
|
||||
|
||||
from backend.blocks._base import (
|
||||
from backend.data.block import (
|
||||
Block,
|
||||
BlockCategory,
|
||||
BlockOutput,
|
||||
|
||||
@@ -5,7 +5,7 @@ from typing import Optional
|
||||
|
||||
from typing_extensions import TypedDict
|
||||
|
||||
from backend.blocks._base import (
|
||||
from backend.data.block import (
|
||||
Block,
|
||||
BlockCategory,
|
||||
BlockOutput,
|
||||
|
||||
@@ -3,7 +3,7 @@ from urllib.parse import urlparse
|
||||
|
||||
from typing_extensions import TypedDict
|
||||
|
||||
from backend.blocks._base import (
|
||||
from backend.data.block import (
|
||||
Block,
|
||||
BlockCategory,
|
||||
BlockOutput,
|
||||
|
||||
@@ -2,7 +2,7 @@ import re
|
||||
|
||||
from typing_extensions import TypedDict
|
||||
|
||||
from backend.blocks._base import (
|
||||
from backend.data.block import (
|
||||
Block,
|
||||
BlockCategory,
|
||||
BlockOutput,
|
||||
|
||||
@@ -2,7 +2,7 @@ import base64
|
||||
|
||||
from typing_extensions import TypedDict
|
||||
|
||||
from backend.blocks._base import (
|
||||
from backend.data.block import (
|
||||
Block,
|
||||
BlockCategory,
|
||||
BlockOutput,
|
||||
|
||||
@@ -4,7 +4,7 @@ from typing import Any, List, Optional
|
||||
|
||||
from typing_extensions import TypedDict
|
||||
|
||||
from backend.blocks._base import (
|
||||
from backend.data.block import (
|
||||
Block,
|
||||
BlockCategory,
|
||||
BlockOutput,
|
||||
|
||||
@@ -3,7 +3,7 @@ from typing import Optional
|
||||
|
||||
from pydantic import BaseModel
|
||||
|
||||
from backend.blocks._base import (
|
||||
from backend.data.block import (
|
||||
Block,
|
||||
BlockCategory,
|
||||
BlockOutput,
|
||||
|
||||
@@ -4,7 +4,7 @@ from pathlib import Path
|
||||
|
||||
from pydantic import BaseModel
|
||||
|
||||
from backend.blocks._base import (
|
||||
from backend.data.block import (
|
||||
Block,
|
||||
BlockCategory,
|
||||
BlockOutput,
|
||||
|
||||
@@ -8,7 +8,7 @@ from google.oauth2.credentials import Credentials
|
||||
from googleapiclient.discovery import build
|
||||
from pydantic import BaseModel
|
||||
|
||||
from backend.blocks._base import (
|
||||
from backend.data.block import (
|
||||
Block,
|
||||
BlockCategory,
|
||||
BlockOutput,
|
||||
|
||||
@@ -7,14 +7,14 @@ from google.oauth2.credentials import Credentials
|
||||
from googleapiclient.discovery import build
|
||||
from gravitas_md2gdocs import to_requests
|
||||
|
||||
from backend.blocks._base import (
|
||||
from backend.blocks.google._drive import GoogleDriveFile, GoogleDriveFileField
|
||||
from backend.data.block import (
|
||||
Block,
|
||||
BlockCategory,
|
||||
BlockOutput,
|
||||
BlockSchemaInput,
|
||||
BlockSchemaOutput,
|
||||
)
|
||||
from backend.blocks.google._drive import GoogleDriveFile, GoogleDriveFileField
|
||||
from backend.data.model import SchemaField
|
||||
from backend.util.settings import Settings
|
||||
|
||||
|
||||
@@ -14,7 +14,7 @@ from google.oauth2.credentials import Credentials
|
||||
from googleapiclient.discovery import build
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
from backend.blocks._base import (
|
||||
from backend.data.block import (
|
||||
Block,
|
||||
BlockCategory,
|
||||
BlockOutput,
|
||||
|
||||
@@ -7,14 +7,14 @@ from enum import Enum
|
||||
from google.oauth2.credentials import Credentials
|
||||
from googleapiclient.discovery import build
|
||||
|
||||
from backend.blocks._base import (
|
||||
from backend.blocks.google._drive import GoogleDriveFile, GoogleDriveFileField
|
||||
from backend.data.block import (
|
||||
Block,
|
||||
BlockCategory,
|
||||
BlockOutput,
|
||||
BlockSchemaInput,
|
||||
BlockSchemaOutput,
|
||||
)
|
||||
from backend.blocks.google._drive import GoogleDriveFile, GoogleDriveFileField
|
||||
from backend.data.model import SchemaField
|
||||
from backend.util.settings import Settings
|
||||
|
||||
|
||||
@@ -3,7 +3,7 @@ from typing import Literal
|
||||
import googlemaps
|
||||
from pydantic import BaseModel, SecretStr
|
||||
|
||||
from backend.blocks._base import (
|
||||
from backend.data.block import (
|
||||
Block,
|
||||
BlockCategory,
|
||||
BlockOutput,
|
||||
|
||||
@@ -9,7 +9,9 @@ from typing import Any, Optional
|
||||
from prisma.enums import ReviewStatus
|
||||
from pydantic import BaseModel
|
||||
|
||||
from backend.data.execution import ExecutionStatus
|
||||
from backend.data.human_review import ReviewResult
|
||||
from backend.executor.manager import async_update_node_execution_status
|
||||
from backend.util.clients import get_database_manager_async_client
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@@ -41,8 +43,6 @@ class HITLReviewHelper:
|
||||
@staticmethod
|
||||
async def update_node_execution_status(**kwargs) -> None:
|
||||
"""Update the execution status of a node."""
|
||||
from backend.executor.manager import async_update_node_execution_status
|
||||
|
||||
await async_update_node_execution_status(
|
||||
db_client=get_database_manager_async_client(), **kwargs
|
||||
)
|
||||
@@ -88,13 +88,12 @@ class HITLReviewHelper:
|
||||
Raises:
|
||||
Exception: If review creation or status update fails
|
||||
"""
|
||||
from backend.data.execution import ExecutionStatus
|
||||
|
||||
# Note: Safe mode checks (human_in_the_loop_safe_mode, sensitive_action_safe_mode)
|
||||
# are handled by the caller:
|
||||
# - HITL blocks check human_in_the_loop_safe_mode in their run() method
|
||||
# - Sensitive action blocks check sensitive_action_safe_mode in is_block_exec_need_review()
|
||||
# This function only handles checking for existing approvals.
|
||||
|
||||
# Check if this node has already been approved (normal or auto-approval)
|
||||
if approval_result := await HITLReviewHelper.check_approval(
|
||||
node_exec_id=node_exec_id,
|
||||
|
||||
@@ -8,7 +8,7 @@ from typing import Literal
|
||||
import aiofiles
|
||||
from pydantic import SecretStr
|
||||
|
||||
from backend.blocks._base import (
|
||||
from backend.data.block import (
|
||||
Block,
|
||||
BlockCategory,
|
||||
BlockOutput,
|
||||
|
||||
@@ -1,15 +1,15 @@
|
||||
from backend.blocks._base import (
|
||||
from backend.blocks.hubspot._auth import (
|
||||
HubSpotCredentials,
|
||||
HubSpotCredentialsField,
|
||||
HubSpotCredentialsInput,
|
||||
)
|
||||
from backend.data.block import (
|
||||
Block,
|
||||
BlockCategory,
|
||||
BlockOutput,
|
||||
BlockSchemaInput,
|
||||
BlockSchemaOutput,
|
||||
)
|
||||
from backend.blocks.hubspot._auth import (
|
||||
HubSpotCredentials,
|
||||
HubSpotCredentialsField,
|
||||
HubSpotCredentialsInput,
|
||||
)
|
||||
from backend.data.model import SchemaField
|
||||
from backend.util.request import Requests
|
||||
|
||||
|
||||
@@ -1,15 +1,15 @@
|
||||
from backend.blocks._base import (
|
||||
from backend.blocks.hubspot._auth import (
|
||||
HubSpotCredentials,
|
||||
HubSpotCredentialsField,
|
||||
HubSpotCredentialsInput,
|
||||
)
|
||||
from backend.data.block import (
|
||||
Block,
|
||||
BlockCategory,
|
||||
BlockOutput,
|
||||
BlockSchemaInput,
|
||||
BlockSchemaOutput,
|
||||
)
|
||||
from backend.blocks.hubspot._auth import (
|
||||
HubSpotCredentials,
|
||||
HubSpotCredentialsField,
|
||||
HubSpotCredentialsInput,
|
||||
)
|
||||
from backend.data.model import SchemaField
|
||||
from backend.util.request import Requests
|
||||
|
||||
|
||||
@@ -1,17 +1,17 @@
|
||||
from datetime import datetime, timedelta
|
||||
|
||||
from backend.blocks._base import (
|
||||
from backend.blocks.hubspot._auth import (
|
||||
HubSpotCredentials,
|
||||
HubSpotCredentialsField,
|
||||
HubSpotCredentialsInput,
|
||||
)
|
||||
from backend.data.block import (
|
||||
Block,
|
||||
BlockCategory,
|
||||
BlockOutput,
|
||||
BlockSchemaInput,
|
||||
BlockSchemaOutput,
|
||||
)
|
||||
from backend.blocks.hubspot._auth import (
|
||||
HubSpotCredentials,
|
||||
HubSpotCredentialsField,
|
||||
HubSpotCredentialsInput,
|
||||
)
|
||||
from backend.data.model import SchemaField
|
||||
from backend.util.request import Requests
|
||||
|
||||
|
||||
@@ -3,7 +3,8 @@ from typing import Any
|
||||
|
||||
from prisma.enums import ReviewStatus
|
||||
|
||||
from backend.blocks._base import (
|
||||
from backend.blocks.helpers.review import HITLReviewHelper
|
||||
from backend.data.block import (
|
||||
Block,
|
||||
BlockCategory,
|
||||
BlockOutput,
|
||||
@@ -11,7 +12,6 @@ from backend.blocks._base import (
|
||||
BlockSchemaOutput,
|
||||
BlockType,
|
||||
)
|
||||
from backend.blocks.helpers.review import HITLReviewHelper
|
||||
from backend.data.execution import ExecutionContext
|
||||
from backend.data.human_review import ReviewResult
|
||||
from backend.data.model import SchemaField
|
||||
|
||||
@@ -3,7 +3,7 @@ from typing import Any, Dict, Literal, Optional
|
||||
|
||||
from pydantic import SecretStr
|
||||
|
||||
from backend.blocks._base import (
|
||||
from backend.data.block import (
|
||||
Block,
|
||||
BlockCategory,
|
||||
BlockOutput,
|
||||
|
||||
@@ -2,7 +2,9 @@ import copy
|
||||
from datetime import date, time
|
||||
from typing import Any, Optional
|
||||
|
||||
from backend.blocks._base import (
|
||||
# Import for Google Drive file input block
|
||||
from backend.blocks.google._drive import AttachmentView, GoogleDriveFile
|
||||
from backend.data.block import (
|
||||
Block,
|
||||
BlockCategory,
|
||||
BlockOutput,
|
||||
@@ -10,9 +12,6 @@ from backend.blocks._base import (
|
||||
BlockSchemaInput,
|
||||
BlockType,
|
||||
)
|
||||
|
||||
# Import for Google Drive file input block
|
||||
from backend.blocks.google._drive import AttachmentView, GoogleDriveFile
|
||||
from backend.data.execution import ExecutionContext
|
||||
from backend.data.model import SchemaField
|
||||
from backend.util.file import store_media_file
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
from typing import Any
|
||||
|
||||
from backend.blocks._base import (
|
||||
from backend.data.block import (
|
||||
Block,
|
||||
BlockCategory,
|
||||
BlockOutput,
|
||||
|
||||
@@ -1,15 +1,15 @@
|
||||
from backend.blocks._base import (
|
||||
from backend.blocks.jina._auth import (
|
||||
JinaCredentials,
|
||||
JinaCredentialsField,
|
||||
JinaCredentialsInput,
|
||||
)
|
||||
from backend.data.block import (
|
||||
Block,
|
||||
BlockCategory,
|
||||
BlockOutput,
|
||||
BlockSchemaInput,
|
||||
BlockSchemaOutput,
|
||||
)
|
||||
from backend.blocks.jina._auth import (
|
||||
JinaCredentials,
|
||||
JinaCredentialsField,
|
||||
JinaCredentialsInput,
|
||||
)
|
||||
from backend.data.model import SchemaField
|
||||
from backend.util.request import Requests
|
||||
|
||||
|
||||
@@ -1,15 +1,15 @@
|
||||
from backend.blocks._base import (
|
||||
from backend.blocks.jina._auth import (
|
||||
JinaCredentials,
|
||||
JinaCredentialsField,
|
||||
JinaCredentialsInput,
|
||||
)
|
||||
from backend.data.block import (
|
||||
Block,
|
||||
BlockCategory,
|
||||
BlockOutput,
|
||||
BlockSchemaInput,
|
||||
BlockSchemaOutput,
|
||||
)
|
||||
from backend.blocks.jina._auth import (
|
||||
JinaCredentials,
|
||||
JinaCredentialsField,
|
||||
JinaCredentialsInput,
|
||||
)
|
||||
from backend.data.model import SchemaField
|
||||
from backend.util.request import Requests
|
||||
|
||||
|
||||
@@ -3,18 +3,18 @@ from urllib.parse import quote
|
||||
|
||||
from typing_extensions import TypedDict
|
||||
|
||||
from backend.blocks._base import (
|
||||
from backend.blocks.jina._auth import (
|
||||
JinaCredentials,
|
||||
JinaCredentialsField,
|
||||
JinaCredentialsInput,
|
||||
)
|
||||
from backend.data.block import (
|
||||
Block,
|
||||
BlockCategory,
|
||||
BlockOutput,
|
||||
BlockSchemaInput,
|
||||
BlockSchemaOutput,
|
||||
)
|
||||
from backend.blocks.jina._auth import (
|
||||
JinaCredentials,
|
||||
JinaCredentialsField,
|
||||
JinaCredentialsInput,
|
||||
)
|
||||
from backend.data.model import SchemaField
|
||||
from backend.util.request import Requests
|
||||
|
||||
|
||||
@@ -1,12 +1,5 @@
|
||||
from urllib.parse import quote
|
||||
|
||||
from backend.blocks._base import (
|
||||
Block,
|
||||
BlockCategory,
|
||||
BlockOutput,
|
||||
BlockSchemaInput,
|
||||
BlockSchemaOutput,
|
||||
)
|
||||
from backend.blocks.jina._auth import (
|
||||
TEST_CREDENTIALS,
|
||||
TEST_CREDENTIALS_INPUT,
|
||||
@@ -15,6 +8,13 @@ from backend.blocks.jina._auth import (
|
||||
JinaCredentialsInput,
|
||||
)
|
||||
from backend.blocks.search import GetRequest
|
||||
from backend.data.block import (
|
||||
Block,
|
||||
BlockCategory,
|
||||
BlockOutput,
|
||||
BlockSchemaInput,
|
||||
BlockSchemaOutput,
|
||||
)
|
||||
from backend.data.model import SchemaField
|
||||
from backend.util.exceptions import BlockExecutionError
|
||||
|
||||
|
||||
@@ -15,7 +15,7 @@ from anthropic.types import ToolParam
|
||||
from groq import AsyncGroq
|
||||
from pydantic import BaseModel, SecretStr
|
||||
|
||||
from backend.blocks._base import (
|
||||
from backend.data.block import (
|
||||
Block,
|
||||
BlockCategory,
|
||||
BlockOutput,
|
||||
|
||||
@@ -2,7 +2,7 @@ import operator
|
||||
from enum import Enum
|
||||
from typing import Any
|
||||
|
||||
from backend.blocks._base import (
|
||||
from backend.data.block import (
|
||||
Block,
|
||||
BlockCategory,
|
||||
BlockOutput,
|
||||
|
||||
@@ -3,7 +3,7 @@ from typing import List, Literal
|
||||
|
||||
from pydantic import SecretStr
|
||||
|
||||
from backend.blocks._base import (
|
||||
from backend.data.block import (
|
||||
Block,
|
||||
BlockCategory,
|
||||
BlockOutput,
|
||||
|
||||
@@ -3,7 +3,7 @@ from typing import Any, Literal, Optional, Union
|
||||
from mem0 import MemoryClient
|
||||
from pydantic import BaseModel, SecretStr
|
||||
|
||||
from backend.blocks._base import Block, BlockOutput, BlockSchemaInput, BlockSchemaOutput
|
||||
from backend.data.block import Block, BlockOutput, BlockSchemaInput, BlockSchemaOutput
|
||||
from backend.data.model import (
|
||||
APIKeyCredentials,
|
||||
CredentialsField,
|
||||
|
||||
Some files were not shown because too many files have changed in this diff Show More
Reference in New Issue
Block a user