Compare commits

..

13 Commits

Author SHA1 Message Date
Zamil Majdy
253c4bbc63 fix(copilot): prevent duplicate error markers and extract shared helper
- Extract `_append_error_marker()` helper to deduplicate marker
  appending logic across 4 call sites
- Skip appending error marker in BaseException handler when one was
  already appended inside the stream loop (ended_with_stream_error)
- Update misleading "mark as retryable" comment to match actual
  behavior (uses retryable prefix, not a model field)
- Add docstring to `_safe()` helper
- Remove unused `prefix` variable from stream error tuple
2026-03-17 13:51:19 +07:00
Zamil Majdy
c0a91be65e fix(copilot): prevent duplicate error markers and extract shared helper
- Change stale error marker cleanup from `if` to `while` so ALL
  trailing markers are removed (fixes issue where mid-stream error
  + cleanup error could leave a stale marker)
- Skip appending error marker in BaseException handler when one was
  already appended inside the stream loop (ended_with_stream_error)
- Extract `_append_error_marker()` helper to deduplicate marker
  appending logic across 4 call sites
- Update misleading "mark as retryable" comment to match actual
  behavior (uses retryable prefix, not a model field)
- Add docstring to `_safe()` helper
- Remove unused `prefix` variable from stream error tuple
2026-03-17 13:50:30 +07:00
Zamil Majdy
64d82797b5 refactor: use COPILOT_RETRYABLE_ERROR_PREFIX for server-driven retry
Replace frontend string matching on error text with a dedicated marker
prefix. The backend now uses COPILOT_RETRYABLE_ERROR_PREFIX for
transient errors, and the frontend checks markerType instead of
matching "Anthropic connection interrupted". Also collapses remaining
scattered ternaries and the base URL validation guard.
2026-03-17 13:44:50 +07:00
Zamil Majdy
1565564bce fix: persist ResultMessage errors to session and simplify adapter
- Append COPILOT_ERROR_PREFIX marker when convert_message produces a
  StreamError, so the error card survives page refresh.
- Collapse duplicate ternaries into a single if/else block.
2026-03-17 13:36:21 +07:00
Zamil Majdy
0614b22a72 refactor: simplify stream error handler branching
Set log level, display message, and error code upfront based on
is_transient, then use them once — removes the if/else duplication.
2026-03-17 13:32:29 +07:00
Zamil Majdy
feeed4645c refactor: collapse duplicate return {} guards in _build_sdk_env 2026-03-17 13:30:01 +07:00
Zamil Majdy
ccd69df357 fix: remove dead retryable field from StreamError
The AI SDK uses z.strictObject({type, errorText}) which rejects extra
fields in SSE data — so the retryable field could never reach the
frontend. The frontend correctly uses string matching on the friendly
error message instead.
2026-03-17 13:23:51 +07:00
Zamil Majdy
1d5598df3d fix(copilot): persist transient error markers and case-insensitive detection
- Append COPILOT_ERROR_PREFIX marker to session before yielding
  StreamError so the error card survives page refresh.
- Make frontend transient error detection case-insensitive.
2026-03-17 13:14:45 +07:00
Zamil Majdy
84f3ca9a62 refactor(copilot): simplify _build_sdk_env with early returns per mode
Replace nested if/elif/else with three self-contained early-return
blocks (subscription → direct → openrouter). Removes shared mutable
dict and scattered header injection logic.
2026-03-17 13:11:28 +07:00
Zamil Majdy
94af0b264c feat(copilot): add use_openrouter flag to bypass OpenRouter proxy
Adds CHAT_USE_OPENROUTER config flag (default: true) that controls
whether the SDK routes API calls through OpenRouter or connects to
Anthropic directly. When false, the subprocess inherits ANTHROPIC_API_KEY
from the environment and skips the proxy hop, reducing connection
drop surface area.
2026-03-17 13:09:36 +07:00
Zamil Majdy
a31fc008e8 fix(copilot): check error field (not just content) for transient API errors
The transient error detection was checking str(sdk_msg.content) which
contains content blocks, not the actual error string from sdk_msg.error.
Now checks both the error field and content preview for transient patterns.
2026-03-17 12:45:12 +07:00
Zamil Majdy
2e8b984f8e fix(copilot): handle transient Anthropic API connection errors gracefully
Detect transient Anthropic API errors (ECONNRESET, socket closed) and
replace raw technical error messages with a user-friendly
"Anthropic connection interrupted — please retry" message. Add a
`retryable` flag to StreamError so the frontend can show a "Try Again"
button that re-sends the last user message.

Fixes SECRT-2128, SECRT-2129, SECRT-2130
2026-03-17 12:35:33 +07:00
Reinier van der Leer
aff3fb44af ci(platform): Improve end-to-end CI & reduce its cost (#12437)
Our CI costs are skyrocketing, most of it because of
`platform-fullstack-ci.yml`. The `types` job currently uses in a
`big-boi` runner (= expensive), but doesn't need to.
Additionally, the "end-to-end tests" job is currently in
`platform-frontend-ci.yml` instead of `platform-fullstack-ci.yml`,
causing it not to run on backend changes (which it should).

### Changes 🏗️

- Simplify `check-api-types` job (renamed from `types`) and make it use
regular `ubuntu-latest` runner
- Export API schema from backend through CLI (instead of spinning it up
in docker)
- Fix dependency caching in `platform-fullstack-ci.yml` (based on recent
improvements in `platform-frontend-ci.yml`)
- Move `e2e_tests` job to `platform-fullstack-ci.yml`

Out-of-scope but necessary:
- Eliminate module-level init of OpenAI client in
`backend.copilot.service`

### Checklist 📋

#### For code changes:
- [x] I have clearly listed my changes in the PR description
- [x] I have made a test plan
- [x] I have tested my changes according to the test plan:
  - CI
2026-03-16 23:08:18 +00:00
26 changed files with 991 additions and 3693 deletions

View File

@@ -5,12 +5,14 @@ on:
branches: [master, dev, ci-test*]
paths:
- ".github/workflows/platform-backend-ci.yml"
- ".github/workflows/scripts/get_package_version_from_lockfile.py"
- "autogpt_platform/backend/**"
- "autogpt_platform/autogpt_libs/**"
pull_request:
branches: [master, dev, release-*]
paths:
- ".github/workflows/platform-backend-ci.yml"
- ".github/workflows/scripts/get_package_version_from_lockfile.py"
- "autogpt_platform/backend/**"
- "autogpt_platform/autogpt_libs/**"
merge_group:

View File

@@ -120,175 +120,6 @@ jobs:
token: ${{ secrets.GITHUB_TOKEN }}
exitOnceUploaded: true
e2e_test:
name: end-to-end tests
runs-on: big-boi
steps:
- name: Checkout repository
uses: actions/checkout@v6
with:
submodules: recursive
- name: Set up Platform - Copy default supabase .env
run: |
cp ../.env.default ../.env
- name: Set up Platform - Copy backend .env and set OpenAI API key
run: |
cp ../backend/.env.default ../backend/.env
echo "OPENAI_INTERNAL_API_KEY=${{ secrets.OPENAI_API_KEY }}" >> ../backend/.env
env:
# Used by E2E test data script to generate embeddings for approved store agents
OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }}
- name: Set up Platform - Set up Docker Buildx
uses: docker/setup-buildx-action@v3
with:
driver: docker-container
driver-opts: network=host
- name: Set up Platform - Expose GHA cache to docker buildx CLI
uses: crazy-max/ghaction-github-runtime@v4
- name: Set up Platform - Build Docker images (with cache)
working-directory: autogpt_platform
run: |
pip install pyyaml
# Resolve extends and generate a flat compose file that bake can understand
docker compose -f docker-compose.yml config > docker-compose.resolved.yml
# Add cache configuration to the resolved compose file
python ../.github/workflows/scripts/docker-ci-fix-compose-build-cache.py \
--source docker-compose.resolved.yml \
--cache-from "type=gha" \
--cache-to "type=gha,mode=max" \
--backend-hash "${{ hashFiles('autogpt_platform/backend/Dockerfile', 'autogpt_platform/backend/poetry.lock', 'autogpt_platform/backend/backend') }}" \
--frontend-hash "${{ hashFiles('autogpt_platform/frontend/Dockerfile', 'autogpt_platform/frontend/pnpm-lock.yaml', 'autogpt_platform/frontend/src') }}" \
--git-ref "${{ github.ref }}"
# Build with bake using the resolved compose file (now includes cache config)
docker buildx bake --allow=fs.read=.. -f docker-compose.resolved.yml --load
env:
NEXT_PUBLIC_PW_TEST: true
- name: Set up tests - Cache E2E test data
id: e2e-data-cache
uses: actions/cache@v5
with:
path: /tmp/e2e_test_data.sql
key: e2e-test-data-${{ hashFiles('autogpt_platform/backend/test/e2e_test_data.py', 'autogpt_platform/backend/migrations/**', '.github/workflows/platform-frontend-ci.yml') }}
- name: Set up Platform - Start Supabase DB + Auth
run: |
docker compose -f ../docker-compose.resolved.yml up -d db auth --no-build
echo "Waiting for database to be ready..."
timeout 60 sh -c 'until docker compose -f ../docker-compose.resolved.yml exec -T db pg_isready -U postgres 2>/dev/null; do sleep 2; done'
echo "Waiting for auth service to be ready..."
timeout 60 sh -c 'until docker compose -f ../docker-compose.resolved.yml exec -T db psql -U postgres -d postgres -c "SELECT 1 FROM auth.users LIMIT 1" 2>/dev/null; do sleep 2; done' || echo "Auth schema check timeout, continuing..."
- name: Set up Platform - Run migrations
run: |
echo "Running migrations..."
docker compose -f ../docker-compose.resolved.yml run --rm migrate
echo "✅ Migrations completed"
env:
NEXT_PUBLIC_PW_TEST: true
- name: Set up tests - Load cached E2E test data
if: steps.e2e-data-cache.outputs.cache-hit == 'true'
run: |
echo "✅ Found cached E2E test data, restoring..."
{
echo "SET session_replication_role = 'replica';"
cat /tmp/e2e_test_data.sql
echo "SET session_replication_role = 'origin';"
} | docker compose -f ../docker-compose.resolved.yml exec -T db psql -U postgres -d postgres -b
# Refresh materialized views after restore
docker compose -f ../docker-compose.resolved.yml exec -T db \
psql -U postgres -d postgres -b -c "SET search_path TO platform; SELECT refresh_store_materialized_views();" || true
echo "✅ E2E test data restored from cache"
- name: Set up Platform - Start (all other services)
run: |
docker compose -f ../docker-compose.resolved.yml up -d --no-build
echo "Waiting for rest_server to be ready..."
timeout 60 sh -c 'until curl -f http://localhost:8006/health 2>/dev/null; do sleep 2; done' || echo "Rest server health check timeout, continuing..."
env:
NEXT_PUBLIC_PW_TEST: true
- name: Set up tests - Create E2E test data
if: steps.e2e-data-cache.outputs.cache-hit != 'true'
run: |
echo "Creating E2E test data..."
docker cp ../backend/test/e2e_test_data.py $(docker compose -f ../docker-compose.resolved.yml ps -q rest_server):/tmp/e2e_test_data.py
docker compose -f ../docker-compose.resolved.yml exec -T rest_server sh -c "cd /app/autogpt_platform && python /tmp/e2e_test_data.py" || {
echo "❌ E2E test data creation failed!"
docker compose -f ../docker-compose.resolved.yml logs --tail=50 rest_server
exit 1
}
# Dump auth.users + platform schema for cache (two separate dumps)
echo "Dumping database for cache..."
{
docker compose -f ../docker-compose.resolved.yml exec -T db \
pg_dump -U postgres --data-only --column-inserts \
--table='auth.users' postgres
docker compose -f ../docker-compose.resolved.yml exec -T db \
pg_dump -U postgres --data-only --column-inserts \
--schema=platform \
--exclude-table='platform._prisma_migrations' \
--exclude-table='platform.apscheduler_jobs' \
--exclude-table='platform.apscheduler_jobs_batched_notifications' \
postgres
} > /tmp/e2e_test_data.sql
echo "✅ Database dump created for caching ($(wc -l < /tmp/e2e_test_data.sql) lines)"
- name: Set up tests - Enable corepack
run: corepack enable
- name: Set up tests - Set up Node
uses: actions/setup-node@v6
with:
node-version: "22.18.0"
cache: "pnpm"
cache-dependency-path: autogpt_platform/frontend/pnpm-lock.yaml
- name: Set up tests - Install dependencies
run: pnpm install --frozen-lockfile
- name: Set up tests - Install browser 'chromium'
run: pnpm playwright install --with-deps chromium
- name: Run Playwright tests
run: pnpm test:no-build
continue-on-error: false
- name: Upload Playwright report
if: always()
uses: actions/upload-artifact@v4
with:
name: playwright-report
path: playwright-report
if-no-files-found: ignore
retention-days: 3
- name: Upload Playwright test results
if: always()
uses: actions/upload-artifact@v4
with:
name: playwright-test-results
path: test-results
if-no-files-found: ignore
retention-days: 3
- name: Print Final Docker Compose logs
if: always()
run: docker compose -f ../docker-compose.resolved.yml logs
integration_test:
runs-on: ubuntu-latest
needs: setup

View File

@@ -1,14 +1,18 @@
name: AutoGPT Platform - Frontend CI
name: AutoGPT Platform - Full-stack CI
on:
push:
branches: [master, dev]
paths:
- ".github/workflows/platform-fullstack-ci.yml"
- ".github/workflows/scripts/docker-ci-fix-compose-build-cache.py"
- ".github/workflows/scripts/get_package_version_from_lockfile.py"
- "autogpt_platform/**"
pull_request:
paths:
- ".github/workflows/platform-fullstack-ci.yml"
- ".github/workflows/scripts/docker-ci-fix-compose-build-cache.py"
- ".github/workflows/scripts/get_package_version_from_lockfile.py"
- "autogpt_platform/**"
merge_group:
@@ -24,42 +28,28 @@ defaults:
jobs:
setup:
runs-on: ubuntu-latest
outputs:
cache-key: ${{ steps.cache-key.outputs.key }}
steps:
- name: Checkout repository
uses: actions/checkout@v6
- name: Set up Node.js
uses: actions/setup-node@v6
with:
node-version: "22.18.0"
- name: Enable corepack
run: corepack enable
- name: Generate cache key
id: cache-key
run: echo "key=${{ runner.os }}-pnpm-${{ hashFiles('autogpt_platform/frontend/pnpm-lock.yaml', 'autogpt_platform/frontend/package.json') }}" >> $GITHUB_OUTPUT
- name: Cache dependencies
uses: actions/cache@v5
- name: Set up Node
uses: actions/setup-node@v6
with:
path: ~/.pnpm-store
key: ${{ steps.cache-key.outputs.key }}
restore-keys: |
${{ runner.os }}-pnpm-${{ hashFiles('autogpt_platform/frontend/pnpm-lock.yaml') }}
${{ runner.os }}-pnpm-
node-version: "22.18.0"
cache: "pnpm"
cache-dependency-path: autogpt_platform/frontend/pnpm-lock.yaml
- name: Install dependencies
- name: Install dependencies to populate cache
run: pnpm install --frozen-lockfile
types:
runs-on: big-boi
check-api-types:
name: check API types
runs-on: ubuntu-latest
needs: setup
strategy:
fail-fast: false
steps:
- name: Checkout repository
@@ -67,70 +57,256 @@ jobs:
with:
submodules: recursive
- name: Set up Node.js
# ------------------------ Backend setup ------------------------
- name: Set up Backend - Set up Python
uses: actions/setup-python@v5
with:
python-version: "3.12"
- name: Set up Backend - Install Poetry
working-directory: autogpt_platform/backend
run: |
POETRY_VERSION=$(python ../../.github/workflows/scripts/get_package_version_from_lockfile.py poetry)
echo "Installing Poetry version ${POETRY_VERSION}"
curl -sSL https://install.python-poetry.org | POETRY_VERSION=$POETRY_VERSION python3 -
- name: Set up Backend - Set up dependency cache
uses: actions/cache@v5
with:
path: ~/.cache/pypoetry
key: poetry-${{ runner.os }}-${{ hashFiles('autogpt_platform/backend/poetry.lock') }}
- name: Set up Backend - Install dependencies
working-directory: autogpt_platform/backend
run: poetry install
- name: Set up Backend - Generate Prisma client
working-directory: autogpt_platform/backend
run: poetry run prisma generate && poetry run gen-prisma-stub
- name: Set up Frontend - Export OpenAPI schema from Backend
working-directory: autogpt_platform/backend
run: poetry run export-api-schema --output ../frontend/src/app/api/openapi.json
# ------------------------ Frontend setup ------------------------
- name: Set up Frontend - Enable corepack
run: corepack enable
- name: Set up Frontend - Set up Node
uses: actions/setup-node@v6
with:
node-version: "22.18.0"
cache: "pnpm"
cache-dependency-path: autogpt_platform/frontend/pnpm-lock.yaml
- name: Enable corepack
run: corepack enable
- name: Copy default supabase .env
run: |
cp ../.env.default ../.env
- name: Copy backend .env
run: |
cp ../backend/.env.default ../backend/.env
- name: Run docker compose
run: |
docker compose -f ../docker-compose.yml --profile local up -d deps_backend
- name: Restore dependencies cache
uses: actions/cache@v5
with:
path: ~/.pnpm-store
key: ${{ needs.setup.outputs.cache-key }}
restore-keys: |
${{ runner.os }}-pnpm-
- name: Install dependencies
- name: Set up Frontend - Install dependencies
run: pnpm install --frozen-lockfile
- name: Setup .env
run: cp .env.default .env
- name: Wait for services to be ready
run: |
echo "Waiting for rest_server to be ready..."
timeout 60 sh -c 'until curl -f http://localhost:8006/health 2>/dev/null; do sleep 2; done' || echo "Rest server health check timeout, continuing..."
echo "Waiting for database to be ready..."
timeout 60 sh -c 'until docker compose -f ../docker-compose.yml exec -T db pg_isready -U postgres 2>/dev/null; do sleep 2; done' || echo "Database ready check timeout, continuing..."
- name: Generate API queries
run: pnpm generate:api:force
- name: Set up Frontend - Format OpenAPI schema
id: format-schema
run: pnpm prettier --write ./src/app/api/openapi.json
- name: Check for API schema changes
run: |
if ! git diff --exit-code src/app/api/openapi.json; then
echo "❌ API schema changes detected in src/app/api/openapi.json"
echo ""
echo "The openapi.json file has been modified after running 'pnpm generate:api-all'."
echo "The openapi.json file has been modified after exporting the API schema."
echo "This usually means changes have been made in the BE endpoints without updating the Frontend."
echo "The API schema is now out of sync with the Front-end queries."
echo ""
echo "To fix this:"
echo "1. Pull the backend 'docker compose pull && docker compose up -d --build --force-recreate'"
echo "2. Run 'pnpm generate:api' locally"
echo "3. Run 'pnpm types' locally"
echo "4. Fix any TypeScript errors that may have been introduced"
echo "5. Commit and push your changes"
echo "\nIn the backend directory:"
echo "1. Run 'poetry run export-api-schema --output ../frontend/src/app/api/openapi.json'"
echo "\nIn the frontend directory:"
echo "2. Run 'pnpm prettier --write src/app/api/openapi.json'"
echo "3. Run 'pnpm generate:api'"
echo "4. Run 'pnpm types'"
echo "5. Fix any TypeScript errors that may have been introduced"
echo "6. Commit and push your changes"
echo ""
exit 1
else
echo "✅ No API schema changes detected"
fi
- name: Run Typescript checks
- name: Set up Frontend - Generate API client
id: generate-api-client
run: pnpm orval --config ./orval.config.ts
# Continue with type generation & check even if there are schema changes
if: success() || (steps.format-schema.outcome == 'success')
- name: Check for TypeScript errors
run: pnpm types
if: success() || (steps.generate-api-client.outcome == 'success')
e2e_test:
name: end-to-end tests
runs-on: big-boi
steps:
- name: Checkout repository
uses: actions/checkout@v6
with:
submodules: recursive
- name: Set up Platform - Copy default supabase .env
run: |
cp ../.env.default ../.env
- name: Set up Platform - Copy backend .env and set OpenAI API key
run: |
cp ../backend/.env.default ../backend/.env
echo "OPENAI_INTERNAL_API_KEY=${{ secrets.OPENAI_API_KEY }}" >> ../backend/.env
env:
# Used by E2E test data script to generate embeddings for approved store agents
OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }}
- name: Set up Platform - Set up Docker Buildx
uses: docker/setup-buildx-action@v3
with:
driver: docker-container
driver-opts: network=host
- name: Set up Platform - Expose GHA cache to docker buildx CLI
uses: crazy-max/ghaction-github-runtime@v4
- name: Set up Platform - Build Docker images (with cache)
working-directory: autogpt_platform
run: |
pip install pyyaml
# Resolve extends and generate a flat compose file that bake can understand
docker compose -f docker-compose.yml config > docker-compose.resolved.yml
# Add cache configuration to the resolved compose file
python ../.github/workflows/scripts/docker-ci-fix-compose-build-cache.py \
--source docker-compose.resolved.yml \
--cache-from "type=gha" \
--cache-to "type=gha,mode=max" \
--backend-hash "${{ hashFiles('autogpt_platform/backend/Dockerfile', 'autogpt_platform/backend/poetry.lock', 'autogpt_platform/backend/backend/**') }}" \
--frontend-hash "${{ hashFiles('autogpt_platform/frontend/Dockerfile', 'autogpt_platform/frontend/pnpm-lock.yaml', 'autogpt_platform/frontend/src/**') }}" \
--git-ref "${{ github.ref }}"
# Build with bake using the resolved compose file (now includes cache config)
docker buildx bake --allow=fs.read=.. -f docker-compose.resolved.yml --load
env:
NEXT_PUBLIC_PW_TEST: true
- name: Set up tests - Cache E2E test data
id: e2e-data-cache
uses: actions/cache@v5
with:
path: /tmp/e2e_test_data.sql
key: e2e-test-data-${{ hashFiles('autogpt_platform/backend/test/e2e_test_data.py', 'autogpt_platform/backend/migrations/**', '.github/workflows/platform-fullstack-ci.yml') }}
- name: Set up Platform - Start Supabase DB + Auth
run: |
docker compose -f ../docker-compose.resolved.yml up -d db auth --no-build
echo "Waiting for database to be ready..."
timeout 60 sh -c 'until docker compose -f ../docker-compose.resolved.yml exec -T db pg_isready -U postgres 2>/dev/null; do sleep 2; done'
echo "Waiting for auth service to be ready..."
timeout 60 sh -c 'until docker compose -f ../docker-compose.resolved.yml exec -T db psql -U postgres -d postgres -c "SELECT 1 FROM auth.users LIMIT 1" 2>/dev/null; do sleep 2; done' || echo "Auth schema check timeout, continuing..."
- name: Set up Platform - Run migrations
run: |
echo "Running migrations..."
docker compose -f ../docker-compose.resolved.yml run --rm migrate
echo "✅ Migrations completed"
env:
NEXT_PUBLIC_PW_TEST: true
- name: Set up tests - Load cached E2E test data
if: steps.e2e-data-cache.outputs.cache-hit == 'true'
run: |
echo "✅ Found cached E2E test data, restoring..."
{
echo "SET session_replication_role = 'replica';"
cat /tmp/e2e_test_data.sql
echo "SET session_replication_role = 'origin';"
} | docker compose -f ../docker-compose.resolved.yml exec -T db psql -U postgres -d postgres -b
# Refresh materialized views after restore
docker compose -f ../docker-compose.resolved.yml exec -T db \
psql -U postgres -d postgres -b -c "SET search_path TO platform; SELECT refresh_store_materialized_views();" || true
echo "✅ E2E test data restored from cache"
- name: Set up Platform - Start (all other services)
run: |
docker compose -f ../docker-compose.resolved.yml up -d --no-build
echo "Waiting for rest_server to be ready..."
timeout 60 sh -c 'until curl -f http://localhost:8006/health 2>/dev/null; do sleep 2; done' || echo "Rest server health check timeout, continuing..."
env:
NEXT_PUBLIC_PW_TEST: true
- name: Set up tests - Create E2E test data
if: steps.e2e-data-cache.outputs.cache-hit != 'true'
run: |
echo "Creating E2E test data..."
docker cp ../backend/test/e2e_test_data.py $(docker compose -f ../docker-compose.resolved.yml ps -q rest_server):/tmp/e2e_test_data.py
docker compose -f ../docker-compose.resolved.yml exec -T rest_server sh -c "cd /app/autogpt_platform && python /tmp/e2e_test_data.py" || {
echo "❌ E2E test data creation failed!"
docker compose -f ../docker-compose.resolved.yml logs --tail=50 rest_server
exit 1
}
# Dump auth.users + platform schema for cache (two separate dumps)
echo "Dumping database for cache..."
{
docker compose -f ../docker-compose.resolved.yml exec -T db \
pg_dump -U postgres --data-only --column-inserts \
--table='auth.users' postgres
docker compose -f ../docker-compose.resolved.yml exec -T db \
pg_dump -U postgres --data-only --column-inserts \
--schema=platform \
--exclude-table='platform._prisma_migrations' \
--exclude-table='platform.apscheduler_jobs' \
--exclude-table='platform.apscheduler_jobs_batched_notifications' \
postgres
} > /tmp/e2e_test_data.sql
echo "✅ Database dump created for caching ($(wc -l < /tmp/e2e_test_data.sql) lines)"
- name: Set up tests - Enable corepack
run: corepack enable
- name: Set up tests - Set up Node
uses: actions/setup-node@v6
with:
node-version: "22.18.0"
cache: "pnpm"
cache-dependency-path: autogpt_platform/frontend/pnpm-lock.yaml
- name: Set up tests - Install dependencies
run: pnpm install --frozen-lockfile
- name: Set up tests - Install browser 'chromium'
run: pnpm playwright install --with-deps chromium
- name: Run Playwright tests
run: pnpm test:no-build
continue-on-error: false
- name: Upload Playwright report
if: always()
uses: actions/upload-artifact@v4
with:
name: playwright-report
path: playwright-report
if-no-files-found: ignore
retention-days: 3
- name: Upload Playwright test results
if: always()
uses: actions/upload-artifact@v4
with:
name: playwright-test-results
path: test-results
if-no-files-found: ignore
retention-days: 3
- name: Print Final Docker Compose logs
if: always()
run: docker compose -f ../docker-compose.resolved.yml logs

View File

@@ -40,7 +40,7 @@ from backend.copilot.response_model import (
from backend.copilot.service import (
_build_system_prompt,
_generate_session_title,
client,
_get_openai_client,
config,
)
from backend.copilot.tools import execute_tool, get_available_tools
@@ -89,7 +89,7 @@ async def _compress_session_messages(
result = await compress_context(
messages=messages_dict,
model=config.model,
client=client,
client=_get_openai_client(),
)
except Exception as e:
logger.warning("[Baseline] Context compression with LLM failed: %s", e)
@@ -235,7 +235,7 @@ async def stream_chat_completion_baseline(
)
if tools:
create_kwargs["tools"] = tools
response = await client.chat.completions.create(**create_kwargs) # type: ignore[arg-type] # dynamic kwargs
response = await _get_openai_client().chat.completions.create(**create_kwargs) # type: ignore[arg-type] # dynamic kwargs
# Accumulate streamed response (text + tool calls)
round_text = ""

View File

@@ -94,6 +94,11 @@ class ChatConfig(BaseSettings):
description="Use --resume for multi-turn conversations instead of "
"history compression. Falls back to compression when unavailable.",
)
use_openrouter: bool = Field(
default=True,
description="Route API calls through OpenRouter proxy. When False, the SDK "
"uses ANTHROPIC_API_KEY from the environment directly (no proxy hop).",
)
use_claude_code_subscription: bool = Field(
default=False,
description="For personal/dev use: use Claude Code CLI subscription auth instead of API keys. Requires `claude login` on the host. Only works with SDK mode.",
@@ -115,7 +120,7 @@ class ChatConfig(BaseSettings):
description="E2B sandbox template to use for copilot sessions.",
)
e2b_sandbox_timeout: int = Field(
default=420, # 7 min safety net — allows headroom for compaction retries
default=300, # 5 min safety net — explicit per-turn pause is the primary mechanism
description="E2B sandbox running-time timeout (seconds). "
"E2B timeout is wall-clock (not idle). Explicit per-turn pause is the primary "
"mechanism; this is the safety net.",
@@ -209,6 +214,15 @@ class ChatConfig(BaseSettings):
# Default to True (SDK enabled by default)
return True if v is None else v
@field_validator("use_openrouter", mode="before")
@classmethod
def get_use_openrouter(cls, v):
"""Get use_openrouter from environment if not provided."""
env_val = os.getenv("CHAT_USE_OPENROUTER", "").lower()
if env_val:
return env_val in ("true", "1", "yes", "on")
return True if v is None else v
@field_validator("use_claude_code_subscription", mode="before")
@classmethod
def get_use_claude_code_subscription(cls, v):

View File

@@ -4,6 +4,9 @@
# The hex suffix makes accidental LLM generation of these strings virtually
# impossible, avoiding false-positive marker detection in normal conversation.
COPILOT_ERROR_PREFIX = "[__COPILOT_ERROR_f7a1__]" # Renders as ErrorCard
COPILOT_RETRYABLE_ERROR_PREFIX = (
"[__COPILOT_RETRYABLE_ERROR_a9c2__]" # ErrorCard + retry
)
COPILOT_SYSTEM_PREFIX = "[__COPILOT_SYSTEM_e3b0__]" # Renders as system info message
# Prefix for all synthetic IDs generated by CoPilot block execution.
@@ -35,3 +38,24 @@ def parse_node_id_from_exec_id(node_exec_id: str) -> str:
Format: "{node_id}:{random_hex}" → returns "{node_id}".
"""
return node_exec_id.rsplit(COPILOT_NODE_EXEC_ID_SEPARATOR, 1)[0]
# ---------------------------------------------------------------------------
# Transient Anthropic API error detection
# ---------------------------------------------------------------------------
# Patterns in error text that indicate a transient Anthropic API error
# (ECONNRESET / dropped TCP connection) which is retryable.
_TRANSIENT_ERROR_PATTERNS = (
"socket connection was closed unexpectedly",
"ECONNRESET",
"connection was forcibly closed",
"network socket disconnected",
)
FRIENDLY_TRANSIENT_MSG = "Anthropic connection interrupted — please retry"
def is_transient_api_error(error_text: str) -> bool:
"""Return True if *error_text* matches a known transient Anthropic API error."""
lower = error_text.lower()
return any(pat.lower() in lower for pat in _TRANSIENT_ERROR_PATTERNS)

View File

@@ -43,7 +43,6 @@ class ResponseType(str, Enum):
ERROR = "error"
USAGE = "usage"
HEARTBEAT = "heartbeat"
STATUS = "status"
class StreamBaseResponse(BaseModel):
@@ -233,26 +232,3 @@ class StreamHeartbeat(StreamBaseResponse):
def to_sse(self) -> str:
"""Convert to SSE comment format to keep connection alive."""
return ": heartbeat\n\n"
class StreamStatus(StreamBaseResponse):
"""Transient status notification shown to the user during long operations.
Used to provide feedback when the backend performs behind-the-scenes work
(e.g., compacting conversation context on a retry) that would otherwise
leave the user staring at an unexplained pause.
"""
type: ResponseType = ResponseType.STATUS
message: str = Field(..., description="Human-readable status message")
def to_sse(self) -> str:
"""Encode as an SSE comment so the AI SDK stream parser ignores it.
The frontend AI SDK validates every ``data:`` line against a strict
Zod union of known chunk types. ``"status"`` is not in that union,
so sending it as ``data:`` would cause a schema-validation error that
breaks the entire stream. Using an SSE comment (``:``) keeps the
connection alive and is silently discarded by ``EventSource`` parsers.
"""
return f": status {self.message}\n\n"

View File

@@ -12,7 +12,6 @@ import asyncio
import logging
import uuid
from dataclasses import dataclass, field
from typing import Any
from ..constants import COMPACTION_DONE_MSG, COMPACTION_TOOL_NAME
from ..model import ChatMessage, ChatSession
@@ -120,12 +119,14 @@ def filter_compaction_messages(
filtered: list[ChatMessage] = []
for msg in messages:
if msg.role == "assistant" and msg.tool_calls:
real_calls: list[dict[str, Any]] = []
for tc in msg.tool_calls:
if tc.get("function", {}).get("name") == COMPACTION_TOOL_NAME:
compaction_ids.add(tc.get("id", ""))
else:
real_calls.append(tc)
real_calls = [
tc
for tc in msg.tool_calls
if tc.get("function", {}).get("name") != COMPACTION_TOOL_NAME
]
if not real_calls and not msg.content:
continue
if msg.role == "tool" and msg.tool_call_id in compaction_ids:
@@ -221,7 +222,6 @@ class CompactionTracker:
def reset_for_query(self) -> None:
"""Reset per-query state before a new SDK query."""
self._compact_start.clear()
self._done = False
self._start_emitted = False
self._tool_call_id = ""

View File

@@ -1,41 +0,0 @@
"""Shared test fixtures for copilot SDK tests."""
from __future__ import annotations
from uuid import uuid4
from backend.util import json
def build_test_transcript(pairs: list[tuple[str, str]]) -> str:
"""Build a minimal valid JSONL transcript from (role, content) pairs.
Use this helper in any copilot SDK test that needs a well-formed
transcript without hitting the real storage layer.
"""
lines: list[str] = []
last_uuid: str | None = None
for role, content in pairs:
uid = str(uuid4())
entry_type = "assistant" if role == "assistant" else "user"
msg: dict = {"role": role, "content": content}
if role == "assistant":
msg.update(
{
"model": "",
"id": f"msg_{uid[:8]}",
"type": "message",
"content": [{"type": "text", "text": content}],
"stop_reason": "end_turn",
"stop_sequence": None,
}
)
entry = {
"type": entry_type,
"uuid": uid,
"parentUuid": last_uuid,
"message": msg,
}
lines.append(json.dumps(entry, separators=(",", ":")))
last_uuid = uid
return "\n".join(lines) + "\n"

View File

@@ -1,552 +0,0 @@
"""Tests for retry logic and transcript compaction helpers."""
from __future__ import annotations
from unittest.mock import AsyncMock, patch
from uuid import uuid4
import pytest
from backend.util import json
from .conftest import build_test_transcript as _build_transcript
from .service import _is_prompt_too_long
from .transcript import (
_flatten_assistant_content,
_flatten_tool_result_content,
_messages_to_transcript,
_transcript_to_messages,
compact_transcript,
validate_transcript,
)
# ---------------------------------------------------------------------------
# _flatten_assistant_content
# ---------------------------------------------------------------------------
class TestFlattenAssistantContent:
def test_text_blocks(self):
blocks = [
{"type": "text", "text": "Hello"},
{"type": "text", "text": "World"},
]
assert _flatten_assistant_content(blocks) == "Hello\nWorld"
def test_tool_use_blocks(self):
blocks = [{"type": "tool_use", "name": "read_file", "input": {}}]
assert _flatten_assistant_content(blocks) == "[tool_use: read_file]"
def test_mixed_blocks(self):
blocks = [
{"type": "text", "text": "Let me read that."},
{"type": "tool_use", "name": "Read", "input": {"path": "/foo"}},
]
result = _flatten_assistant_content(blocks)
assert "Let me read that." in result
assert "[tool_use: Read]" in result
def test_raw_strings(self):
assert _flatten_assistant_content(["hello", "world"]) == "hello\nworld"
def test_unknown_block_type_preserved_as_placeholder(self):
blocks = [
{"type": "text", "text": "See this image:"},
{"type": "image", "source": {"type": "base64", "data": "..."}},
]
result = _flatten_assistant_content(blocks)
assert "See this image:" in result
assert "[__image__]" in result
def test_empty(self):
assert _flatten_assistant_content([]) == ""
# ---------------------------------------------------------------------------
# _flatten_tool_result_content
# ---------------------------------------------------------------------------
class TestFlattenToolResultContent:
def test_tool_result_with_text(self):
blocks = [
{
"type": "tool_result",
"tool_use_id": "123",
"content": [{"type": "text", "text": "file contents here"}],
}
]
assert _flatten_tool_result_content(blocks) == "file contents here"
def test_tool_result_with_string_content(self):
blocks = [{"type": "tool_result", "tool_use_id": "123", "content": "ok"}]
assert _flatten_tool_result_content(blocks) == "ok"
def test_text_block(self):
blocks = [{"type": "text", "text": "plain text"}]
assert _flatten_tool_result_content(blocks) == "plain text"
def test_raw_string(self):
assert _flatten_tool_result_content(["raw"]) == "raw"
def test_tool_result_with_none_content(self):
"""tool_result with content=None should produce empty string."""
blocks = [{"type": "tool_result", "tool_use_id": "x", "content": None}]
assert _flatten_tool_result_content(blocks) == ""
def test_tool_result_with_empty_list_content(self):
"""tool_result with content=[] should produce empty string."""
blocks = [{"type": "tool_result", "tool_use_id": "x", "content": []}]
assert _flatten_tool_result_content(blocks) == ""
def test_empty(self):
assert _flatten_tool_result_content([]) == ""
def test_nested_dict_without_text(self):
"""Dict blocks without text key use json.dumps fallback."""
blocks = [
{
"type": "tool_result",
"tool_use_id": "x",
"content": [{"type": "image", "source": "data:..."}],
}
]
result = _flatten_tool_result_content(blocks)
assert "image" in result # json.dumps fallback
def test_unknown_block_type_preserved_as_placeholder(self):
blocks = [{"type": "image", "source": {"type": "base64", "data": "..."}}]
result = _flatten_tool_result_content(blocks)
assert "[__image__]" in result
# ---------------------------------------------------------------------------
# _transcript_to_messages
# ---------------------------------------------------------------------------
def _make_entry(entry_type: str, role: str, content: str | list, **kwargs) -> str:
"""Build a JSONL line for testing."""
uid = str(uuid4())
msg: dict = {"role": role, "content": content}
msg.update(kwargs)
entry = {
"type": entry_type,
"uuid": uid,
"parentUuid": None,
"message": msg,
}
return json.dumps(entry, separators=(",", ":"))
class TestTranscriptToMessages:
def test_basic_roundtrip(self):
lines = [
_make_entry("user", "user", "Hello"),
_make_entry("assistant", "assistant", [{"type": "text", "text": "Hi"}]),
]
content = "\n".join(lines) + "\n"
messages = _transcript_to_messages(content)
assert len(messages) == 2
assert messages[0] == {"role": "user", "content": "Hello"}
assert messages[1] == {"role": "assistant", "content": "Hi"}
def test_skips_strippable_types(self):
"""Progress and metadata entries are excluded."""
lines = [
_make_entry("user", "user", "Hello"),
json.dumps(
{
"type": "progress",
"uuid": str(uuid4()),
"parentUuid": None,
"message": {"role": "assistant", "content": "..."},
}
),
_make_entry("assistant", "assistant", [{"type": "text", "text": "Hi"}]),
]
content = "\n".join(lines) + "\n"
messages = _transcript_to_messages(content)
assert len(messages) == 2
def test_empty_content(self):
assert _transcript_to_messages("") == []
def test_tool_result_content(self):
"""User entries with tool_result content blocks are flattened."""
lines = [
_make_entry(
"user",
"user",
[
{
"type": "tool_result",
"tool_use_id": "123",
"content": "tool output",
}
],
),
]
content = "\n".join(lines) + "\n"
messages = _transcript_to_messages(content)
assert len(messages) == 1
assert messages[0]["content"] == "tool output"
def test_malformed_json_lines_skipped(self):
"""Malformed JSON lines in transcript are silently skipped."""
lines = [
_make_entry("user", "user", "Hello"),
"this is not valid json",
_make_entry("assistant", "assistant", [{"type": "text", "text": "Hi"}]),
]
content = "\n".join(lines) + "\n"
messages = _transcript_to_messages(content)
assert len(messages) == 2
def test_empty_lines_skipped(self):
"""Empty lines and whitespace-only lines are skipped."""
lines = [
_make_entry("user", "user", "Hello"),
"",
" ",
_make_entry("assistant", "assistant", [{"type": "text", "text": "Hi"}]),
]
content = "\n".join(lines) + "\n"
messages = _transcript_to_messages(content)
assert len(messages) == 2
def test_unicode_content_preserved(self):
"""Unicode characters survive transcript roundtrip."""
lines = [
_make_entry("user", "user", "Hello 你好 🌍"),
_make_entry(
"assistant",
"assistant",
[{"type": "text", "text": "Bonjour 日本語 émojis 🎉"}],
),
]
content = "\n".join(lines) + "\n"
messages = _transcript_to_messages(content)
assert messages[0]["content"] == "Hello 你好 🌍"
assert messages[1]["content"] == "Bonjour 日本語 émojis 🎉"
def test_entry_without_role_skipped(self):
"""Entries with missing role in message are skipped."""
entry_no_role = json.dumps(
{
"type": "user",
"uuid": str(uuid4()),
"parentUuid": None,
"message": {"content": "no role here"},
}
)
lines = [
entry_no_role,
_make_entry("user", "user", "Hello"),
]
content = "\n".join(lines) + "\n"
messages = _transcript_to_messages(content)
assert len(messages) == 1
assert messages[0]["content"] == "Hello"
def test_tool_use_and_result_pairs(self):
"""Tool use + tool result pairs are properly flattened."""
lines = [
_make_entry(
"assistant",
"assistant",
[
{"type": "text", "text": "Let me check."},
{"type": "tool_use", "name": "read_file", "input": {"path": "/x"}},
],
),
_make_entry(
"user",
"user",
[
{
"type": "tool_result",
"tool_use_id": "abc",
"content": [{"type": "text", "text": "file contents"}],
}
],
),
]
content = "\n".join(lines) + "\n"
messages = _transcript_to_messages(content)
assert len(messages) == 2
assert "Let me check." in messages[0]["content"]
assert "[tool_use: read_file]" in messages[0]["content"]
assert messages[1]["content"] == "file contents"
# ---------------------------------------------------------------------------
# _messages_to_transcript
# ---------------------------------------------------------------------------
class TestMessagesToTranscript:
def test_produces_valid_jsonl(self):
messages = [
{"role": "user", "content": "Hello"},
{"role": "assistant", "content": "Hi there"},
]
result = _messages_to_transcript(messages)
lines = result.strip().split("\n")
assert len(lines) == 2
for line in lines:
parsed = json.loads(line)
assert "type" in parsed
assert "uuid" in parsed
assert "message" in parsed
def test_assistant_has_proper_structure(self):
messages = [{"role": "assistant", "content": "Hello"}]
result = _messages_to_transcript(messages)
entry = json.loads(result.strip())
assert entry["type"] == "assistant"
msg = entry["message"]
assert msg["role"] == "assistant"
assert msg["type"] == "message"
assert msg["stop_reason"] == "end_turn"
assert isinstance(msg["content"], list)
assert msg["content"][0]["type"] == "text"
def test_user_has_plain_content(self):
messages = [{"role": "user", "content": "Hi"}]
result = _messages_to_transcript(messages)
entry = json.loads(result.strip())
assert entry["type"] == "user"
assert entry["message"]["content"] == "Hi"
def test_parent_uuid_chain(self):
messages = [
{"role": "user", "content": "A"},
{"role": "assistant", "content": "B"},
{"role": "user", "content": "C"},
]
result = _messages_to_transcript(messages)
lines = result.strip().split("\n")
entries = [json.loads(line) for line in lines]
assert entries[0]["parentUuid"] == ""
assert entries[1]["parentUuid"] == entries[0]["uuid"]
assert entries[2]["parentUuid"] == entries[1]["uuid"]
def test_empty_messages(self):
assert _messages_to_transcript([]) == ""
def test_output_is_valid_transcript(self):
"""Output should pass validate_transcript if it has assistant entries."""
messages = [
{"role": "user", "content": "Hello"},
{"role": "assistant", "content": "Hi"},
]
result = _messages_to_transcript(messages)
assert validate_transcript(result)
def test_roundtrip_to_messages(self):
"""Messages → transcript → messages preserves structure."""
original = [
{"role": "user", "content": "Hello"},
{"role": "assistant", "content": "Hi there"},
{"role": "user", "content": "How are you?"},
]
transcript = _messages_to_transcript(original)
restored = _transcript_to_messages(transcript)
assert len(restored) == len(original)
for orig, rest in zip(original, restored):
assert orig["role"] == rest["role"]
assert orig["content"] == rest["content"]
# ---------------------------------------------------------------------------
# compact_transcript
# ---------------------------------------------------------------------------
class TestCompactTranscript:
@pytest.mark.asyncio
async def test_too_few_messages_returns_none(self):
"""compact_transcript returns None when transcript has < 2 messages."""
transcript = _build_transcript([("user", "Hello")])
with patch(
"backend.copilot.config.ChatConfig",
return_value=type(
"Cfg", (), {"model": "m", "api_key": "k", "base_url": "u"}
)(),
):
result = await compact_transcript(transcript, model="test-model")
assert result is None
@pytest.mark.asyncio
async def test_returns_none_when_not_compacted(self):
"""When compress_context says no compaction needed, returns None.
The compressor couldn't reduce it, so retrying with the same
content would fail identically."""
transcript = _build_transcript(
[
("user", "Hello"),
("assistant", "Hi there"),
]
)
mock_result = type(
"CompressResult",
(),
{
"was_compacted": False,
"messages": [],
"original_token_count": 100,
"token_count": 100,
"messages_summarized": 0,
"messages_dropped": 0,
},
)()
with (
patch(
"backend.copilot.config.ChatConfig",
return_value=type(
"Cfg", (), {"model": "m", "api_key": "k", "base_url": "u"}
)(),
),
patch(
"backend.copilot.sdk.transcript._run_compression",
new_callable=AsyncMock,
return_value=mock_result,
),
):
result = await compact_transcript(transcript, model="test-model")
assert result is None
@pytest.mark.asyncio
async def test_returns_compacted_transcript(self):
"""When compaction succeeds, returns a valid compacted transcript."""
transcript = _build_transcript(
[
("user", "Hello"),
("assistant", "Hi"),
("user", "More"),
("assistant", "Details"),
]
)
compacted_msgs = [
{"role": "user", "content": "[summary]"},
{"role": "assistant", "content": "Summarized response"},
]
mock_result = type(
"CompressResult",
(),
{
"was_compacted": True,
"messages": compacted_msgs,
"original_token_count": 500,
"token_count": 100,
"messages_summarized": 2,
"messages_dropped": 0,
},
)()
with (
patch(
"backend.copilot.config.ChatConfig",
return_value=type(
"Cfg", (), {"model": "m", "api_key": "k", "base_url": "u"}
)(),
),
patch(
"backend.copilot.sdk.transcript._run_compression",
new_callable=AsyncMock,
return_value=mock_result,
),
):
result = await compact_transcript(transcript, model="test-model")
assert result is not None
assert validate_transcript(result)
msgs = _transcript_to_messages(result)
assert len(msgs) == 2
assert msgs[1]["content"] == "Summarized response"
@pytest.mark.asyncio
async def test_returns_none_on_compression_failure(self):
"""When _run_compression raises, returns None."""
transcript = _build_transcript(
[
("user", "Hello"),
("assistant", "Hi"),
]
)
with (
patch(
"backend.copilot.config.ChatConfig",
return_value=type(
"Cfg", (), {"model": "m", "api_key": "k", "base_url": "u"}
)(),
),
patch(
"backend.copilot.sdk.transcript._run_compression",
new_callable=AsyncMock,
side_effect=RuntimeError("LLM unavailable"),
),
):
result = await compact_transcript(transcript, model="test-model")
assert result is None
# ---------------------------------------------------------------------------
# _is_prompt_too_long
# ---------------------------------------------------------------------------
class TestIsPromptTooLong:
"""Unit tests for _is_prompt_too_long pattern matching."""
def test_prompt_is_too_long(self):
err = RuntimeError("prompt is too long for model context")
assert _is_prompt_too_long(err) is True
def test_request_too_large(self):
err = Exception("request too large: 250000 tokens")
assert _is_prompt_too_long(err) is True
def test_maximum_context_length(self):
err = ValueError("maximum context length exceeded")
assert _is_prompt_too_long(err) is True
def test_context_length_exceeded(self):
err = Exception("context_length_exceeded")
assert _is_prompt_too_long(err) is True
def test_input_tokens_exceed(self):
err = Exception("input tokens exceed the max_tokens limit")
assert _is_prompt_too_long(err) is True
def test_input_is_too_long(self):
err = Exception("input is too long for the model")
assert _is_prompt_too_long(err) is True
def test_content_length_exceeds(self):
err = Exception("content length exceeds maximum")
assert _is_prompt_too_long(err) is True
def test_unrelated_error_returns_false(self):
err = RuntimeError("network timeout")
assert _is_prompt_too_long(err) is False
def test_auth_error_returns_false(self):
err = Exception("authentication failed: invalid API key")
assert _is_prompt_too_long(err) is False
def test_chained_exception_detected(self):
"""Prompt-too-long error wrapped in another exception is detected."""
inner = RuntimeError("prompt is too long")
outer = Exception("SDK error")
outer.__cause__ = inner
assert _is_prompt_too_long(outer) is True
def test_case_insensitive(self):
err = Exception("PROMPT IS TOO LONG")
assert _is_prompt_too_long(err) is True
def test_old_max_tokens_exceeded_not_matched(self):
"""The old broad 'max_tokens_exceeded' pattern was removed.
Only 'input tokens exceed' should match now."""
err = Exception("max_tokens_exceeded")
assert _is_prompt_too_long(err) is False

View File

@@ -20,6 +20,7 @@ from claude_agent_sdk import (
UserMessage,
)
from backend.copilot.constants import FRIENDLY_TRANSIENT_MSG, is_transient_api_error
from backend.copilot.response_model import (
StreamBaseResponse,
StreamError,
@@ -214,10 +215,12 @@ class SDKResponseAdapter:
if sdk_message.subtype == "success":
responses.append(StreamFinish())
elif sdk_message.subtype in ("error", "error_during_execution"):
error_msg = sdk_message.result or "Unknown error"
responses.append(
StreamError(errorText=str(error_msg), code="sdk_error")
)
raw_error = str(sdk_message.result or "Unknown error")
if is_transient_api_error(raw_error):
error_text, code = FRIENDLY_TRANSIENT_MSG, "transient_api_error"
else:
error_text, code = raw_error, "sdk_error"
responses.append(StreamError(errorText=error_text, code=code))
responses.append(StreamFinish())
else:
logger.warning(
@@ -226,7 +229,7 @@ class SDKResponseAdapter:
responses.append(StreamFinish())
else:
logger.debug("Unhandled SDK message type: %s", type(sdk_message).__name__)
logger.debug(f"Unhandled SDK message type: {type(sdk_message).__name__}")
return responses

View File

@@ -52,7 +52,7 @@ def _validate_workspace_path(
if is_allowed_local_path(path, sdk_cwd):
return {}
logger.warning("Blocked %s outside workspace: %s", tool_name, path)
logger.warning(f"Blocked {tool_name} outside workspace: {path}")
workspace_hint = f" Allowed workspace: {sdk_cwd}" if sdk_cwd else ""
return _deny(
f"[SECURITY] Tool '{tool_name}' can only access files within the workspace "
@@ -71,7 +71,7 @@ def _validate_tool_access(
"""
# Block forbidden tools
if tool_name in BLOCKED_TOOLS:
logger.warning("Blocked tool access attempt: %s", tool_name)
logger.warning(f"Blocked tool access attempt: {tool_name}")
return _deny(
f"[SECURITY] Tool '{tool_name}' is blocked for security. "
"This is enforced by the platform and cannot be bypassed. "
@@ -89,9 +89,7 @@ def _validate_tool_access(
for pattern in DANGEROUS_PATTERNS:
if re.search(pattern, input_str, re.IGNORECASE):
logger.warning(
"Blocked dangerous pattern in tool input: %s in %s",
pattern,
tool_name,
f"Blocked dangerous pattern in tool input: {pattern} in {tool_name}"
)
return _deny(
"[SECURITY] Input contains a blocked pattern. "
@@ -113,9 +111,7 @@ def _validate_user_isolation(
# the tool itself via _validate_ephemeral_path.
path = tool_input.get("path", "") or tool_input.get("file_path", "")
if path and ".." in path:
logger.warning(
"Blocked path traversal attempt: %s by user %s", path, user_id
)
logger.warning(f"Blocked path traversal attempt: {path} by user {user_id}")
return {
"hookSpecificOutput": {
"hookEventName": "PreToolUse",
@@ -174,7 +170,7 @@ def create_security_hooks(
# Block background task execution first — denied calls
# should not consume a subtask slot.
if tool_input.get("run_in_background"):
logger.info("[SDK] Blocked background Task, user=%s", user_id)
logger.info(f"[SDK] Blocked background Task, user={user_id}")
return cast(
SyncHookJSONOutput,
_deny(
@@ -185,9 +181,7 @@ def create_security_hooks(
)
if len(task_tool_use_ids) >= max_subtasks:
logger.warning(
"[SDK] Task limit reached (%d), user=%s",
max_subtasks,
user_id,
f"[SDK] Task limit reached ({max_subtasks}), user={user_id}"
)
return cast(
SyncHookJSONOutput,
@@ -218,7 +212,7 @@ def create_security_hooks(
if tool_name == "Task" and tool_use_id is not None:
task_tool_use_ids.add(tool_use_id)
logger.debug("[SDK] Tool start: %s, user=%s", tool_name, user_id)
logger.debug(f"[SDK] Tool start: {tool_name}, user={user_id}")
return cast(SyncHookJSONOutput, {})
def _release_task_slot(tool_name: str, tool_use_id: str | None) -> None:
@@ -288,11 +282,8 @@ def create_security_hooks(
tool_name = cast(str, input_data.get("tool_name", ""))
error = input_data.get("error", "Unknown error")
logger.warning(
"[SDK] Tool failed: %s, error=%s, user=%s, tool_use_id=%s",
tool_name,
str(error).replace("\n", "").replace("\r", ""),
user_id,
tool_use_id,
f"[SDK] Tool failed: {tool_name}, error={error}, "
f"user={user_id}, tool_use_id={tool_use_id}"
)
_release_task_slot(tool_name, tool_use_id)
@@ -310,19 +301,16 @@ def create_security_hooks(
This hook provides visibility into when compaction happens.
"""
_ = context, tool_use_id
trigger = input_data.get("trigger", "auto")
# Sanitize untrusted input before logging to prevent log injection
trigger = (
str(input_data.get("trigger", "auto"))
.replace("\n", "")
.replace("\r", "")
)
transcript_path = (
str(input_data.get("transcript_path", ""))
.replace("\n", "")
.replace("\r", "")
)
logger.info(
"[SDK] Context compaction triggered: %s, user=%s, transcript_path=%s",
"[SDK] Context compaction triggered: %s, user=%s, "
"transcript_path=%s",
trigger,
user_id,
transcript_path,

File diff suppressed because it is too large Load Diff

View File

@@ -1,283 +0,0 @@
"""Unit tests for extracted service helpers.
Covers ``_is_prompt_too_long``, ``_reduce_context``, ``_iter_sdk_messages``,
and the ``ReducedContext`` named tuple.
"""
from __future__ import annotations
import asyncio
from collections.abc import AsyncGenerator
from unittest.mock import AsyncMock, patch
import pytest
from .conftest import build_test_transcript as _build_transcript
from .service import (
ReducedContext,
_is_prompt_too_long,
_iter_sdk_messages,
_reduce_context,
)
# ---------------------------------------------------------------------------
# _is_prompt_too_long
# ---------------------------------------------------------------------------
class TestIsPromptTooLong:
def test_direct_match(self) -> None:
assert _is_prompt_too_long(Exception("prompt is too long")) is True
def test_case_insensitive(self) -> None:
assert _is_prompt_too_long(Exception("PROMPT IS TOO LONG")) is True
def test_no_match(self) -> None:
assert _is_prompt_too_long(Exception("network timeout")) is False
def test_request_too_large(self) -> None:
assert _is_prompt_too_long(Exception("request too large for model")) is True
def test_context_length_exceeded(self) -> None:
assert _is_prompt_too_long(Exception("context_length_exceeded")) is True
def test_max_tokens_exceeded_not_matched(self) -> None:
"""'max_tokens_exceeded' is intentionally excluded (too broad)."""
assert _is_prompt_too_long(Exception("max_tokens_exceeded")) is False
def test_max_tokens_config_error_no_match(self) -> None:
"""'max_tokens must be at least 1' should NOT match."""
assert _is_prompt_too_long(Exception("max_tokens must be at least 1")) is False
def test_chained_cause(self) -> None:
inner = Exception("prompt is too long")
outer = RuntimeError("SDK error")
outer.__cause__ = inner
assert _is_prompt_too_long(outer) is True
def test_chained_context(self) -> None:
inner = Exception("request too large")
outer = RuntimeError("wrapped")
outer.__context__ = inner
assert _is_prompt_too_long(outer) is True
def test_deep_chain(self) -> None:
bottom = Exception("maximum context length")
middle = RuntimeError("middle")
middle.__cause__ = bottom
top = ValueError("top")
top.__cause__ = middle
assert _is_prompt_too_long(top) is True
def test_chain_no_match(self) -> None:
inner = Exception("rate limit exceeded")
outer = RuntimeError("wrapped")
outer.__cause__ = inner
assert _is_prompt_too_long(outer) is False
def test_cycle_detection(self) -> None:
"""Exception chain with a cycle should not infinite-loop."""
a = Exception("error a")
b = Exception("error b")
a.__cause__ = b
b.__cause__ = a # cycle
assert _is_prompt_too_long(a) is False
def test_all_patterns(self) -> None:
patterns = [
"prompt is too long",
"request too large",
"maximum context length",
"context_length_exceeded",
"input tokens exceed",
"input is too long",
"content length exceeds",
]
for pattern in patterns:
assert _is_prompt_too_long(Exception(pattern)) is True, pattern
# ---------------------------------------------------------------------------
# _reduce_context
# ---------------------------------------------------------------------------
class TestReduceContext:
@pytest.mark.asyncio
async def test_first_retry_compaction_success(self) -> None:
transcript = _build_transcript([("user", "hi"), ("assistant", "hello")])
compacted = _build_transcript([("user", "hi"), ("assistant", "[summary]")])
with (
patch(
"backend.copilot.sdk.service.compact_transcript",
new_callable=AsyncMock,
return_value=compacted,
),
patch(
"backend.copilot.sdk.service.validate_transcript",
return_value=True,
),
patch(
"backend.copilot.sdk.service.write_transcript_to_tempfile",
return_value="/tmp/resume.jsonl",
),
):
ctx = await _reduce_context(
transcript, False, "sess-123", "/tmp/cwd", "[test]"
)
assert isinstance(ctx, ReducedContext)
assert ctx.use_resume is True
assert ctx.resume_file == "/tmp/resume.jsonl"
assert ctx.transcript_lost is False
assert ctx.tried_compaction is True
@pytest.mark.asyncio
async def test_compaction_fails_drops_transcript(self) -> None:
transcript = _build_transcript([("user", "hi"), ("assistant", "hello")])
with patch(
"backend.copilot.sdk.service.compact_transcript",
new_callable=AsyncMock,
return_value=None,
):
ctx = await _reduce_context(
transcript, False, "sess-123", "/tmp/cwd", "[test]"
)
assert ctx.use_resume is False
assert ctx.resume_file is None
assert ctx.transcript_lost is True
assert ctx.tried_compaction is True
@pytest.mark.asyncio
async def test_already_tried_compaction_skips(self) -> None:
transcript = _build_transcript([("user", "hi"), ("assistant", "hello")])
ctx = await _reduce_context(transcript, True, "sess-123", "/tmp/cwd", "[test]")
assert ctx.use_resume is False
assert ctx.transcript_lost is True
assert ctx.tried_compaction is True
@pytest.mark.asyncio
async def test_empty_transcript_drops(self) -> None:
ctx = await _reduce_context("", False, "sess-123", "/tmp/cwd", "[test]")
assert ctx.use_resume is False
assert ctx.transcript_lost is True
@pytest.mark.asyncio
async def test_compaction_returns_same_content_drops(self) -> None:
transcript = _build_transcript([("user", "hi"), ("assistant", "hello")])
with patch(
"backend.copilot.sdk.service.compact_transcript",
new_callable=AsyncMock,
return_value=transcript, # same content
):
ctx = await _reduce_context(
transcript, False, "sess-123", "/tmp/cwd", "[test]"
)
assert ctx.transcript_lost is True
@pytest.mark.asyncio
async def test_write_tempfile_fails_drops(self) -> None:
transcript = _build_transcript([("user", "hi"), ("assistant", "hello")])
compacted = _build_transcript([("user", "hi"), ("assistant", "[summary]")])
with (
patch(
"backend.copilot.sdk.service.compact_transcript",
new_callable=AsyncMock,
return_value=compacted,
),
patch(
"backend.copilot.sdk.service.validate_transcript",
return_value=True,
),
patch(
"backend.copilot.sdk.service.write_transcript_to_tempfile",
return_value=None,
),
):
ctx = await _reduce_context(
transcript, False, "sess-123", "/tmp/cwd", "[test]"
)
assert ctx.transcript_lost is True
# ---------------------------------------------------------------------------
# _iter_sdk_messages
# ---------------------------------------------------------------------------
class TestIterSdkMessages:
@pytest.mark.asyncio
async def test_yields_messages(self) -> None:
messages = ["msg1", "msg2", "msg3"]
client = AsyncMock()
async def _fake_receive() -> AsyncGenerator[str]:
for m in messages:
yield m
client.receive_response = _fake_receive
result = [msg async for msg in _iter_sdk_messages(client)]
assert result == messages
@pytest.mark.asyncio
async def test_heartbeat_on_timeout(self) -> None:
"""Yields None when asyncio.wait times out."""
client = AsyncMock()
received: list = []
async def _slow_receive() -> AsyncGenerator[str]:
await asyncio.sleep(100) # never completes
yield "never" # pragma: no cover — unreachable, yield makes this an async generator
client.receive_response = _slow_receive
with patch("backend.copilot.sdk.service._HEARTBEAT_INTERVAL", 0.01):
count = 0
async for msg in _iter_sdk_messages(client):
received.append(msg)
count += 1
if count >= 3:
break
assert all(m is None for m in received)
@pytest.mark.asyncio
async def test_exception_propagates(self) -> None:
client = AsyncMock()
async def _error_receive() -> AsyncGenerator[str]:
raise RuntimeError("SDK crash")
yield # pragma: no cover — unreachable, yield makes this an async generator
client.receive_response = _error_receive
with pytest.raises(RuntimeError, match="SDK crash"):
async for _ in _iter_sdk_messages(client):
pass
@pytest.mark.asyncio
async def test_task_cleanup_on_break(self) -> None:
"""Pending task is cancelled when generator is closed."""
client = AsyncMock()
async def _slow_receive() -> AsyncGenerator[str]:
yield "first"
await asyncio.sleep(100)
yield "second"
client.receive_response = _slow_receive
gen = _iter_sdk_messages(client)
first = await gen.__anext__()
assert first == "first"
await gen.aclose() # should cancel pending task cleanly

View File

@@ -234,9 +234,7 @@ def create_tool_handler(base_tool: BaseTool):
try:
return await _execute_tool_sync(base_tool, user_id, session, args)
except Exception as e:
logger.error(
"Error executing tool %s: %s", base_tool.name, e, exc_info=True
)
logger.error(f"Error executing tool {base_tool.name}: {e}", exc_info=True)
return _mcp_error(f"Failed to execute {base_tool.name}: {e}")
return tool_handler

View File

@@ -10,9 +10,6 @@ Storage is handled via ``WorkspaceStorageBackend`` (GCS in prod, local
filesystem for self-hosted) — no DB column needed.
"""
from __future__ import annotations
import asyncio
import logging
import os
import re
@@ -20,12 +17,8 @@ import shutil
import time
from dataclasses import dataclass
from pathlib import Path
from uuid import uuid4
from backend.util import json
from backend.util.clients import get_openai_client
from backend.util.prompt import CompressResult, compress_context
from backend.util.workspace_storage import GCSWorkspaceStorage, get_workspace_storage
logger = logging.getLogger(__name__)
@@ -106,14 +99,7 @@ def strip_progress_entries(content: str) -> str:
continue
parent = entry.get("parentUuid", "")
original_parent = parent
# seen_parents is local per-entry (not shared across iterations) so
# it can only detect cycles within a single ancestry walk, not across
# entries. This is intentional: each entry's parent chain is
# independent, and reusing a global set would incorrectly short-circuit
# valid re-use of the same UUID as a parent in different subtrees.
seen_parents: set[str] = set()
while parent in stripped_uuids and parent not in seen_parents:
seen_parents.add(parent)
while parent in stripped_uuids:
parent = uuid_to_parent.get(parent, "")
if parent != original_parent:
entry["parentUuid"] = parent
@@ -341,7 +327,7 @@ def write_transcript_to_tempfile(
# Validate cwd is under the expected sandbox prefix (CodeQL sanitizer).
real_cwd = os.path.realpath(cwd)
if not real_cwd.startswith(_SAFE_CWD_PREFIX):
logger.warning("[Transcript] cwd outside sandbox: %s", cwd)
logger.warning(f"[Transcript] cwd outside sandbox: {cwd}")
return None
try:
@@ -351,17 +337,17 @@ def write_transcript_to_tempfile(
os.path.join(real_cwd, f"transcript-{safe_id}.jsonl")
)
if not jsonl_path.startswith(real_cwd):
logger.warning("[Transcript] Path escaped cwd: %s", jsonl_path)
logger.warning(f"[Transcript] Path escaped cwd: {jsonl_path}")
return None
with open(jsonl_path, "w") as f:
f.write(transcript_content)
logger.info("[Transcript] Wrote resume file: %s", jsonl_path)
logger.info(f"[Transcript] Wrote resume file: {jsonl_path}")
return jsonl_path
except OSError as e:
logger.warning("[Transcript] Failed to write resume file: %s", e)
logger.warning(f"[Transcript] Failed to write resume file: {e}")
return None
@@ -422,6 +408,8 @@ def _meta_storage_path_parts(user_id: str, session_id: str) -> tuple[str, str, s
def _build_path_from_parts(parts: tuple[str, str, str], backend: object) -> str:
"""Build a full storage path from (workspace_id, file_id, filename) parts."""
from backend.util.workspace_storage import GCSWorkspaceStorage
wid, fid, fname = parts
if isinstance(backend, GCSWorkspaceStorage):
blob = f"workspaces/{wid}/{fid}/{fname}"
@@ -460,15 +448,17 @@ async def upload_transcript(
content: Complete JSONL transcript (from TranscriptBuilder).
message_count: ``len(session.messages)`` at upload time.
"""
from backend.util.workspace_storage import get_workspace_storage
# Strip metadata entries (progress, file-history-snapshot, etc.)
# Note: SDK-built transcripts shouldn't have these, but strip for safety
stripped = strip_progress_entries(content)
if not validate_transcript(stripped):
# Log entry types for debugging — helps identify why validation failed
entry_types = [
json.loads(line, fallback={"type": "INVALID_JSON"}).get("type", "?")
for line in stripped.strip().split("\n")
]
entry_types: list[str] = []
for line in stripped.strip().split("\n"):
entry = json.loads(line, fallback={"type": "INVALID_JSON"})
entry_types.append(entry.get("type", "?"))
logger.warning(
"%s Skipping upload — stripped content not valid "
"(types=%s, stripped_len=%d, raw_len=%d)",
@@ -504,14 +494,11 @@ async def upload_transcript(
content=json.dumps(meta).encode("utf-8"),
)
except Exception as e:
logger.warning("%s Failed to write metadata: %s", log_prefix, e)
logger.warning(f"{log_prefix} Failed to write metadata: {e}")
logger.info(
"%s Uploaded %dB (stripped from %dB, msg_count=%d)",
log_prefix,
len(encoded),
len(content),
message_count,
f"{log_prefix} Uploaded {len(encoded)}B "
f"(stripped from {len(content)}B, msg_count={message_count})"
)
@@ -525,6 +512,8 @@ async def download_transcript(
Returns a ``TranscriptDownload`` with the JSONL content and the
``message_count`` watermark from the upload, or ``None`` if not found.
"""
from backend.util.workspace_storage import get_workspace_storage
storage = await get_workspace_storage()
path = _build_storage_path(user_id, session_id, storage)
@@ -532,10 +521,10 @@ async def download_transcript(
data = await storage.retrieve(path)
content = data.decode("utf-8")
except FileNotFoundError:
logger.debug("%s No transcript in storage", log_prefix)
logger.debug(f"{log_prefix} No transcript in storage")
return None
except Exception as e:
logger.warning("%s Failed to download transcript: %s", log_prefix, e)
logger.warning(f"{log_prefix} Failed to download transcript: {e}")
return None
# Try to load metadata (best-effort — old transcripts won't have it)
@@ -547,14 +536,10 @@ async def download_transcript(
meta = json.loads(meta_data.decode("utf-8"), fallback={})
message_count = meta.get("message_count", 0)
uploaded_at = meta.get("uploaded_at", 0.0)
except FileNotFoundError:
except (FileNotFoundError, Exception):
pass # No metadata — treat as unknown (msg_count=0 → always fill gap)
except Exception as e:
logger.debug("%s Failed to load transcript metadata: %s", log_prefix, e)
logger.info(
"%s Downloaded %dB (msg_count=%d)", log_prefix, len(content), message_count
)
logger.info(f"{log_prefix} Downloaded {len(content)}B (msg_count={message_count})")
return TranscriptDownload(
content=content,
message_count=message_count,
@@ -568,6 +553,8 @@ async def delete_transcript(user_id: str, session_id: str) -> None:
Removes both the ``.jsonl`` transcript and the companion ``.meta.json``
so stale ``message_count`` watermarks cannot corrupt gap-fill logic.
"""
from backend.util.workspace_storage import get_workspace_storage
storage = await get_workspace_storage()
path = _build_storage_path(user_id, session_id, storage)
@@ -584,280 +571,3 @@ async def delete_transcript(user_id: str, session_id: str) -> None:
logger.info("[Transcript] Deleted metadata for session %s", session_id)
except Exception as e:
logger.warning("[Transcript] Failed to delete metadata: %s", e)
# ---------------------------------------------------------------------------
# Transcript compaction — LLM summarization for prompt-too-long recovery
# ---------------------------------------------------------------------------
# JSONL protocol values used in transcript serialization.
STOP_REASON_END_TURN = "end_turn"
COMPACT_MSG_ID_PREFIX = "msg_compact_"
ENTRY_TYPE_MESSAGE = "message"
def _flatten_assistant_content(blocks: list) -> str:
"""Flatten assistant content blocks into a single plain-text string.
Structured ``tool_use`` blocks are converted to ``[tool_use: name]``
placeholders. This is intentional: ``compress_context`` requires plain
text for token counting and LLM summarization. The structural loss is
acceptable because compaction only runs when the original transcript was
already too large for the model — a summarized plain-text version is
better than no context at all.
"""
parts: list[str] = []
for block in blocks:
if isinstance(block, dict):
btype = block.get("type", "")
if btype == "text":
parts.append(block.get("text", ""))
elif btype == "tool_use":
parts.append(f"[tool_use: {block.get('name', '?')}]")
else:
# Preserve non-text blocks (e.g. image) as placeholders.
# Use __prefix__ to distinguish from literal user text.
parts.append(f"[__{btype}__]")
elif isinstance(block, str):
parts.append(block)
return "\n".join(parts) if parts else ""
def _flatten_tool_result_content(blocks: list) -> str:
"""Flatten tool_result and other content blocks into plain text.
Handles nested tool_result structures, text blocks, and raw strings.
Uses ``json.dumps`` as fallback for dict blocks without a ``text`` key
or where ``text`` is ``None``.
Like ``_flatten_assistant_content``, structured blocks (images, nested
tool results) are reduced to text representations for compression.
"""
str_parts: list[str] = []
for block in blocks:
if isinstance(block, dict) and block.get("type") == "tool_result":
inner = block.get("content") or ""
if isinstance(inner, list):
for sub in inner:
if isinstance(sub, dict):
sub_type = sub.get("type")
if sub_type in ("image", "document"):
# Avoid serializing base64 binary data into
# the compaction input — use a placeholder.
str_parts.append(f"[__{sub_type}__]")
elif sub_type == "text" or sub.get("text") is not None:
str_parts.append(str(sub.get("text", "")))
else:
str_parts.append(json.dumps(sub))
else:
str_parts.append(str(sub))
else:
str_parts.append(str(inner))
elif isinstance(block, dict) and block.get("type") == "text":
str_parts.append(str(block.get("text", "")))
elif isinstance(block, dict):
# Preserve non-text/non-tool_result blocks (e.g. image) as placeholders.
# Use __prefix__ to distinguish from literal user text.
btype = block.get("type", "unknown")
str_parts.append(f"[__{btype}__]")
elif isinstance(block, str):
str_parts.append(block)
return "\n".join(str_parts) if str_parts else ""
def _transcript_to_messages(content: str) -> list[dict]:
"""Convert JSONL transcript entries to plain message dicts for compression.
Parses each line of the JSONL *content*, skips strippable metadata entries
(progress, file-history-snapshot, etc.), and extracts the ``role`` and
flattened ``content`` from the ``message`` field of each remaining entry.
Structured content blocks (``tool_use``, ``tool_result``, images) are
flattened to plain text via ``_flatten_assistant_content`` and
``_flatten_tool_result_content`` so that ``compress_context`` can
perform token counting and LLM summarization on uniform strings.
Returns:
A list of ``{"role": str, "content": str}`` dicts suitable for
``compress_context``.
"""
messages: list[dict] = []
for line in content.strip().split("\n"):
if not line.strip():
continue
entry = json.loads(line, fallback=None)
if not isinstance(entry, dict):
continue
if entry.get("type", "") in STRIPPABLE_TYPES and not entry.get(
"isCompactSummary"
):
continue
msg = entry.get("message", {})
role = msg.get("role", "")
if not role:
continue
msg_dict: dict = {"role": role}
raw_content = msg.get("content")
if role == "assistant" and isinstance(raw_content, list):
msg_dict["content"] = _flatten_assistant_content(raw_content)
elif isinstance(raw_content, list):
msg_dict["content"] = _flatten_tool_result_content(raw_content)
else:
msg_dict["content"] = raw_content or ""
messages.append(msg_dict)
return messages
def _messages_to_transcript(messages: list[dict]) -> str:
"""Convert compressed message dicts back to JSONL transcript format.
Rebuilds a minimal JSONL transcript from the ``{"role", "content"}``
dicts returned by ``compress_context``. Each message becomes one JSONL
line with a fresh ``uuid`` / ``parentUuid`` chain so the CLI's
``--resume`` flag can reconstruct a valid conversation tree.
Assistant messages are wrapped in the full ``message`` envelope
(``id``, ``model``, ``stop_reason``, structured ``content`` blocks)
that the CLI expects. User messages use the simpler ``{role, content}``
form.
Returns:
A newline-terminated JSONL string, or an empty string if *messages*
is empty.
"""
lines: list[str] = []
last_uuid: str = "" # root entry uses empty string, not null
for msg in messages:
role = msg.get("role", "user")
entry_type = "assistant" if role == "assistant" else "user"
uid = str(uuid4())
content = msg.get("content", "")
if role == "assistant":
message: dict = {
"role": "assistant",
"model": "",
"id": f"{COMPACT_MSG_ID_PREFIX}{uuid4().hex[:24]}",
"type": ENTRY_TYPE_MESSAGE,
"content": [{"type": "text", "text": content}] if content else [],
"stop_reason": STOP_REASON_END_TURN,
"stop_sequence": None,
}
else:
message = {"role": role, "content": content}
entry = {
"type": entry_type,
"uuid": uid,
"parentUuid": last_uuid,
"message": message,
}
lines.append(json.dumps(entry, separators=(",", ":")))
last_uuid = uid
return "\n".join(lines) + "\n" if lines else ""
_COMPACTION_TIMEOUT_SECONDS = 60
_TRUNCATION_TIMEOUT_SECONDS = 30
async def _run_compression(
messages: list[dict],
model: str,
log_prefix: str,
) -> CompressResult:
"""Run LLM-based compression with truncation fallback.
Uses the shared OpenAI client from ``get_openai_client()``.
If no client is configured or the LLM call fails, falls back to
truncation-based compression which drops older messages without
summarization.
A 60-second timeout prevents a hung LLM call from blocking the
retry path indefinitely. The truncation fallback also has a
30-second timeout to guard against slow tokenization on very large
transcripts.
"""
client = get_openai_client()
if client is None:
logger.warning("%s No OpenAI client configured, using truncation", log_prefix)
return await asyncio.wait_for(
compress_context(messages=messages, model=model, client=None),
timeout=_TRUNCATION_TIMEOUT_SECONDS,
)
try:
return await asyncio.wait_for(
compress_context(messages=messages, model=model, client=client),
timeout=_COMPACTION_TIMEOUT_SECONDS,
)
except Exception as e:
logger.warning("%s LLM compaction failed, using truncation: %s", log_prefix, e)
return await asyncio.wait_for(
compress_context(messages=messages, model=model, client=None),
timeout=_TRUNCATION_TIMEOUT_SECONDS,
)
async def compact_transcript(
content: str,
*,
model: str,
log_prefix: str = "[Transcript]",
) -> str | None:
"""Compact an oversized JSONL transcript using LLM summarization.
Converts transcript entries to plain messages, runs ``compress_context``
(the same compressor used for pre-query history), and rebuilds JSONL.
Structured content (``tool_use`` blocks, ``tool_result`` nesting, images)
is flattened to plain text for compression. This matches the fidelity of
the Plan C (DB compression) fallback path, where
``_format_conversation_context`` similarly renders tool calls as
``You called tool: name(args)`` and results as ``Tool result: ...``.
Neither path preserves structured API content blocks — the compacted
context serves as text history for the LLM, which creates proper
structured tool calls going forward.
Images are per-turn attachments loaded from workspace storage by file ID
(via ``_prepare_file_attachments``), not part of the conversation history.
They are re-attached each turn and are unaffected by compaction.
Returns the compacted JSONL string, or ``None`` on failure.
See also:
``_compress_messages`` in ``service.py`` — compresses ``ChatMessage``
lists for pre-query DB history. Both share ``compress_context()``
but operate on different input formats (JSONL transcript entries
here vs. ChatMessage dicts there).
"""
messages = _transcript_to_messages(content)
if len(messages) < 2:
logger.warning("%s Too few messages to compact (%d)", log_prefix, len(messages))
return None
try:
result = await _run_compression(messages, model, log_prefix)
if not result.was_compacted:
# Compressor says it's within budget, but the SDK rejected it.
# Return None so the caller falls through to DB fallback.
logger.warning(
"%s Compressor reports within budget but SDK rejected — "
"signalling failure",
log_prefix,
)
return None
logger.info(
"%s Compacted transcript: %d->%d tokens (%d summarized, %d dropped)",
log_prefix,
result.original_token_count,
result.token_count,
result.messages_summarized,
result.messages_dropped,
)
compacted = _messages_to_transcript(result.messages)
if not validate_transcript(compacted):
logger.warning("%s Compacted transcript failed validation", log_prefix)
return None
return compacted
except Exception as e:
logger.error(
"%s Transcript compaction failed: %s", log_prefix, e, exc_info=True
)
return None

View File

@@ -68,7 +68,7 @@ class TranscriptBuilder:
type=entry_type,
uuid=data.get("uuid") or str(uuid4()),
parentUuid=data.get("parentUuid"),
isCompactSummary=data.get("isCompactSummary"),
isCompactSummary=data.get("isCompactSummary") or None,
message=data.get("message", {}),
)

View File

@@ -1,7 +1,7 @@
"""Unit tests for JSONL transcript management utilities."""
import os
from unittest.mock import AsyncMock, MagicMock, patch
from unittest.mock import AsyncMock, patch
import pytest
@@ -382,7 +382,7 @@ class TestDeleteTranscript:
mock_storage.delete = AsyncMock()
with patch(
"backend.copilot.sdk.transcript.get_workspace_storage",
"backend.util.workspace_storage.get_workspace_storage",
new_callable=AsyncMock,
return_value=mock_storage,
):
@@ -402,7 +402,7 @@ class TestDeleteTranscript:
)
with patch(
"backend.copilot.sdk.transcript.get_workspace_storage",
"backend.util.workspace_storage.get_workspace_storage",
new_callable=AsyncMock,
return_value=mock_storage,
):
@@ -420,7 +420,7 @@ class TestDeleteTranscript:
)
with patch(
"backend.copilot.sdk.transcript.get_workspace_storage",
"backend.util.workspace_storage.get_workspace_storage",
new_callable=AsyncMock,
return_value=mock_storage,
):
@@ -897,134 +897,3 @@ class TestCompactionFlowIntegration:
output2 = builder2.to_jsonl()
lines2 = [json.loads(line) for line in output2.strip().split("\n")]
assert lines2[-1]["parentUuid"] == "a2"
# ---------------------------------------------------------------------------
# _run_compression (direct tests for the 3 code paths)
# ---------------------------------------------------------------------------
class TestRunCompression:
"""Direct tests for ``_run_compression`` covering all 3 code paths.
Paths:
(a) No OpenAI client configured → truncation fallback immediately.
(b) LLM success → returns LLM-compressed result.
(c) LLM call raises → truncation fallback.
"""
def _make_compress_result(self, was_compacted: bool, msgs=None):
"""Build a minimal CompressResult-like object."""
from types import SimpleNamespace
return SimpleNamespace(
was_compacted=was_compacted,
messages=msgs or [{"role": "user", "content": "summary"}],
original_token_count=500,
token_count=100 if was_compacted else 500,
messages_summarized=2 if was_compacted else 0,
messages_dropped=0,
)
@pytest.mark.asyncio
async def test_no_client_uses_truncation(self):
"""Path (a): ``get_openai_client()`` returns None → truncation only."""
from .transcript import _run_compression
truncation_result = self._make_compress_result(
True, [{"role": "user", "content": "truncated"}]
)
with (
patch(
"backend.copilot.sdk.transcript.get_openai_client",
return_value=None,
),
patch(
"backend.copilot.sdk.transcript.compress_context",
new_callable=AsyncMock,
return_value=truncation_result,
) as mock_compress,
):
result = await _run_compression(
[{"role": "user", "content": "hello"}],
model="test-model",
log_prefix="[test]",
)
# compress_context called with client=None (truncation mode)
call_kwargs = mock_compress.call_args
assert (
call_kwargs.kwargs.get("client") is None
or (call_kwargs.args and call_kwargs.args[2] is None)
or mock_compress.call_args[1].get("client") is None
)
assert result is truncation_result
@pytest.mark.asyncio
async def test_llm_success_returns_llm_result(self):
"""Path (b): ``get_openai_client()`` returns a client → LLM compresses."""
from .transcript import _run_compression
llm_result = self._make_compress_result(
True, [{"role": "user", "content": "LLM summary"}]
)
mock_client = MagicMock()
with (
patch(
"backend.copilot.sdk.transcript.get_openai_client",
return_value=mock_client,
),
patch(
"backend.copilot.sdk.transcript.compress_context",
new_callable=AsyncMock,
return_value=llm_result,
) as mock_compress,
):
result = await _run_compression(
[{"role": "user", "content": "long conversation"}],
model="test-model",
log_prefix="[test]",
)
# compress_context called with the real client
assert mock_compress.called
assert result is llm_result
@pytest.mark.asyncio
async def test_llm_failure_falls_back_to_truncation(self):
"""Path (c): LLM call raises → truncation fallback used instead."""
from .transcript import _run_compression
truncation_result = self._make_compress_result(
True, [{"role": "user", "content": "truncated fallback"}]
)
mock_client = MagicMock()
call_count = [0]
async def _compress_side_effect(**kwargs):
call_count[0] += 1
if kwargs.get("client") is not None:
raise RuntimeError("LLM timeout")
return truncation_result
with (
patch(
"backend.copilot.sdk.transcript.get_openai_client",
return_value=mock_client,
),
patch(
"backend.copilot.sdk.transcript.compress_context",
side_effect=_compress_side_effect,
),
):
result = await _run_compression(
[{"role": "user", "content": "long conversation"}],
model="test-model",
log_prefix="[test]",
)
# compress_context called twice: once for LLM (raises), once for truncation
assert call_count[0] == 2
assert result is truncation_result

View File

@@ -28,10 +28,24 @@ logger = logging.getLogger(__name__)
config = ChatConfig()
settings = Settings()
client = LangfuseAsyncOpenAI(api_key=config.api_key, base_url=config.base_url)
_client: LangfuseAsyncOpenAI | None = None
_langfuse = None
langfuse = get_client()
def _get_openai_client() -> LangfuseAsyncOpenAI:
global _client
if _client is None:
_client = LangfuseAsyncOpenAI(api_key=config.api_key, base_url=config.base_url)
return _client
def _get_langfuse():
global _langfuse
if _langfuse is None:
_langfuse = get_client()
return _langfuse
# Default system prompt used when Langfuse is not configured
# Provides minimal baseline tone and personality - all workflow, tools, and
@@ -84,7 +98,7 @@ async def _get_system_prompt_template(context: str) -> str:
else "latest"
)
prompt = await asyncio.to_thread(
langfuse.get_prompt,
_get_langfuse().get_prompt,
config.langfuse_prompt_name,
label=label,
cache_ttl_seconds=config.langfuse_prompt_cache_ttl,
@@ -158,7 +172,7 @@ async def _generate_session_title(
"environment": settings.config.app_env.value,
}
response = await client.chat.completions.create(
response = await _get_openai_client().chat.completions.create(
model=config.title_model,
messages=[
{

View File

@@ -41,7 +41,8 @@ import contextlib
import logging
from typing import Any, Awaitable, Callable, Literal
from e2b import AsyncSandbox, SandboxLifecycle
from e2b import AsyncSandbox
from e2b.sandbox.sandbox_api import SandboxLifecycle
from backend.data.redis_client import get_redis_async

View File

@@ -70,10 +70,6 @@ def _msg_tokens(msg: dict, enc) -> int:
# Count tool result tokens
tool_call_tokens += _tok_len(item.get("tool_use_id", ""), enc)
tool_call_tokens += _tok_len(item.get("content", ""), enc)
elif isinstance(item, dict) and item.get("type") == "text":
# Count text block tokens (standard: "text" key, fallback: "content")
text_val = item.get("text") or item.get("content", "")
tool_call_tokens += _tok_len(text_val, enc)
elif isinstance(item, dict) and "content" in item:
# Other content types with content field
tool_call_tokens += _tok_len(item.get("content", ""), enc)
@@ -149,16 +145,10 @@ def _truncate_middle_tokens(text: str, enc, max_tok: int) -> str:
if len(ids) <= max_tok:
return text # nothing to do
# Need at least 3 tokens (head + ellipsis + tail) for meaningful truncation
if max_tok < 1:
return ""
mid = enc.encode("")
if max_tok < 3:
return enc.decode(ids[:max_tok])
# Split the allowance between the two ends:
head = max_tok // 2 - 1 # -1 for the ellipsis
tail = max_tok - head - 1
mid = enc.encode("")
return enc.decode(ids[:head] + mid + ids[-tail:])
@@ -555,14 +545,6 @@ async def _summarize_messages_llm(
"- Actions taken and key decisions made\n"
"- Technical specifics (file names, tool outputs, function signatures)\n"
"- Errors encountered and resolutions applied\n\n"
"IMPORTANT: Preserve all concrete references verbatim — these are small but "
"critical for continuing the conversation:\n"
"- File paths and directory paths (e.g. /src/app/page.tsx, ./output/result.csv)\n"
"- Image/media file paths from tool outputs\n"
"- URLs, API endpoints, and webhook addresses\n"
"- Resource IDs, session IDs, and identifiers\n"
"- Tool names that were called and their key parameters\n"
"- Environment variables, config keys, and credentials names (not values)\n\n"
"Include ONLY the sections below that have relevant content "
"(skip sections with nothing to report):\n\n"
"## 1. Primary Request and Intent\n"
@@ -570,8 +552,7 @@ async def _summarize_messages_llm(
"## 2. Key Technical Concepts\n"
"Technologies, frameworks, tools, and patterns being used or discussed.\n\n"
"## 3. Files and Resources Involved\n"
"Specific files examined or modified, with relevant snippets and identifiers. "
"Include exact file paths, image paths from tool outputs, and resource URLs.\n\n"
"Specific files examined or modified, with relevant snippets and identifiers.\n\n"
"## 4. Errors and Fixes\n"
"Problems encountered, error messages, and their resolutions.\n\n"
"## 5. All User Messages\n"
@@ -585,7 +566,7 @@ async def _summarize_messages_llm(
},
{"role": "user", "content": f"Summarize:\n\n{conversation_text}"},
],
max_tokens=2000,
max_tokens=1500,
temperature=0.3,
)
@@ -705,15 +686,11 @@ async def compress_context(
msgs = [summary_msg] + recent_msgs
logger.info(
"Context summarized: %d -> %d tokens, summarized %d messages",
original_count,
total_tokens(),
messages_summarized,
f"Context summarized: {original_count} -> {total_tokens()} tokens, "
f"summarized {messages_summarized} messages"
)
except Exception as e:
logger.warning(
"Summarization failed, continuing with truncation: %s", e
)
logger.warning(f"Summarization failed, continuing with truncation: {e}")
# Fall through to content truncation
# ---- STEP 2: Normalize content ----------------------------------------

View File

@@ -2,7 +2,7 @@
import { ChatInput } from "@/app/(platform)/copilot/components/ChatInput/ChatInput";
import { UIDataTypes, UIMessage, UITools } from "ai";
import { LayoutGroup, motion } from "framer-motion";
import { ReactNode } from "react";
import { ReactNode, useCallback } from "react";
import { ChatMessagesContainer } from "../ChatMessagesContainer/ChatMessagesContainer";
import { CopilotChatActionsProvider } from "../CopilotChatActionsProvider/CopilotChatActionsProvider";
import { EmptySession } from "../EmptySession/EmptySession";
@@ -52,6 +52,20 @@ export const ChatContainer = ({
!!isSessionError;
const inputLayoutId = "copilot-2-chat-input";
// Retry: re-send the last user message (used by ErrorCard on transient errors)
const handleRetry = useCallback(() => {
const lastUserMsg = [...messages].reverse().find((m) => m.role === "user");
const lastText = lastUserMsg?.parts
.filter(
(p): p is Extract<typeof p, { type: "text" }> => p.type === "text",
)
.map((p) => p.text)
.join("");
if (lastText) {
onSend(lastText);
}
}, [messages, onSend]);
return (
<CopilotChatActionsProvider onSend={onSend}>
<LayoutGroup id="copilot-2-chat-layout">
@@ -65,6 +79,7 @@ export const ChatContainer = ({
isLoading={isLoadingSession}
headerSlot={headerSlot}
sessionID={sessionId}
onRetry={handleRetry}
/>
<motion.div
initial={{ opacity: 0 }}

View File

@@ -32,11 +32,13 @@ interface Props {
isLoading: boolean;
headerSlot?: React.ReactNode;
sessionID?: string | null;
onRetry?: () => void;
}
function renderSegments(
segments: RenderSegment[],
messageID: string,
onRetry?: () => void,
): React.ReactNode[] {
return segments.map((seg, segIdx) => {
if (seg.kind === "collapsed-group") {
@@ -48,6 +50,7 @@ function renderSegments(
part={seg.part}
messageID={messageID}
partIndex={seg.index}
onRetry={onRetry}
/>
);
});
@@ -104,6 +107,7 @@ export function ChatMessagesContainer({
isLoading,
headerSlot,
sessionID,
onRetry,
}: Props) {
const lastMessage = messages[messages.length - 1];
const graphExecId = useMemo(() => extractGraphExecId(messages), [messages]);
@@ -212,13 +216,18 @@ export function ChatMessagesContainer({
</ReasoningCollapse>
)}
{responseSegments
? renderSegments(responseSegments, message.id)
? renderSegments(
responseSegments,
message.id,
isLastAssistant ? onRetry : undefined,
)
: message.parts.map((part, i) => (
<MessagePartRenderer
key={`${message.id}-${i}`}
part={part}
messageID={message.id}
partIndex={i}
onRetry={isLastAssistant ? onRetry : undefined}
/>
))}
{isLastInTurn && !isCurrentlyStreaming && (

View File

@@ -69,9 +69,15 @@ interface Props {
part: UIMessage<unknown, UIDataTypes, UITools>["parts"][number];
messageID: string;
partIndex: number;
onRetry?: () => void;
}
export function MessagePartRenderer({ part, messageID, partIndex }: Props) {
export function MessagePartRenderer({
part,
messageID,
partIndex,
onRetry,
}: Props) {
const key = `${messageID}-${partIndex}`;
switch (part.type) {
@@ -80,7 +86,7 @@ export function MessagePartRenderer({ part, messageID, partIndex }: Props) {
part.text,
);
if (markerType === "error") {
if (markerType === "error" || markerType === "retryable_error") {
const lowerMarker = markerText.toLowerCase();
const isCancellation =
lowerMarker === "operation cancelled" ||
@@ -100,6 +106,7 @@ export function MessagePartRenderer({ part, messageID, partIndex }: Props) {
key={key}
responseError={{ message: markerText }}
context="execution"
onRetry={markerType === "retryable_error" ? onRetry : undefined}
/>
);
}

View File

@@ -172,16 +172,22 @@ export function getTurnMessages(
// The hex suffix makes it virtually impossible for an LLM to accidentally
// produce these strings in normal conversation.
const COPILOT_ERROR_PREFIX = "[__COPILOT_ERROR_f7a1__]";
const COPILOT_RETRYABLE_ERROR_PREFIX = "[__COPILOT_RETRYABLE_ERROR_a9c2__]";
const COPILOT_SYSTEM_PREFIX = "[__COPILOT_SYSTEM_e3b0__]";
export type MarkerType = "error" | "system" | null;
export type MarkerType = "error" | "retryable_error" | "system" | null;
/** Escape all regex special characters in a string. */
function escapeRegExp(s: string): string {
return s.replace(/[.*+?^${}()|[\]\\]/g, "\\$&");
}
// Pre-compiled marker regexes (avoids re-creating on every call / render)
// Pre-compiled marker regexes (avoids re-creating on every call / render).
// Retryable check must come first since it's more specific.
const RETRYABLE_ERROR_MARKER_RE = new RegExp(
`${escapeRegExp(COPILOT_RETRYABLE_ERROR_PREFIX)}\\s*(.+?)$`,
"s",
);
const ERROR_MARKER_RE = new RegExp(
`${escapeRegExp(COPILOT_ERROR_PREFIX)}\\s*(.+?)$`,
"s",
@@ -196,6 +202,15 @@ export function parseSpecialMarkers(text: string): {
markerText: string;
cleanText: string;
} {
const retryableMatch = text.match(RETRYABLE_ERROR_MARKER_RE);
if (retryableMatch) {
return {
markerType: "retryable_error",
markerText: retryableMatch[1].trim(),
cleanText: text.replace(retryableMatch[0], "").trim(),
};
}
const errorMatch = text.match(ERROR_MARKER_RE);
if (errorMatch) {
return {