mirror of
https://github.com/Significant-Gravitas/AutoGPT.git
synced 2026-01-30 01:18:07 -05:00
Compare commits
6 Commits
feat/sub-a
...
swiftyos/s
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
bb608ea60d | ||
|
|
46af3b94f2 | ||
|
|
083cceca0f | ||
|
|
06758adefd | ||
|
|
c01c29a059 | ||
|
|
d738059da8 |
@@ -29,7 +29,8 @@
|
||||
"postCreateCmd": [
|
||||
"cd autogpt_platform/autogpt_libs && poetry install",
|
||||
"cd autogpt_platform/backend && poetry install && poetry run prisma generate",
|
||||
"cd autogpt_platform/frontend && pnpm install"
|
||||
"cd autogpt_platform/frontend && pnpm install",
|
||||
"cd docs && pip install -r requirements.txt"
|
||||
],
|
||||
"terminalCommand": "code .",
|
||||
"deleteBranchWithWorktree": false
|
||||
|
||||
6
.github/copilot-instructions.md
vendored
6
.github/copilot-instructions.md
vendored
@@ -160,7 +160,7 @@ pnpm storybook # Start component development server
|
||||
|
||||
**Backend Entry Points:**
|
||||
|
||||
- `backend/backend/api/rest_api.py` - FastAPI application setup
|
||||
- `backend/backend/server/server.py` - FastAPI application setup
|
||||
- `backend/backend/data/` - Database models and user management
|
||||
- `backend/blocks/` - Agent execution blocks and logic
|
||||
|
||||
@@ -219,7 +219,7 @@ Agents are built using a visual block-based system where each block performs a s
|
||||
|
||||
### API Development
|
||||
|
||||
1. Update routes in `/backend/backend/api/features/`
|
||||
1. Update routes in `/backend/backend/server/routers/`
|
||||
2. Add/update Pydantic models in same directory
|
||||
3. Write tests alongside route files
|
||||
4. For `data/*.py` changes, validate user ID checks
|
||||
@@ -285,7 +285,7 @@ Agents are built using a visual block-based system where each block performs a s
|
||||
|
||||
### Security Guidelines
|
||||
|
||||
**Cache Protection Middleware** (`/backend/backend/api/middleware/security.py`):
|
||||
**Cache Protection Middleware** (`/backend/backend/server/middleware/security.py`):
|
||||
|
||||
- Default: Disables caching for ALL endpoints with `Cache-Control: no-store, no-cache, must-revalidate, private`
|
||||
- Uses allow list approach for cacheable paths (static assets, health checks, public pages)
|
||||
|
||||
1
.gitignore
vendored
1
.gitignore
vendored
@@ -178,5 +178,4 @@ autogpt_platform/backend/settings.py
|
||||
*.ign.*
|
||||
.test-contents
|
||||
.claude/settings.local.json
|
||||
CLAUDE.local.md
|
||||
/autogpt_platform/backend/logs
|
||||
|
||||
@@ -6,30 +6,141 @@ This file provides guidance to Claude Code (claude.ai/code) when working with co
|
||||
|
||||
AutoGPT Platform is a monorepo containing:
|
||||
|
||||
- **Backend** (`backend`): Python FastAPI server with async support
|
||||
- **Frontend** (`frontend`): Next.js React application
|
||||
- **Shared Libraries** (`autogpt_libs`): Common Python utilities
|
||||
- **Backend** (`/backend`): Python FastAPI server with async support
|
||||
- **Frontend** (`/frontend`): Next.js React application
|
||||
- **Shared Libraries** (`/autogpt_libs`): Common Python utilities
|
||||
|
||||
## Component Documentation
|
||||
## Essential Commands
|
||||
|
||||
- **Backend**: See @backend/CLAUDE.md for backend-specific commands, architecture, and development tasks
|
||||
- **Frontend**: See @frontend/CLAUDE.md for frontend-specific commands, architecture, and development patterns
|
||||
### Backend Development
|
||||
|
||||
## Key Concepts
|
||||
```bash
|
||||
# Install dependencies
|
||||
cd backend && poetry install
|
||||
|
||||
# Run database migrations
|
||||
poetry run prisma migrate dev
|
||||
|
||||
# Start all services (database, redis, rabbitmq, clamav)
|
||||
docker compose up -d
|
||||
|
||||
# Run the backend server
|
||||
poetry run serve
|
||||
|
||||
# Run tests
|
||||
poetry run test
|
||||
|
||||
# Run specific test
|
||||
poetry run pytest path/to/test_file.py::test_function_name
|
||||
|
||||
# Run block tests (tests that validate all blocks work correctly)
|
||||
poetry run pytest backend/blocks/test/test_block.py -xvs
|
||||
|
||||
# Run tests for a specific block (e.g., GetCurrentTimeBlock)
|
||||
poetry run pytest 'backend/blocks/test/test_block.py::test_available_blocks[GetCurrentTimeBlock]' -xvs
|
||||
|
||||
# Lint and format
|
||||
# prefer format if you want to just "fix" it and only get the errors that can't be autofixed
|
||||
poetry run format # Black + isort
|
||||
poetry run lint # ruff
|
||||
```
|
||||
|
||||
More details can be found in TESTING.md
|
||||
|
||||
#### Creating/Updating Snapshots
|
||||
|
||||
When you first write a test or when the expected output changes:
|
||||
|
||||
```bash
|
||||
poetry run pytest path/to/test.py --snapshot-update
|
||||
```
|
||||
|
||||
⚠️ **Important**: Always review snapshot changes before committing! Use `git diff` to verify the changes are expected.
|
||||
|
||||
### Frontend Development
|
||||
|
||||
```bash
|
||||
# Install dependencies
|
||||
cd frontend && pnpm i
|
||||
|
||||
# Generate API client from OpenAPI spec
|
||||
pnpm generate:api
|
||||
|
||||
# Start development server
|
||||
pnpm dev
|
||||
|
||||
# Run E2E tests
|
||||
pnpm test
|
||||
|
||||
# Run Storybook for component development
|
||||
pnpm storybook
|
||||
|
||||
# Build production
|
||||
pnpm build
|
||||
|
||||
# Format and lint
|
||||
pnpm format
|
||||
|
||||
# Type checking
|
||||
pnpm types
|
||||
```
|
||||
|
||||
## Architecture Overview
|
||||
|
||||
### Backend Architecture
|
||||
|
||||
- **API Layer**: FastAPI with REST and WebSocket endpoints
|
||||
- **Database**: PostgreSQL with Prisma ORM, includes pgvector for embeddings
|
||||
- **Queue System**: RabbitMQ for async task processing
|
||||
- **Execution Engine**: Separate executor service processes agent workflows
|
||||
- **Authentication**: JWT-based with Supabase integration
|
||||
- **Security**: Cache protection middleware prevents sensitive data caching in browsers/proxies
|
||||
|
||||
### Frontend Architecture
|
||||
|
||||
- **Framework**: Next.js 15 App Router (client-first approach)
|
||||
- **Data Fetching**: Type-safe generated API hooks via Orval + React Query
|
||||
- **State Management**: React Query for server state, co-located UI state in components/hooks
|
||||
- **Component Structure**: Separate render logic (`.tsx`) from business logic (`use*.ts` hooks)
|
||||
- **Workflow Builder**: Visual graph editor using @xyflow/react
|
||||
- **UI Components**: shadcn/ui (Radix UI primitives) with Tailwind CSS styling
|
||||
- **Icons**: Phosphor Icons only
|
||||
- **Feature Flags**: LaunchDarkly integration
|
||||
- **Error Handling**: ErrorCard for render errors, toast for mutations, Sentry for exceptions
|
||||
- **Testing**: Playwright for E2E, Storybook for component development
|
||||
|
||||
### Key Concepts
|
||||
|
||||
1. **Agent Graphs**: Workflow definitions stored as JSON, executed by the backend
|
||||
2. **Blocks**: Reusable components in `backend/backend/blocks/` that perform specific tasks
|
||||
2. **Blocks**: Reusable components in `/backend/blocks/` that perform specific tasks
|
||||
3. **Integrations**: OAuth and API connections stored per user
|
||||
4. **Store**: Marketplace for sharing agent templates
|
||||
5. **Virus Scanning**: ClamAV integration for file upload security
|
||||
|
||||
### Testing Approach
|
||||
|
||||
- Backend uses pytest with snapshot testing for API responses
|
||||
- Test files are colocated with source files (`*_test.py`)
|
||||
- Frontend uses Playwright for E2E tests
|
||||
- Component testing via Storybook
|
||||
|
||||
### Database Schema
|
||||
|
||||
Key models (defined in `/backend/schema.prisma`):
|
||||
|
||||
- `User`: Authentication and profile data
|
||||
- `AgentGraph`: Workflow definitions with version control
|
||||
- `AgentGraphExecution`: Execution history and results
|
||||
- `AgentNode`: Individual nodes in a workflow
|
||||
- `StoreListing`: Marketplace listings for sharing agents
|
||||
|
||||
### Environment Configuration
|
||||
|
||||
#### Configuration Files
|
||||
|
||||
- **Backend**: `backend/.env.default` (defaults) → `backend/.env` (user overrides)
|
||||
- **Frontend**: `frontend/.env.default` (defaults) → `frontend/.env` (user overrides)
|
||||
- **Platform**: `.env.default` (Supabase/shared defaults) → `.env` (user overrides)
|
||||
- **Backend**: `/backend/.env.default` (defaults) → `/backend/.env` (user overrides)
|
||||
- **Frontend**: `/frontend/.env.default` (defaults) → `/frontend/.env` (user overrides)
|
||||
- **Platform**: `/.env.default` (Supabase/shared defaults) → `/.env` (user overrides)
|
||||
|
||||
#### Docker Environment Loading Order
|
||||
|
||||
@@ -45,12 +156,130 @@ AutoGPT Platform is a monorepo containing:
|
||||
- Backend/Frontend services use YAML anchors for consistent configuration
|
||||
- Supabase services (`db/docker/docker-compose.yml`) follow the same pattern
|
||||
|
||||
### Common Development Tasks
|
||||
|
||||
**Adding a new block:**
|
||||
|
||||
Follow the comprehensive [Block SDK Guide](../../../docs/content/platform/block-sdk-guide.md) which covers:
|
||||
|
||||
- Provider configuration with `ProviderBuilder`
|
||||
- Block schema definition
|
||||
- Authentication (API keys, OAuth, webhooks)
|
||||
- Testing and validation
|
||||
- File organization
|
||||
|
||||
Quick steps:
|
||||
|
||||
1. Create new file in `/backend/backend/blocks/`
|
||||
2. Configure provider using `ProviderBuilder` in `_config.py`
|
||||
3. Inherit from `Block` base class
|
||||
4. Define input/output schemas using `BlockSchema`
|
||||
5. Implement async `run` method
|
||||
6. Generate unique block ID using `uuid.uuid4()`
|
||||
7. Test with `poetry run pytest backend/blocks/test/test_block.py`
|
||||
|
||||
Note: when making many new blocks analyze the interfaces for each of these blocks and picture if they would go well together in a graph based editor or would they struggle to connect productively?
|
||||
ex: do the inputs and outputs tie well together?
|
||||
|
||||
If you get any pushback or hit complex block conditions check the new_blocks guide in the docs.
|
||||
|
||||
**Handling files in blocks with `store_media_file()`:**
|
||||
|
||||
When blocks need to work with files (images, videos, documents), use `store_media_file()` from `backend.util.file`. The `return_format` parameter determines what you get back:
|
||||
|
||||
| Format | Use When | Returns |
|
||||
|--------|----------|---------|
|
||||
| `"for_local_processing"` | Processing with local tools (ffmpeg, MoviePy, PIL) | Local file path (e.g., `"image.png"`) |
|
||||
| `"for_external_api"` | Sending content to external APIs (Replicate, OpenAI) | Data URI (e.g., `"data:image/png;base64,..."`) |
|
||||
| `"for_block_output"` | Returning output from your block | Smart: `workspace://` in CoPilot, data URI in graphs |
|
||||
|
||||
**Examples:**
|
||||
```python
|
||||
# INPUT: Need to process file locally with ffmpeg
|
||||
local_path = await store_media_file(
|
||||
file=input_data.video,
|
||||
execution_context=execution_context,
|
||||
return_format="for_local_processing",
|
||||
)
|
||||
# local_path = "video.mp4" - use with Path/ffmpeg/etc
|
||||
|
||||
# INPUT: Need to send to external API like Replicate
|
||||
image_b64 = await store_media_file(
|
||||
file=input_data.image,
|
||||
execution_context=execution_context,
|
||||
return_format="for_external_api",
|
||||
)
|
||||
# image_b64 = "data:image/png;base64,iVBORw0..." - send to API
|
||||
|
||||
# OUTPUT: Returning result from block
|
||||
result_url = await store_media_file(
|
||||
file=generated_image_url,
|
||||
execution_context=execution_context,
|
||||
return_format="for_block_output",
|
||||
)
|
||||
yield "image_url", result_url
|
||||
# In CoPilot: result_url = "workspace://abc123"
|
||||
# In graphs: result_url = "data:image/png;base64,..."
|
||||
```
|
||||
|
||||
**Key points:**
|
||||
- `for_block_output` is the ONLY format that auto-adapts to execution context
|
||||
- Always use `for_block_output` for block outputs unless you have a specific reason not to
|
||||
- Never hardcode workspace checks - let `for_block_output` handle it
|
||||
|
||||
**Modifying the API:**
|
||||
|
||||
1. Update route in `/backend/backend/server/routers/`
|
||||
2. Add/update Pydantic models in same directory
|
||||
3. Write tests alongside the route file
|
||||
4. Run `poetry run test` to verify
|
||||
|
||||
### Frontend guidelines:
|
||||
|
||||
See `/frontend/CONTRIBUTING.md` for complete patterns. Quick reference:
|
||||
|
||||
1. **Pages**: Create in `src/app/(platform)/feature-name/page.tsx`
|
||||
- Add `usePageName.ts` hook for logic
|
||||
- Put sub-components in local `components/` folder
|
||||
2. **Components**: Structure as `ComponentName/ComponentName.tsx` + `useComponentName.ts` + `helpers.ts`
|
||||
- Use design system components from `src/components/` (atoms, molecules, organisms)
|
||||
- Never use `src/components/__legacy__/*`
|
||||
3. **Data fetching**: Use generated API hooks from `@/app/api/__generated__/endpoints/`
|
||||
- Regenerate with `pnpm generate:api`
|
||||
- Pattern: `use{Method}{Version}{OperationName}`
|
||||
4. **Styling**: Tailwind CSS only, use design tokens, Phosphor Icons only
|
||||
5. **Testing**: Add Storybook stories for new components, Playwright for E2E
|
||||
6. **Code conventions**: Function declarations (not arrow functions) for components/handlers
|
||||
|
||||
- Component props should be `interface Props { ... }` (not exported) unless the interface needs to be used outside the component
|
||||
- Separate render logic from business logic (component.tsx + useComponent.ts + helpers.ts)
|
||||
- Colocate state when possible and avoid creating large components, use sub-components ( local `/components` folder next to the parent component ) when sensible
|
||||
- Avoid large hooks, abstract logic into `helpers.ts` files when sensible
|
||||
- Use function declarations for components, arrow functions only for callbacks
|
||||
- No barrel files or `index.ts` re-exports
|
||||
- Do not use `useCallback` or `useMemo` unless asked to optimise a given function
|
||||
- Avoid comments at all times unless the code is very complex
|
||||
- Do not type hook returns, let Typescript infer as much as possible
|
||||
- Never type with `any`, if not types available use `unknown`
|
||||
|
||||
### Security Implementation
|
||||
|
||||
**Cache Protection Middleware:**
|
||||
|
||||
- Located in `/backend/backend/server/middleware/security.py`
|
||||
- Default behavior: Disables caching for ALL endpoints with `Cache-Control: no-store, no-cache, must-revalidate, private`
|
||||
- Uses an allow list approach - only explicitly permitted paths can be cached
|
||||
- Cacheable paths include: static assets (`/static/*`, `/_next/static/*`), health checks, public store pages, documentation
|
||||
- Prevents sensitive data (auth tokens, API keys, user data) from being cached by browsers/proxies
|
||||
- To allow caching for a new endpoint, add it to `CACHEABLE_PATHS` in the middleware
|
||||
- Applied to both main API server and external API applications
|
||||
|
||||
### Creating Pull Requests
|
||||
|
||||
- Create the PR against the `dev` branch of the repository.
|
||||
- Ensure the branch name is descriptive (e.g., `feature/add-new-block`)
|
||||
- Use conventional commit messages (see below)
|
||||
- Fill out the .github/PULL_REQUEST_TEMPLATE.md template as the PR description
|
||||
- Create the PR aginst the `dev` branch of the repository.
|
||||
- Ensure the branch name is descriptive (e.g., `feature/add-new-block`)/
|
||||
- Use conventional commit messages (see below)/
|
||||
- Fill out the .github/PULL_REQUEST_TEMPLATE.md template as the PR description/
|
||||
- Run the github pre-commit hooks to ensure code quality.
|
||||
|
||||
### Reviewing/Revising Pull Requests
|
||||
|
||||
@@ -1,170 +0,0 @@
|
||||
# CLAUDE.md - Backend
|
||||
|
||||
This file provides guidance to Claude Code when working with the backend.
|
||||
|
||||
## Essential Commands
|
||||
|
||||
To run something with Python package dependencies you MUST use `poetry run ...`.
|
||||
|
||||
```bash
|
||||
# Install dependencies
|
||||
poetry install
|
||||
|
||||
# Run database migrations
|
||||
poetry run prisma migrate dev
|
||||
|
||||
# Start all services (database, redis, rabbitmq, clamav)
|
||||
docker compose up -d
|
||||
|
||||
# Run the backend as a whole
|
||||
poetry run app
|
||||
|
||||
# Run tests
|
||||
poetry run test
|
||||
|
||||
# Run specific test
|
||||
poetry run pytest path/to/test_file.py::test_function_name
|
||||
|
||||
# Run block tests (tests that validate all blocks work correctly)
|
||||
poetry run pytest backend/blocks/test/test_block.py -xvs
|
||||
|
||||
# Run tests for a specific block (e.g., GetCurrentTimeBlock)
|
||||
poetry run pytest 'backend/blocks/test/test_block.py::test_available_blocks[GetCurrentTimeBlock]' -xvs
|
||||
|
||||
# Lint and format
|
||||
# prefer format if you want to just "fix" it and only get the errors that can't be autofixed
|
||||
poetry run format # Black + isort
|
||||
poetry run lint # ruff
|
||||
```
|
||||
|
||||
More details can be found in @TESTING.md
|
||||
|
||||
### Creating/Updating Snapshots
|
||||
|
||||
When you first write a test or when the expected output changes:
|
||||
|
||||
```bash
|
||||
poetry run pytest path/to/test.py --snapshot-update
|
||||
```
|
||||
|
||||
⚠️ **Important**: Always review snapshot changes before committing! Use `git diff` to verify the changes are expected.
|
||||
|
||||
## Architecture
|
||||
|
||||
- **API Layer**: FastAPI with REST and WebSocket endpoints
|
||||
- **Database**: PostgreSQL with Prisma ORM, includes pgvector for embeddings
|
||||
- **Queue System**: RabbitMQ for async task processing
|
||||
- **Execution Engine**: Separate executor service processes agent workflows
|
||||
- **Authentication**: JWT-based with Supabase integration
|
||||
- **Security**: Cache protection middleware prevents sensitive data caching in browsers/proxies
|
||||
|
||||
## Testing Approach
|
||||
|
||||
- Uses pytest with snapshot testing for API responses
|
||||
- Test files are colocated with source files (`*_test.py`)
|
||||
|
||||
## Database Schema
|
||||
|
||||
Key models (defined in `schema.prisma`):
|
||||
|
||||
- `User`: Authentication and profile data
|
||||
- `AgentGraph`: Workflow definitions with version control
|
||||
- `AgentGraphExecution`: Execution history and results
|
||||
- `AgentNode`: Individual nodes in a workflow
|
||||
- `StoreListing`: Marketplace listings for sharing agents
|
||||
|
||||
## Environment Configuration
|
||||
|
||||
- **Backend**: `.env.default` (defaults) → `.env` (user overrides)
|
||||
|
||||
## Common Development Tasks
|
||||
|
||||
### Adding a new block
|
||||
|
||||
Follow the comprehensive [Block SDK Guide](@../../docs/content/platform/block-sdk-guide.md) which covers:
|
||||
|
||||
- Provider configuration with `ProviderBuilder`
|
||||
- Block schema definition
|
||||
- Authentication (API keys, OAuth, webhooks)
|
||||
- Testing and validation
|
||||
- File organization
|
||||
|
||||
Quick steps:
|
||||
|
||||
1. Create new file in `backend/blocks/`
|
||||
2. Configure provider using `ProviderBuilder` in `_config.py`
|
||||
3. Inherit from `Block` base class
|
||||
4. Define input/output schemas using `BlockSchema`
|
||||
5. Implement async `run` method
|
||||
6. Generate unique block ID using `uuid.uuid4()`
|
||||
7. Test with `poetry run pytest backend/blocks/test/test_block.py`
|
||||
|
||||
Note: when making many new blocks analyze the interfaces for each of these blocks and picture if they would go well together in a graph-based editor or would they struggle to connect productively?
|
||||
ex: do the inputs and outputs tie well together?
|
||||
|
||||
If you get any pushback or hit complex block conditions check the new_blocks guide in the docs.
|
||||
|
||||
#### Handling files in blocks with `store_media_file()`
|
||||
|
||||
When blocks need to work with files (images, videos, documents), use `store_media_file()` from `backend.util.file`. The `return_format` parameter determines what you get back:
|
||||
|
||||
| Format | Use When | Returns |
|
||||
|--------|----------|---------|
|
||||
| `"for_local_processing"` | Processing with local tools (ffmpeg, MoviePy, PIL) | Local file path (e.g., `"image.png"`) |
|
||||
| `"for_external_api"` | Sending content to external APIs (Replicate, OpenAI) | Data URI (e.g., `"data:image/png;base64,..."`) |
|
||||
| `"for_block_output"` | Returning output from your block | Smart: `workspace://` in CoPilot, data URI in graphs |
|
||||
|
||||
**Examples:**
|
||||
|
||||
```python
|
||||
# INPUT: Need to process file locally with ffmpeg
|
||||
local_path = await store_media_file(
|
||||
file=input_data.video,
|
||||
execution_context=execution_context,
|
||||
return_format="for_local_processing",
|
||||
)
|
||||
# local_path = "video.mp4" - use with Path/ffmpeg/etc
|
||||
|
||||
# INPUT: Need to send to external API like Replicate
|
||||
image_b64 = await store_media_file(
|
||||
file=input_data.image,
|
||||
execution_context=execution_context,
|
||||
return_format="for_external_api",
|
||||
)
|
||||
# image_b64 = "data:image/png;base64,iVBORw0..." - send to API
|
||||
|
||||
# OUTPUT: Returning result from block
|
||||
result_url = await store_media_file(
|
||||
file=generated_image_url,
|
||||
execution_context=execution_context,
|
||||
return_format="for_block_output",
|
||||
)
|
||||
yield "image_url", result_url
|
||||
# In CoPilot: result_url = "workspace://abc123"
|
||||
# In graphs: result_url = "data:image/png;base64,..."
|
||||
```
|
||||
|
||||
**Key points:**
|
||||
|
||||
- `for_block_output` is the ONLY format that auto-adapts to execution context
|
||||
- Always use `for_block_output` for block outputs unless you have a specific reason not to
|
||||
- Never hardcode workspace checks - let `for_block_output` handle it
|
||||
|
||||
### Modifying the API
|
||||
|
||||
1. Update route in `backend/api/features/`
|
||||
2. Add/update Pydantic models in same directory
|
||||
3. Write tests alongside the route file
|
||||
4. Run `poetry run test` to verify
|
||||
|
||||
## Security Implementation
|
||||
|
||||
### Cache Protection Middleware
|
||||
|
||||
- Located in `backend/api/middleware/security.py`
|
||||
- Default behavior: Disables caching for ALL endpoints with `Cache-Control: no-store, no-cache, must-revalidate, private`
|
||||
- Uses an allow list approach - only explicitly permitted paths can be cached
|
||||
- Cacheable paths include: static assets (`static/*`, `_next/static/*`), health checks, public store pages, documentation
|
||||
- Prevents sensitive data (auth tokens, API keys, user data) from being cached by browsers/proxies
|
||||
- To allow caching for a new endpoint, add it to `CACHEABLE_PATHS` in the middleware
|
||||
- Applied to both main API server and external API applications
|
||||
@@ -138,7 +138,7 @@ If the test doesn't need the `user_id` specifically, mocking is not necessary as
|
||||
|
||||
#### Using Global Auth Fixtures
|
||||
|
||||
Two global auth fixtures are provided by `backend/api/conftest.py`:
|
||||
Two global auth fixtures are provided by `backend/server/conftest.py`:
|
||||
|
||||
- `mock_jwt_user` - Regular user with `test_user_id` ("test-user-id")
|
||||
- `mock_jwt_admin` - Admin user with `admin_user_id` ("admin-user-id")
|
||||
|
||||
@@ -17,7 +17,7 @@ router = fastapi.APIRouter(
|
||||
)
|
||||
|
||||
|
||||
# Taken from backend/api/features/store/db.py
|
||||
# Taken from backend/server/v2/store/db.py
|
||||
def sanitize_query(query: str | None) -> str | None:
|
||||
if query is None:
|
||||
return query
|
||||
|
||||
@@ -0,0 +1,325 @@
|
||||
"""RabbitMQ consumer for operation completion messages.
|
||||
|
||||
This module provides a consumer that listens for completion notifications
|
||||
from external services (like Agent Generator) and triggers the appropriate
|
||||
stream registry and chat service updates.
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
|
||||
import orjson
|
||||
from pydantic import BaseModel
|
||||
|
||||
from backend.data.rabbitmq import (
|
||||
AsyncRabbitMQ,
|
||||
Exchange,
|
||||
ExchangeType,
|
||||
Queue,
|
||||
RabbitMQConfig,
|
||||
)
|
||||
|
||||
from . import service as chat_service
|
||||
from . import stream_registry
|
||||
from .response_model import StreamError, StreamFinish, StreamToolOutputAvailable
|
||||
from .tools.models import ErrorResponse
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Queue and exchange configuration
|
||||
OPERATION_COMPLETE_EXCHANGE = Exchange(
|
||||
name="chat_operations",
|
||||
type=ExchangeType.DIRECT,
|
||||
durable=True,
|
||||
)
|
||||
|
||||
OPERATION_COMPLETE_QUEUE = Queue(
|
||||
name="chat_operation_complete",
|
||||
durable=True,
|
||||
exchange=OPERATION_COMPLETE_EXCHANGE,
|
||||
routing_key="operation.complete",
|
||||
)
|
||||
|
||||
RABBITMQ_CONFIG = RabbitMQConfig(
|
||||
exchanges=[OPERATION_COMPLETE_EXCHANGE],
|
||||
queues=[OPERATION_COMPLETE_QUEUE],
|
||||
)
|
||||
|
||||
|
||||
class OperationCompleteMessage(BaseModel):
|
||||
"""Message format for operation completion notifications."""
|
||||
|
||||
operation_id: str
|
||||
task_id: str
|
||||
success: bool
|
||||
result: dict | str | None = None
|
||||
error: str | None = None
|
||||
|
||||
|
||||
class ChatCompletionConsumer:
|
||||
"""Consumer for chat operation completion messages from RabbitMQ."""
|
||||
|
||||
def __init__(self):
|
||||
self._rabbitmq: AsyncRabbitMQ | None = None
|
||||
self._consumer_task: asyncio.Task | None = None
|
||||
self._running = False
|
||||
|
||||
async def start(self) -> None:
|
||||
"""Start the completion consumer."""
|
||||
if self._running:
|
||||
logger.warning("Completion consumer already running")
|
||||
return
|
||||
|
||||
self._rabbitmq = AsyncRabbitMQ(RABBITMQ_CONFIG)
|
||||
await self._rabbitmq.connect()
|
||||
|
||||
self._running = True
|
||||
self._consumer_task = asyncio.create_task(self._consume_messages())
|
||||
logger.info("Chat completion consumer started")
|
||||
|
||||
async def stop(self) -> None:
|
||||
"""Stop the completion consumer."""
|
||||
self._running = False
|
||||
|
||||
if self._consumer_task:
|
||||
self._consumer_task.cancel()
|
||||
try:
|
||||
await self._consumer_task
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
self._consumer_task = None
|
||||
|
||||
if self._rabbitmq:
|
||||
await self._rabbitmq.disconnect()
|
||||
self._rabbitmq = None
|
||||
|
||||
logger.info("Chat completion consumer stopped")
|
||||
|
||||
async def _consume_messages(self) -> None:
|
||||
"""Main message consumption loop with retry logic."""
|
||||
max_retries = 10
|
||||
retry_delay = 5 # seconds
|
||||
retry_count = 0
|
||||
|
||||
while self._running and retry_count < max_retries:
|
||||
if not self._rabbitmq:
|
||||
logger.error("RabbitMQ not initialized")
|
||||
return
|
||||
|
||||
try:
|
||||
channel = await self._rabbitmq.get_channel()
|
||||
queue = await channel.get_queue(OPERATION_COMPLETE_QUEUE.name)
|
||||
|
||||
# Reset retry count on successful connection
|
||||
retry_count = 0
|
||||
|
||||
async with queue.iterator() as queue_iter:
|
||||
async for message in queue_iter:
|
||||
if not self._running:
|
||||
return
|
||||
|
||||
try:
|
||||
async with message.process():
|
||||
await self._handle_message(message.body)
|
||||
except Exception as e:
|
||||
logger.error(
|
||||
f"Error processing completion message: {e}",
|
||||
exc_info=True,
|
||||
)
|
||||
# Message will be requeued due to exception
|
||||
|
||||
except asyncio.CancelledError:
|
||||
logger.info("Consumer cancelled")
|
||||
return
|
||||
except Exception as e:
|
||||
retry_count += 1
|
||||
logger.error(
|
||||
f"Consumer error (retry {retry_count}/{max_retries}): {e}",
|
||||
exc_info=True,
|
||||
)
|
||||
if self._running and retry_count < max_retries:
|
||||
await asyncio.sleep(retry_delay)
|
||||
else:
|
||||
logger.error("Max retries reached, stopping consumer")
|
||||
return
|
||||
|
||||
async def _handle_message(self, body: bytes) -> None:
|
||||
"""Handle a single completion message."""
|
||||
try:
|
||||
data = orjson.loads(body)
|
||||
message = OperationCompleteMessage(**data)
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to parse completion message: {e}")
|
||||
return
|
||||
|
||||
logger.info(
|
||||
f"Received completion for operation {message.operation_id} "
|
||||
f"(task_id={message.task_id}, success={message.success})"
|
||||
)
|
||||
|
||||
# Find task in registry
|
||||
task = await stream_registry.find_task_by_operation_id(message.operation_id)
|
||||
if task is None:
|
||||
# Try to look up by task_id directly
|
||||
task = await stream_registry.get_task(message.task_id)
|
||||
|
||||
if task is None:
|
||||
logger.warning(
|
||||
f"Task not found for operation {message.operation_id} "
|
||||
f"(task_id={message.task_id})"
|
||||
)
|
||||
return
|
||||
|
||||
if message.success:
|
||||
await self._handle_success(task, message)
|
||||
else:
|
||||
await self._handle_failure(task, message)
|
||||
|
||||
async def _handle_success(
|
||||
self,
|
||||
task: stream_registry.ActiveTask,
|
||||
message: OperationCompleteMessage,
|
||||
) -> None:
|
||||
"""Handle successful operation completion."""
|
||||
# Publish result to stream registry
|
||||
result_output = message.result if message.result else {"status": "completed"}
|
||||
await stream_registry.publish_chunk(
|
||||
task.task_id,
|
||||
StreamToolOutputAvailable(
|
||||
toolCallId=task.tool_call_id,
|
||||
toolName=task.tool_name,
|
||||
output=(
|
||||
result_output
|
||||
if isinstance(result_output, str)
|
||||
else orjson.dumps(result_output).decode("utf-8")
|
||||
),
|
||||
success=True,
|
||||
),
|
||||
)
|
||||
|
||||
# Update pending operation in database
|
||||
result_str = (
|
||||
message.result
|
||||
if isinstance(message.result, str)
|
||||
else (
|
||||
orjson.dumps(message.result).decode("utf-8")
|
||||
if message.result
|
||||
else '{"status": "completed"}'
|
||||
)
|
||||
)
|
||||
await chat_service._update_pending_operation(
|
||||
session_id=task.session_id,
|
||||
tool_call_id=task.tool_call_id,
|
||||
result=result_str,
|
||||
)
|
||||
|
||||
# Generate LLM continuation with streaming
|
||||
await chat_service._generate_llm_continuation_with_streaming(
|
||||
session_id=task.session_id,
|
||||
user_id=task.user_id,
|
||||
task_id=task.task_id,
|
||||
)
|
||||
|
||||
# Mark task as completed and release Redis lock
|
||||
await stream_registry.mark_task_completed(task.task_id, status="completed")
|
||||
await chat_service._mark_operation_completed(task.tool_call_id)
|
||||
|
||||
logger.info(
|
||||
f"Successfully processed completion for task {task.task_id} "
|
||||
f"(operation {message.operation_id})"
|
||||
)
|
||||
|
||||
async def _handle_failure(
|
||||
self,
|
||||
task: stream_registry.ActiveTask,
|
||||
message: OperationCompleteMessage,
|
||||
) -> None:
|
||||
"""Handle failed operation completion."""
|
||||
error_msg = message.error or "Operation failed"
|
||||
|
||||
# Publish error to stream registry followed by finish event
|
||||
await stream_registry.publish_chunk(
|
||||
task.task_id,
|
||||
StreamError(errorText=error_msg),
|
||||
)
|
||||
await stream_registry.publish_chunk(task.task_id, StreamFinish())
|
||||
|
||||
# Update pending operation with error
|
||||
error_response = ErrorResponse(
|
||||
message=error_msg,
|
||||
error=message.error,
|
||||
)
|
||||
await chat_service._update_pending_operation(
|
||||
session_id=task.session_id,
|
||||
tool_call_id=task.tool_call_id,
|
||||
result=error_response.model_dump_json(),
|
||||
)
|
||||
|
||||
# Mark task as failed and release Redis lock
|
||||
await stream_registry.mark_task_completed(task.task_id, status="failed")
|
||||
await chat_service._mark_operation_completed(task.tool_call_id)
|
||||
|
||||
logger.info(
|
||||
f"Processed failure for task {task.task_id} "
|
||||
f"(operation {message.operation_id}): {error_msg}"
|
||||
)
|
||||
|
||||
|
||||
# Module-level consumer instance
|
||||
_consumer: ChatCompletionConsumer | None = None
|
||||
|
||||
|
||||
async def start_completion_consumer() -> None:
|
||||
"""Start the global completion consumer."""
|
||||
global _consumer
|
||||
if _consumer is None:
|
||||
_consumer = ChatCompletionConsumer()
|
||||
await _consumer.start()
|
||||
|
||||
|
||||
async def stop_completion_consumer() -> None:
|
||||
"""Stop the global completion consumer."""
|
||||
global _consumer
|
||||
if _consumer:
|
||||
await _consumer.stop()
|
||||
_consumer = None
|
||||
|
||||
|
||||
async def publish_operation_complete(
|
||||
operation_id: str,
|
||||
task_id: str,
|
||||
success: bool,
|
||||
result: dict | str | None = None,
|
||||
error: str | None = None,
|
||||
) -> None:
|
||||
"""Publish an operation completion message.
|
||||
|
||||
This is a helper function for testing or for services that want to
|
||||
publish completion messages directly.
|
||||
|
||||
Args:
|
||||
operation_id: The operation ID that completed.
|
||||
task_id: The task ID associated with the operation.
|
||||
success: Whether the operation succeeded.
|
||||
result: The result data (for success).
|
||||
error: The error message (for failure).
|
||||
"""
|
||||
message = OperationCompleteMessage(
|
||||
operation_id=operation_id,
|
||||
task_id=task_id,
|
||||
success=success,
|
||||
result=result,
|
||||
error=error,
|
||||
)
|
||||
|
||||
rabbitmq = AsyncRabbitMQ(RABBITMQ_CONFIG)
|
||||
try:
|
||||
await rabbitmq.connect()
|
||||
await rabbitmq.publish_message(
|
||||
routing_key="operation.complete",
|
||||
message=message.model_dump_json(),
|
||||
exchange=OPERATION_COMPLETE_EXCHANGE,
|
||||
)
|
||||
logger.info(f"Published completion for operation {operation_id}")
|
||||
finally:
|
||||
await rabbitmq.disconnect()
|
||||
@@ -44,6 +44,20 @@ class ChatConfig(BaseSettings):
|
||||
description="TTL in seconds for long-running operation tracking in Redis (safety net if pod dies)",
|
||||
)
|
||||
|
||||
# Stream registry configuration for SSE reconnection
|
||||
stream_ttl: int = Field(
|
||||
default=3600,
|
||||
description="TTL in seconds for stream data in Redis (1 hour)",
|
||||
)
|
||||
stream_max_length: int = Field(
|
||||
default=1000,
|
||||
description="Maximum number of messages to store per stream",
|
||||
)
|
||||
internal_api_key: str | None = Field(
|
||||
default=None,
|
||||
description="API key for internal webhook callbacks (env: CHAT_INTERNAL_API_KEY)",
|
||||
)
|
||||
|
||||
# Langfuse Prompt Management Configuration
|
||||
# Note: Langfuse credentials are in Settings().secrets (settings.py)
|
||||
langfuse_prompt_name: str = Field(
|
||||
@@ -82,6 +96,14 @@ class ChatConfig(BaseSettings):
|
||||
v = "https://openrouter.ai/api/v1"
|
||||
return v
|
||||
|
||||
@field_validator("internal_api_key", mode="before")
|
||||
@classmethod
|
||||
def get_internal_api_key(cls, v):
|
||||
"""Get internal API key from environment if not provided."""
|
||||
if v is None:
|
||||
v = os.getenv("CHAT_INTERNAL_API_KEY")
|
||||
return v
|
||||
|
||||
# Prompt paths for different contexts
|
||||
PROMPT_PATHS: dict[str, str] = {
|
||||
"default": "prompts/chat_system.md",
|
||||
|
||||
@@ -4,16 +4,19 @@ import logging
|
||||
from collections.abc import AsyncGenerator
|
||||
from typing import Annotated
|
||||
|
||||
import orjson
|
||||
from autogpt_libs import auth
|
||||
from fastapi import APIRouter, Depends, Query, Security
|
||||
from fastapi import APIRouter, Depends, Header, HTTPException, Query, Security
|
||||
from fastapi.responses import StreamingResponse
|
||||
from pydantic import BaseModel
|
||||
|
||||
from backend.util.exceptions import NotFoundError
|
||||
|
||||
from . import service as chat_service
|
||||
from . import stream_registry
|
||||
from .config import ChatConfig
|
||||
from .model import ChatSession, create_chat_session, get_chat_session, get_user_sessions
|
||||
from .response_model import StreamFinish, StreamHeartbeat
|
||||
|
||||
config = ChatConfig()
|
||||
|
||||
@@ -81,6 +84,14 @@ class ListSessionsResponse(BaseModel):
|
||||
total: int
|
||||
|
||||
|
||||
class OperationCompleteRequest(BaseModel):
|
||||
"""Request model for external completion webhook."""
|
||||
|
||||
success: bool
|
||||
result: dict | str | None = None
|
||||
error: str | None = None
|
||||
|
||||
|
||||
# ========== Routes ==========
|
||||
|
||||
|
||||
@@ -366,6 +377,267 @@ async def session_assign_user(
|
||||
return {"status": "ok"}
|
||||
|
||||
|
||||
# ========== Task Streaming (SSE Reconnection) ==========
|
||||
|
||||
|
||||
@router.get(
|
||||
"/tasks/{task_id}/stream",
|
||||
)
|
||||
async def stream_task(
|
||||
task_id: str,
|
||||
user_id: str | None = Depends(auth.get_user_id),
|
||||
last_message_id: str = Query(
|
||||
default="0-0",
|
||||
description="Last Redis Stream message ID received (e.g., '1706540123456-0'). Use '0-0' for full replay.",
|
||||
),
|
||||
):
|
||||
"""
|
||||
Reconnect to a long-running task's SSE stream.
|
||||
|
||||
When a long-running operation (like agent generation) starts, the client
|
||||
receives a task_id. If the connection drops, the client can reconnect
|
||||
using this endpoint to resume receiving updates.
|
||||
|
||||
Args:
|
||||
task_id: The task ID from the operation_started response.
|
||||
user_id: Authenticated user ID for ownership validation.
|
||||
last_message_id: Last Redis Stream message ID received ("0-0" for full replay).
|
||||
|
||||
Returns:
|
||||
StreamingResponse: SSE-formatted response chunks starting after last_message_id.
|
||||
|
||||
Raises:
|
||||
NotFoundError: If task_id is not found or user doesn't have access.
|
||||
"""
|
||||
# Get subscriber queue from stream registry
|
||||
subscriber_queue = await stream_registry.subscribe_to_task(
|
||||
task_id=task_id,
|
||||
user_id=user_id,
|
||||
last_message_id=last_message_id,
|
||||
)
|
||||
|
||||
if subscriber_queue is None:
|
||||
raise NotFoundError(f"Task {task_id} not found or access denied.")
|
||||
|
||||
async def event_generator() -> AsyncGenerator[str, None]:
|
||||
import asyncio
|
||||
|
||||
chunk_count = 0
|
||||
heartbeat_interval = 15.0 # Send heartbeat every 15 seconds
|
||||
try:
|
||||
while True:
|
||||
try:
|
||||
# Wait for next chunk with timeout for heartbeats
|
||||
chunk = await asyncio.wait_for(
|
||||
subscriber_queue.get(), timeout=heartbeat_interval
|
||||
)
|
||||
chunk_count += 1
|
||||
yield chunk.to_sse()
|
||||
|
||||
# Check for finish signal
|
||||
if isinstance(chunk, StreamFinish):
|
||||
logger.info(
|
||||
f"Task stream completed for task {task_id}, "
|
||||
f"chunk_count={chunk_count}"
|
||||
)
|
||||
break
|
||||
except asyncio.TimeoutError:
|
||||
# Send heartbeat to keep connection alive
|
||||
yield StreamHeartbeat().to_sse()
|
||||
except Exception as e:
|
||||
logger.error(f"Error in task stream {task_id}: {e}", exc_info=True)
|
||||
finally:
|
||||
# Unsubscribe when client disconnects or stream ends
|
||||
await stream_registry.unsubscribe_from_task(task_id, subscriber_queue)
|
||||
|
||||
# AI SDK protocol termination
|
||||
yield "data: [DONE]\n\n"
|
||||
|
||||
return StreamingResponse(
|
||||
event_generator(),
|
||||
media_type="text/event-stream",
|
||||
headers={
|
||||
"Cache-Control": "no-cache",
|
||||
"Connection": "keep-alive",
|
||||
"X-Accel-Buffering": "no",
|
||||
"x-vercel-ai-ui-message-stream": "v1",
|
||||
},
|
||||
)
|
||||
|
||||
|
||||
@router.get(
|
||||
"/tasks/{task_id}",
|
||||
)
|
||||
async def get_task_status(
|
||||
task_id: str,
|
||||
user_id: str | None = Depends(auth.get_user_id),
|
||||
) -> dict:
|
||||
"""
|
||||
Get the status of a long-running task.
|
||||
|
||||
Args:
|
||||
task_id: The task ID to check.
|
||||
user_id: Authenticated user ID for ownership validation.
|
||||
|
||||
Returns:
|
||||
dict: Task status including task_id, status, tool_name, and operation_id.
|
||||
|
||||
Raises:
|
||||
NotFoundError: If task_id is not found or user doesn't have access.
|
||||
"""
|
||||
task = await stream_registry.get_task(task_id)
|
||||
|
||||
if task is None:
|
||||
raise NotFoundError(f"Task {task_id} not found.")
|
||||
|
||||
# Validate ownership
|
||||
if user_id and task.user_id and task.user_id != user_id:
|
||||
raise NotFoundError(f"Task {task_id} not found.")
|
||||
|
||||
return {
|
||||
"task_id": task.task_id,
|
||||
"session_id": task.session_id,
|
||||
"status": task.status,
|
||||
"tool_name": task.tool_name,
|
||||
"operation_id": task.operation_id,
|
||||
"created_at": task.created_at.isoformat(),
|
||||
}
|
||||
|
||||
|
||||
# ========== External Completion Webhook ==========
|
||||
|
||||
|
||||
@router.post(
|
||||
"/operations/{operation_id}/complete",
|
||||
status_code=200,
|
||||
)
|
||||
async def complete_operation(
|
||||
operation_id: str,
|
||||
request: OperationCompleteRequest,
|
||||
x_api_key: str | None = Header(default=None),
|
||||
) -> dict:
|
||||
"""
|
||||
External completion webhook for long-running operations.
|
||||
|
||||
Called by Agent Generator (or other services) when an operation completes.
|
||||
This triggers the stream registry to publish completion and continue LLM generation.
|
||||
|
||||
Args:
|
||||
operation_id: The operation ID to complete.
|
||||
request: Completion payload with success status and result/error.
|
||||
x_api_key: Internal API key for authentication.
|
||||
|
||||
Returns:
|
||||
dict: Status of the completion.
|
||||
|
||||
Raises:
|
||||
HTTPException: If API key is invalid or operation not found.
|
||||
"""
|
||||
# Validate internal API key - reject if not configured or invalid
|
||||
if not config.internal_api_key:
|
||||
logger.error(
|
||||
"Operation complete webhook rejected: CHAT_INTERNAL_API_KEY not configured"
|
||||
)
|
||||
raise HTTPException(
|
||||
status_code=503,
|
||||
detail="Webhook not available: internal API key not configured",
|
||||
)
|
||||
if x_api_key != config.internal_api_key:
|
||||
raise HTTPException(status_code=401, detail="Invalid API key")
|
||||
|
||||
# Find task by operation_id
|
||||
task = await stream_registry.find_task_by_operation_id(operation_id)
|
||||
if task is None:
|
||||
raise HTTPException(
|
||||
status_code=404,
|
||||
detail=f"Operation {operation_id} not found",
|
||||
)
|
||||
|
||||
logger.info(
|
||||
f"Received completion webhook for operation {operation_id} "
|
||||
f"(task_id={task.task_id}, success={request.success})"
|
||||
)
|
||||
|
||||
if request.success:
|
||||
# Publish result to stream registry
|
||||
from .response_model import StreamToolOutputAvailable
|
||||
|
||||
result_output = request.result if request.result else {"status": "completed"}
|
||||
await stream_registry.publish_chunk(
|
||||
task.task_id,
|
||||
StreamToolOutputAvailable(
|
||||
toolCallId=task.tool_call_id,
|
||||
toolName=task.tool_name,
|
||||
output=(
|
||||
result_output
|
||||
if isinstance(result_output, str)
|
||||
else orjson.dumps(result_output).decode("utf-8")
|
||||
),
|
||||
success=True,
|
||||
),
|
||||
)
|
||||
|
||||
# Update pending operation in database
|
||||
from . import service as svc
|
||||
|
||||
result_str = (
|
||||
request.result
|
||||
if isinstance(request.result, str)
|
||||
else (
|
||||
orjson.dumps(request.result).decode("utf-8")
|
||||
if request.result
|
||||
else '{"status": "completed"}'
|
||||
)
|
||||
)
|
||||
await svc._update_pending_operation(
|
||||
session_id=task.session_id,
|
||||
tool_call_id=task.tool_call_id,
|
||||
result=result_str,
|
||||
)
|
||||
|
||||
# Generate LLM continuation with streaming
|
||||
await svc._generate_llm_continuation_with_streaming(
|
||||
session_id=task.session_id,
|
||||
user_id=task.user_id,
|
||||
task_id=task.task_id,
|
||||
)
|
||||
|
||||
# Mark task as completed and release Redis lock
|
||||
await stream_registry.mark_task_completed(task.task_id, status="completed")
|
||||
await svc._mark_operation_completed(task.tool_call_id)
|
||||
else:
|
||||
# Publish error to stream registry
|
||||
from .response_model import StreamError
|
||||
|
||||
error_msg = request.error or "Operation failed"
|
||||
await stream_registry.publish_chunk(
|
||||
task.task_id,
|
||||
StreamError(errorText=error_msg),
|
||||
)
|
||||
# Send finish event to end the stream
|
||||
await stream_registry.publish_chunk(task.task_id, StreamFinish())
|
||||
|
||||
# Update pending operation with error
|
||||
from . import service as svc
|
||||
from .tools.models import ErrorResponse
|
||||
|
||||
error_response = ErrorResponse(
|
||||
message=error_msg,
|
||||
error=request.error,
|
||||
)
|
||||
await svc._update_pending_operation(
|
||||
session_id=task.session_id,
|
||||
tool_call_id=task.tool_call_id,
|
||||
result=error_response.model_dump_json(),
|
||||
)
|
||||
|
||||
# Mark task as failed and release Redis lock
|
||||
await stream_registry.mark_task_completed(task.task_id, status="failed")
|
||||
await svc._mark_operation_completed(task.tool_call_id)
|
||||
|
||||
return {"status": "ok", "task_id": task.task_id}
|
||||
|
||||
|
||||
# ========== Health Check ==========
|
||||
|
||||
|
||||
|
||||
@@ -26,6 +26,7 @@ from backend.util.exceptions import NotFoundError
|
||||
from backend.util.settings import Settings
|
||||
|
||||
from . import db as chat_db
|
||||
from . import stream_registry
|
||||
from .config import ChatConfig
|
||||
from .model import (
|
||||
ChatMessage,
|
||||
@@ -1610,8 +1611,9 @@ async def _yield_tool_call(
|
||||
)
|
||||
return
|
||||
|
||||
# Generate operation ID
|
||||
# Generate operation ID and task ID
|
||||
operation_id = str(uuid_module.uuid4())
|
||||
task_id = str(uuid_module.uuid4())
|
||||
|
||||
# Build a user-friendly message based on tool and arguments
|
||||
if tool_name == "create_agent":
|
||||
@@ -1654,6 +1656,16 @@ async def _yield_tool_call(
|
||||
|
||||
# Wrap session save and task creation in try-except to release lock on failure
|
||||
try:
|
||||
# Create task in stream registry for SSE reconnection support
|
||||
await stream_registry.create_task(
|
||||
task_id=task_id,
|
||||
session_id=session.session_id,
|
||||
user_id=session.user_id,
|
||||
tool_call_id=tool_call_id,
|
||||
tool_name=tool_name,
|
||||
operation_id=operation_id,
|
||||
)
|
||||
|
||||
# Save assistant message with tool_call FIRST (required by LLM)
|
||||
assistant_message = ChatMessage(
|
||||
role="assistant",
|
||||
@@ -1675,23 +1687,27 @@ async def _yield_tool_call(
|
||||
session.messages.append(pending_message)
|
||||
await upsert_chat_session(session)
|
||||
logger.info(
|
||||
f"Saved pending operation {operation_id} for tool {tool_name} "
|
||||
f"in session {session.session_id}"
|
||||
f"Saved pending operation {operation_id} (task_id={task_id}) "
|
||||
f"for tool {tool_name} in session {session.session_id}"
|
||||
)
|
||||
|
||||
# Store task reference in module-level set to prevent GC before completion
|
||||
task = asyncio.create_task(
|
||||
_execute_long_running_tool(
|
||||
bg_task = asyncio.create_task(
|
||||
_execute_long_running_tool_with_streaming(
|
||||
tool_name=tool_name,
|
||||
parameters=arguments,
|
||||
tool_call_id=tool_call_id,
|
||||
operation_id=operation_id,
|
||||
task_id=task_id,
|
||||
session_id=session.session_id,
|
||||
user_id=session.user_id,
|
||||
)
|
||||
)
|
||||
_background_tasks.add(task)
|
||||
task.add_done_callback(_background_tasks.discard)
|
||||
_background_tasks.add(bg_task)
|
||||
bg_task.add_done_callback(_background_tasks.discard)
|
||||
|
||||
# Associate the asyncio task with the stream registry task
|
||||
await stream_registry.set_task_asyncio_task(task_id, bg_task)
|
||||
except Exception as e:
|
||||
# Roll back appended messages to prevent data corruption on subsequent saves
|
||||
if (
|
||||
@@ -1709,6 +1725,11 @@ async def _yield_tool_call(
|
||||
|
||||
# Release the Redis lock since the background task won't be spawned
|
||||
await _mark_operation_completed(tool_call_id)
|
||||
# Mark stream registry task as failed if it was created
|
||||
try:
|
||||
await stream_registry.mark_task_completed(task_id, status="failed")
|
||||
except Exception:
|
||||
pass
|
||||
logger.error(
|
||||
f"Failed to setup long-running tool {tool_name}: {e}", exc_info=True
|
||||
)
|
||||
@@ -1722,6 +1743,7 @@ async def _yield_tool_call(
|
||||
message=started_msg,
|
||||
operation_id=operation_id,
|
||||
tool_name=tool_name,
|
||||
task_id=task_id, # Include task_id for SSE reconnection
|
||||
).model_dump_json(),
|
||||
success=True,
|
||||
)
|
||||
@@ -1791,6 +1813,9 @@ async def _execute_long_running_tool(
|
||||
|
||||
This function runs independently of the SSE connection, so the operation
|
||||
survives if the user closes their browser tab.
|
||||
|
||||
NOTE: This is the legacy function without stream registry support.
|
||||
Use _execute_long_running_tool_with_streaming for new implementations.
|
||||
"""
|
||||
try:
|
||||
# Load fresh session (not stale reference)
|
||||
@@ -1834,15 +1859,132 @@ async def _execute_long_running_tool(
|
||||
tool_call_id=tool_call_id,
|
||||
result=error_response.model_dump_json(),
|
||||
)
|
||||
# Generate LLM continuation so user sees explanation even for errors
|
||||
try:
|
||||
await _generate_llm_continuation(session_id=session_id, user_id=user_id)
|
||||
except Exception as llm_err:
|
||||
logger.warning(f"Failed to generate LLM continuation for error: {llm_err}")
|
||||
finally:
|
||||
await _mark_operation_completed(tool_call_id)
|
||||
|
||||
|
||||
async def _execute_long_running_tool_with_streaming(
|
||||
tool_name: str,
|
||||
parameters: dict[str, Any],
|
||||
tool_call_id: str,
|
||||
operation_id: str,
|
||||
task_id: str,
|
||||
session_id: str,
|
||||
user_id: str | None,
|
||||
) -> None:
|
||||
"""Execute a long-running tool with stream registry support for SSE reconnection.
|
||||
|
||||
This function runs independently of the SSE connection, publishes progress
|
||||
to the stream registry, and survives if the user closes their browser tab.
|
||||
Clients can reconnect via GET /chat/tasks/{task_id}/stream to resume streaming.
|
||||
|
||||
If the external service returns a 202 Accepted (async), this function exits
|
||||
early and lets the RabbitMQ completion consumer handle the rest.
|
||||
"""
|
||||
# Track whether we delegated to async processing - if so, the RabbitMQ
|
||||
# completion consumer will handle cleanup, not us
|
||||
delegated_to_async = False
|
||||
|
||||
try:
|
||||
# Load fresh session (not stale reference)
|
||||
session = await get_chat_session(session_id, user_id)
|
||||
if not session:
|
||||
logger.error(f"Session {session_id} not found for background tool")
|
||||
await stream_registry.mark_task_completed(task_id, status="failed")
|
||||
return
|
||||
|
||||
# Pass operation_id and task_id to the tool for async processing
|
||||
enriched_parameters = {
|
||||
**parameters,
|
||||
"_operation_id": operation_id,
|
||||
"_task_id": task_id,
|
||||
}
|
||||
|
||||
# Execute the actual tool
|
||||
result = await execute_tool(
|
||||
tool_name=tool_name,
|
||||
parameters=enriched_parameters,
|
||||
tool_call_id=tool_call_id,
|
||||
user_id=user_id,
|
||||
session=session,
|
||||
)
|
||||
|
||||
# Check if the tool result indicates async processing
|
||||
# (e.g., Agent Generator returned 202 Accepted)
|
||||
try:
|
||||
result_data = orjson.loads(result.output) if result.output else {}
|
||||
if result_data.get("status") == "accepted":
|
||||
logger.info(
|
||||
f"Tool {tool_name} delegated to async processing "
|
||||
f"(operation_id={operation_id}, task_id={task_id}). "
|
||||
f"RabbitMQ completion consumer will handle the rest."
|
||||
)
|
||||
# Don't publish result, don't continue with LLM, and don't cleanup
|
||||
# The RabbitMQ consumer will handle everything when the external
|
||||
# service completes and publishes to the queue
|
||||
delegated_to_async = True
|
||||
return
|
||||
except (orjson.JSONDecodeError, TypeError):
|
||||
pass # Not JSON or not async - continue normally
|
||||
|
||||
# Publish tool result to stream registry
|
||||
await stream_registry.publish_chunk(task_id, result)
|
||||
|
||||
# Update the pending message with result
|
||||
result_str = (
|
||||
result.output
|
||||
if isinstance(result.output, str)
|
||||
else orjson.dumps(result.output).decode("utf-8")
|
||||
)
|
||||
await _update_pending_operation(
|
||||
session_id=session_id,
|
||||
tool_call_id=tool_call_id,
|
||||
result=result_str,
|
||||
)
|
||||
|
||||
logger.info(
|
||||
f"Background tool {tool_name} completed for session {session_id} "
|
||||
f"(task_id={task_id})"
|
||||
)
|
||||
|
||||
# Generate LLM continuation and stream chunks to registry
|
||||
await _generate_llm_continuation_with_streaming(
|
||||
session_id=session_id,
|
||||
user_id=user_id,
|
||||
task_id=task_id,
|
||||
)
|
||||
|
||||
# Mark task as completed in stream registry
|
||||
await stream_registry.mark_task_completed(task_id, status="completed")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Background tool {tool_name} failed: {e}", exc_info=True)
|
||||
error_response = ErrorResponse(
|
||||
message=f"Tool {tool_name} failed: {str(e)}",
|
||||
)
|
||||
|
||||
# Publish error to stream registry followed by finish event
|
||||
await stream_registry.publish_chunk(
|
||||
task_id,
|
||||
StreamError(errorText=str(e)),
|
||||
)
|
||||
await stream_registry.publish_chunk(task_id, StreamFinish())
|
||||
|
||||
await _update_pending_operation(
|
||||
session_id=session_id,
|
||||
tool_call_id=tool_call_id,
|
||||
result=error_response.model_dump_json(),
|
||||
)
|
||||
|
||||
# Mark task as failed in stream registry
|
||||
await stream_registry.mark_task_completed(task_id, status="failed")
|
||||
finally:
|
||||
# Only cleanup if we didn't delegate to async processing
|
||||
# For async path, the RabbitMQ completion consumer handles cleanup
|
||||
if not delegated_to_async:
|
||||
await _mark_operation_completed(tool_call_id)
|
||||
|
||||
|
||||
async def _update_pending_operation(
|
||||
session_id: str,
|
||||
tool_call_id: str,
|
||||
@@ -1969,3 +2111,128 @@ async def _generate_llm_continuation(
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to generate LLM continuation: {e}", exc_info=True)
|
||||
|
||||
|
||||
async def _generate_llm_continuation_with_streaming(
|
||||
session_id: str,
|
||||
user_id: str | None,
|
||||
task_id: str,
|
||||
) -> None:
|
||||
"""Generate an LLM response with streaming to the stream registry.
|
||||
|
||||
This is called by background tasks to continue the conversation
|
||||
after a tool result is saved. Chunks are published to the stream registry
|
||||
so reconnecting clients can receive them.
|
||||
"""
|
||||
import uuid as uuid_module
|
||||
|
||||
try:
|
||||
# Load fresh session from DB (bypass cache to get the updated tool result)
|
||||
await invalidate_session_cache(session_id)
|
||||
session = await get_chat_session(session_id, user_id)
|
||||
if not session:
|
||||
logger.error(f"Session {session_id} not found for LLM continuation")
|
||||
return
|
||||
|
||||
# Build system prompt
|
||||
system_prompt, _ = await _build_system_prompt(user_id)
|
||||
|
||||
# Build messages in OpenAI format
|
||||
messages = session.to_openai_messages()
|
||||
if system_prompt:
|
||||
from openai.types.chat import ChatCompletionSystemMessageParam
|
||||
|
||||
system_message = ChatCompletionSystemMessageParam(
|
||||
role="system",
|
||||
content=system_prompt,
|
||||
)
|
||||
messages = [system_message] + messages
|
||||
|
||||
# Build extra_body for tracing
|
||||
extra_body: dict[str, Any] = {
|
||||
"posthogProperties": {
|
||||
"environment": settings.config.app_env.value,
|
||||
},
|
||||
}
|
||||
if user_id:
|
||||
extra_body["user"] = user_id[:128]
|
||||
extra_body["posthogDistinctId"] = user_id
|
||||
if session_id:
|
||||
extra_body["session_id"] = session_id[:128]
|
||||
|
||||
# Make streaming LLM call (no tools - just text response)
|
||||
from typing import cast
|
||||
|
||||
from openai.types.chat import ChatCompletionMessageParam
|
||||
|
||||
# Generate unique IDs for AI SDK protocol
|
||||
message_id = str(uuid_module.uuid4())
|
||||
text_block_id = str(uuid_module.uuid4())
|
||||
|
||||
# Publish start event
|
||||
await stream_registry.publish_chunk(task_id, StreamStart(messageId=message_id))
|
||||
await stream_registry.publish_chunk(task_id, StreamTextStart(id=text_block_id))
|
||||
|
||||
# Stream the response
|
||||
stream = await client.chat.completions.create(
|
||||
model=config.model,
|
||||
messages=cast(list[ChatCompletionMessageParam], messages),
|
||||
extra_body=extra_body,
|
||||
stream=True,
|
||||
)
|
||||
|
||||
assistant_content = ""
|
||||
async for chunk in stream:
|
||||
if chunk.choices and chunk.choices[0].delta.content:
|
||||
delta = chunk.choices[0].delta.content
|
||||
assistant_content += delta
|
||||
# Publish delta to stream registry
|
||||
await stream_registry.publish_chunk(
|
||||
task_id,
|
||||
StreamTextDelta(id=text_block_id, delta=delta),
|
||||
)
|
||||
|
||||
# Publish end events
|
||||
await stream_registry.publish_chunk(task_id, StreamTextEnd(id=text_block_id))
|
||||
|
||||
if assistant_content:
|
||||
# Reload session from DB to avoid race condition with user messages
|
||||
fresh_session = await get_chat_session(session_id, user_id)
|
||||
if not fresh_session:
|
||||
logger.error(
|
||||
f"Session {session_id} disappeared during LLM continuation"
|
||||
)
|
||||
return
|
||||
|
||||
# Save assistant message to database
|
||||
assistant_message = ChatMessage(
|
||||
role="assistant",
|
||||
content=assistant_content,
|
||||
)
|
||||
fresh_session.messages.append(assistant_message)
|
||||
|
||||
# Save to database (not cache) to persist the response
|
||||
await upsert_chat_session(fresh_session)
|
||||
|
||||
# Invalidate cache so next poll/refresh gets fresh data
|
||||
await invalidate_session_cache(session_id)
|
||||
|
||||
logger.info(
|
||||
f"Generated streaming LLM continuation for session {session_id} "
|
||||
f"(task_id={task_id}), response length: {len(assistant_content)}"
|
||||
)
|
||||
else:
|
||||
logger.warning(
|
||||
f"Streaming LLM continuation returned empty response for {session_id}"
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(
|
||||
f"Failed to generate streaming LLM continuation: {e}", exc_info=True
|
||||
)
|
||||
# Publish error to stream registry followed by finish event
|
||||
await stream_registry.publish_chunk(
|
||||
task_id,
|
||||
StreamError(errorText=f"Failed to generate response: {e}"),
|
||||
)
|
||||
await stream_registry.publish_chunk(task_id, StreamFinish())
|
||||
|
||||
@@ -0,0 +1,648 @@
|
||||
"""Stream registry for managing reconnectable SSE streams.
|
||||
|
||||
This module provides a registry for tracking active streaming tasks and their
|
||||
messages. It supports:
|
||||
- Creating tasks with unique IDs for long-running operations
|
||||
- Publishing stream messages to both Redis Streams and in-memory queues
|
||||
- Subscribing to tasks with replay of missed messages
|
||||
- Looking up tasks by operation_id for webhook callbacks
|
||||
- Cross-pod real-time delivery via Redis pub/sub
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
from dataclasses import dataclass, field
|
||||
from datetime import datetime, timezone
|
||||
from typing import Any, Literal
|
||||
|
||||
import orjson
|
||||
|
||||
from backend.data.redis_client import get_redis_async
|
||||
|
||||
from .config import ChatConfig
|
||||
from .response_model import StreamBaseResponse, StreamFinish
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
config = ChatConfig()
|
||||
|
||||
# Track active pub/sub listeners for cross-pod delivery
|
||||
_pubsub_listeners: dict[str, asyncio.Task] = {}
|
||||
|
||||
|
||||
@dataclass
|
||||
class ActiveTask:
|
||||
"""Represents an active streaming task."""
|
||||
|
||||
task_id: str
|
||||
session_id: str
|
||||
user_id: str | None
|
||||
tool_call_id: str
|
||||
tool_name: str
|
||||
operation_id: str
|
||||
status: Literal["running", "completed", "failed"] = "running"
|
||||
created_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
|
||||
queue: asyncio.Queue[StreamBaseResponse] = field(default_factory=asyncio.Queue)
|
||||
asyncio_task: asyncio.Task | None = None
|
||||
# Lock for atomic status checks and subscriber management
|
||||
lock: asyncio.Lock = field(default_factory=asyncio.Lock)
|
||||
# Set of subscriber queues for fan-out
|
||||
subscribers: set[asyncio.Queue[StreamBaseResponse]] = field(default_factory=set)
|
||||
|
||||
|
||||
# Module-level registry for active tasks
|
||||
_active_tasks: dict[str, ActiveTask] = {}
|
||||
|
||||
# Redis key patterns
|
||||
TASK_META_PREFIX = "chat:task:meta:" # Hash for task metadata
|
||||
TASK_STREAM_PREFIX = "chat:stream:" # Redis Stream for messages
|
||||
TASK_OP_PREFIX = "chat:task:op:" # Operation ID -> task_id mapping
|
||||
TASK_PUBSUB_PREFIX = "chat:task:pubsub:" # Pub/sub channel for cross-pod delivery
|
||||
|
||||
|
||||
def _get_task_meta_key(task_id: str) -> str:
|
||||
"""Get Redis key for task metadata."""
|
||||
return f"{TASK_META_PREFIX}{task_id}"
|
||||
|
||||
|
||||
def _get_task_stream_key(task_id: str) -> str:
|
||||
"""Get Redis key for task message stream."""
|
||||
return f"{TASK_STREAM_PREFIX}{task_id}"
|
||||
|
||||
|
||||
def _get_operation_mapping_key(operation_id: str) -> str:
|
||||
"""Get Redis key for operation_id to task_id mapping."""
|
||||
return f"{TASK_OP_PREFIX}{operation_id}"
|
||||
|
||||
|
||||
def _get_task_pubsub_channel(task_id: str) -> str:
|
||||
"""Get Redis pub/sub channel for task cross-pod delivery."""
|
||||
return f"{TASK_PUBSUB_PREFIX}{task_id}"
|
||||
|
||||
|
||||
async def create_task(
|
||||
task_id: str,
|
||||
session_id: str,
|
||||
user_id: str | None,
|
||||
tool_call_id: str,
|
||||
tool_name: str,
|
||||
operation_id: str,
|
||||
) -> ActiveTask:
|
||||
"""Create a new streaming task in memory and Redis.
|
||||
|
||||
Args:
|
||||
task_id: Unique identifier for the task
|
||||
session_id: Chat session ID
|
||||
user_id: User ID (may be None for anonymous)
|
||||
tool_call_id: Tool call ID from the LLM
|
||||
tool_name: Name of the tool being executed
|
||||
operation_id: Operation ID for webhook callbacks
|
||||
|
||||
Returns:
|
||||
The created ActiveTask instance
|
||||
"""
|
||||
task = ActiveTask(
|
||||
task_id=task_id,
|
||||
session_id=session_id,
|
||||
user_id=user_id,
|
||||
tool_call_id=tool_call_id,
|
||||
tool_name=tool_name,
|
||||
operation_id=operation_id,
|
||||
)
|
||||
|
||||
# Store in memory registry
|
||||
_active_tasks[task_id] = task
|
||||
|
||||
# Store metadata in Redis for durability
|
||||
redis = await get_redis_async()
|
||||
meta_key = _get_task_meta_key(task_id)
|
||||
op_key = _get_operation_mapping_key(operation_id)
|
||||
|
||||
await redis.hset( # type: ignore[misc]
|
||||
meta_key,
|
||||
mapping={
|
||||
"task_id": task_id,
|
||||
"session_id": session_id,
|
||||
"user_id": user_id or "",
|
||||
"tool_call_id": tool_call_id,
|
||||
"tool_name": tool_name,
|
||||
"operation_id": operation_id,
|
||||
"status": task.status,
|
||||
"created_at": task.created_at.isoformat(),
|
||||
},
|
||||
)
|
||||
await redis.expire(meta_key, config.stream_ttl)
|
||||
|
||||
# Create operation_id -> task_id mapping for webhook lookups
|
||||
await redis.set(op_key, task_id, ex=config.stream_ttl)
|
||||
|
||||
logger.info(
|
||||
f"Created streaming task {task_id} for operation {operation_id} "
|
||||
f"in session {session_id}"
|
||||
)
|
||||
|
||||
return task
|
||||
|
||||
|
||||
async def publish_chunk(
|
||||
task_id: str,
|
||||
chunk: StreamBaseResponse,
|
||||
) -> str:
|
||||
"""Publish a chunk to the task's stream.
|
||||
|
||||
Delivers to in-memory subscribers first (for real-time), then persists to
|
||||
Redis Stream (for replay). This order ensures live subscribers get messages
|
||||
even if Redis temporarily fails.
|
||||
|
||||
Args:
|
||||
task_id: Task ID to publish to
|
||||
chunk: The stream response chunk to publish
|
||||
|
||||
Returns:
|
||||
The Redis Stream message ID (format: "timestamp-sequence"), or "0-0" if
|
||||
Redis persistence failed
|
||||
"""
|
||||
# Deliver to in-memory subscribers FIRST for real-time updates
|
||||
task = _active_tasks.get(task_id)
|
||||
if task:
|
||||
async with task.lock:
|
||||
for subscriber_queue in task.subscribers:
|
||||
try:
|
||||
subscriber_queue.put_nowait(chunk)
|
||||
except asyncio.QueueFull:
|
||||
logger.warning(
|
||||
f"Subscriber queue full for task {task_id}, dropping chunk"
|
||||
)
|
||||
|
||||
# Then persist to Redis Stream for replay (with error handling)
|
||||
message_id = "0-0"
|
||||
chunk_json = chunk.model_dump_json()
|
||||
try:
|
||||
redis = await get_redis_async()
|
||||
stream_key = _get_task_stream_key(task_id)
|
||||
|
||||
# Add to Redis Stream with auto-generated ID
|
||||
# The ID format is "timestamp-sequence" which gives us ordering
|
||||
raw_id = await redis.xadd(
|
||||
stream_key,
|
||||
{"data": chunk_json},
|
||||
maxlen=config.stream_max_length,
|
||||
)
|
||||
message_id = raw_id if isinstance(raw_id, str) else raw_id.decode()
|
||||
|
||||
# Publish to pub/sub for cross-pod real-time delivery
|
||||
pubsub_channel = _get_task_pubsub_channel(task_id)
|
||||
await redis.publish(pubsub_channel, chunk_json)
|
||||
|
||||
logger.debug(f"Published chunk to task {task_id}, message_id={message_id}")
|
||||
except Exception as e:
|
||||
logger.error(
|
||||
f"Failed to persist chunk to Redis for task {task_id}: {e}",
|
||||
exc_info=True,
|
||||
)
|
||||
|
||||
return message_id
|
||||
|
||||
|
||||
async def subscribe_to_task(
|
||||
task_id: str,
|
||||
user_id: str | None,
|
||||
last_message_id: str = "0-0",
|
||||
) -> asyncio.Queue[StreamBaseResponse] | None:
|
||||
"""Subscribe to a task's stream with replay of missed messages.
|
||||
|
||||
Args:
|
||||
task_id: Task ID to subscribe to
|
||||
user_id: User ID for ownership validation
|
||||
last_message_id: Last Redis Stream message ID received ("0-0" for full replay)
|
||||
|
||||
Returns:
|
||||
An asyncio Queue that will receive stream chunks, or None if task not found
|
||||
or user doesn't have access
|
||||
"""
|
||||
# Check in-memory first
|
||||
task = _active_tasks.get(task_id)
|
||||
|
||||
if task:
|
||||
# Validate ownership
|
||||
if user_id and task.user_id and task.user_id != user_id:
|
||||
logger.warning(
|
||||
f"User {user_id} attempted to subscribe to task {task_id} "
|
||||
f"owned by {task.user_id}"
|
||||
)
|
||||
return None
|
||||
|
||||
# Create a new queue for this subscriber
|
||||
subscriber_queue: asyncio.Queue[StreamBaseResponse] = asyncio.Queue()
|
||||
|
||||
# Replay from Redis Stream
|
||||
redis = await get_redis_async()
|
||||
stream_key = _get_task_stream_key(task_id)
|
||||
|
||||
# Track the last message ID we've seen for gap detection
|
||||
replay_last_id = last_message_id
|
||||
|
||||
# Read all messages from stream starting after last_message_id
|
||||
# xread returns messages with ID > last_message_id
|
||||
messages = await redis.xread({stream_key: last_message_id}, block=0, count=1000)
|
||||
|
||||
if messages:
|
||||
# messages format: [[stream_name, [(id, {data: json}), ...]]]
|
||||
for _stream_name, stream_messages in messages:
|
||||
for msg_id, msg_data in stream_messages:
|
||||
# Track the last message ID we've processed
|
||||
replay_last_id = (
|
||||
msg_id if isinstance(msg_id, str) else msg_id.decode()
|
||||
)
|
||||
if b"data" in msg_data:
|
||||
try:
|
||||
chunk_data = orjson.loads(msg_data[b"data"])
|
||||
# Reconstruct the appropriate response type
|
||||
chunk = _reconstruct_chunk(chunk_data)
|
||||
if chunk:
|
||||
await subscriber_queue.put(chunk)
|
||||
except Exception as e:
|
||||
logger.warning(f"Failed to replay message: {e}")
|
||||
|
||||
# Atomically check status and register subscriber under lock
|
||||
# This prevents race condition where task completes between check and subscribe
|
||||
should_start_pubsub = False
|
||||
async with task.lock:
|
||||
if task.status == "running":
|
||||
# Register this subscriber for live updates
|
||||
task.subscribers.add(subscriber_queue)
|
||||
# Start pub/sub listener if this is the first subscriber
|
||||
should_start_pubsub = len(task.subscribers) == 1
|
||||
logger.debug(
|
||||
f"Registered subscriber for task {task_id}, "
|
||||
f"total subscribers: {len(task.subscribers)}"
|
||||
)
|
||||
else:
|
||||
# Task is done, add finish marker
|
||||
await subscriber_queue.put(StreamFinish())
|
||||
|
||||
# After registering, do a second read to catch any messages published
|
||||
# between the first read and registration (closes the race window)
|
||||
if task.status == "running":
|
||||
gap_messages = await redis.xread(
|
||||
{stream_key: replay_last_id}, block=0, count=1000
|
||||
)
|
||||
if gap_messages:
|
||||
for _stream_name, stream_messages in gap_messages:
|
||||
for _msg_id, msg_data in stream_messages:
|
||||
if b"data" in msg_data:
|
||||
try:
|
||||
chunk_data = orjson.loads(msg_data[b"data"])
|
||||
chunk = _reconstruct_chunk(chunk_data)
|
||||
if chunk:
|
||||
await subscriber_queue.put(chunk)
|
||||
except Exception as e:
|
||||
logger.warning(f"Failed to replay gap message: {e}")
|
||||
|
||||
# Start pub/sub listener outside the lock to avoid deadlocks
|
||||
if should_start_pubsub:
|
||||
await start_pubsub_listener(task_id)
|
||||
|
||||
return subscriber_queue
|
||||
|
||||
# Try to load from Redis if not in memory
|
||||
redis = await get_redis_async()
|
||||
meta_key = _get_task_meta_key(task_id)
|
||||
meta: dict[Any, Any] = await redis.hgetall(meta_key) # type: ignore[misc]
|
||||
|
||||
if not meta:
|
||||
logger.warning(f"Task {task_id} not found in memory or Redis")
|
||||
return None
|
||||
|
||||
# Validate ownership
|
||||
task_user_id = meta.get(b"user_id", b"").decode() or None
|
||||
if user_id and task_user_id and task_user_id != user_id:
|
||||
logger.warning(
|
||||
f"User {user_id} attempted to subscribe to task {task_id} "
|
||||
f"owned by {task_user_id}"
|
||||
)
|
||||
return None
|
||||
|
||||
# Replay from Redis Stream only (task is not in memory, so it's completed/crashed)
|
||||
subscriber_queue = asyncio.Queue()
|
||||
stream_key = _get_task_stream_key(task_id)
|
||||
|
||||
# Read all messages starting after last_message_id
|
||||
messages = await redis.xread({stream_key: last_message_id}, block=0, count=1000)
|
||||
|
||||
if messages:
|
||||
for _stream_name, stream_messages in messages:
|
||||
for _msg_id, msg_data in stream_messages:
|
||||
if b"data" in msg_data:
|
||||
try:
|
||||
chunk_data = orjson.loads(msg_data[b"data"])
|
||||
chunk = _reconstruct_chunk(chunk_data)
|
||||
if chunk:
|
||||
await subscriber_queue.put(chunk)
|
||||
except Exception as e:
|
||||
logger.warning(f"Failed to replay message: {e}")
|
||||
|
||||
# Add finish marker since task is not active
|
||||
await subscriber_queue.put(StreamFinish())
|
||||
|
||||
return subscriber_queue
|
||||
|
||||
|
||||
async def mark_task_completed(
|
||||
task_id: str,
|
||||
status: Literal["completed", "failed"] = "completed",
|
||||
) -> None:
|
||||
"""Mark a task as completed and publish final event.
|
||||
|
||||
Args:
|
||||
task_id: Task ID to mark as completed
|
||||
status: Final status ("completed" or "failed")
|
||||
"""
|
||||
task = _active_tasks.get(task_id)
|
||||
|
||||
if task:
|
||||
# Acquire lock to prevent new subscribers during completion
|
||||
async with task.lock:
|
||||
task.status = status
|
||||
# Send finish event directly to all current subscribers
|
||||
finish_event = StreamFinish()
|
||||
for subscriber_queue in task.subscribers:
|
||||
try:
|
||||
subscriber_queue.put_nowait(finish_event)
|
||||
except asyncio.QueueFull:
|
||||
logger.warning(
|
||||
f"Subscriber queue full for task {task_id} during completion"
|
||||
)
|
||||
# Clear subscribers since task is done
|
||||
task.subscribers.clear()
|
||||
|
||||
# Stop pub/sub listener since task is done
|
||||
await stop_pubsub_listener(task_id)
|
||||
|
||||
# Also publish to Redis Stream for replay (and pub/sub for cross-pod)
|
||||
await publish_chunk(task_id, StreamFinish())
|
||||
|
||||
# Remove from active tasks after a short delay to allow subscribers to finish
|
||||
async def _cleanup():
|
||||
await asyncio.sleep(5)
|
||||
_active_tasks.pop(task_id, None)
|
||||
logger.info(f"Cleaned up task {task_id} from memory")
|
||||
|
||||
asyncio.create_task(_cleanup())
|
||||
|
||||
# Update Redis metadata
|
||||
redis = await get_redis_async()
|
||||
meta_key = _get_task_meta_key(task_id)
|
||||
await redis.hset(meta_key, "status", status) # type: ignore[misc]
|
||||
|
||||
logger.info(f"Marked task {task_id} as {status}")
|
||||
|
||||
|
||||
async def find_task_by_operation_id(operation_id: str) -> ActiveTask | None:
|
||||
"""Find a task by its operation ID.
|
||||
|
||||
Used by webhook callbacks to locate the task to update.
|
||||
|
||||
Args:
|
||||
operation_id: Operation ID to search for
|
||||
|
||||
Returns:
|
||||
ActiveTask if found, None otherwise
|
||||
"""
|
||||
# Check in-memory first
|
||||
for task in _active_tasks.values():
|
||||
if task.operation_id == operation_id:
|
||||
return task
|
||||
|
||||
# Try Redis lookup
|
||||
redis = await get_redis_async()
|
||||
op_key = _get_operation_mapping_key(operation_id)
|
||||
task_id = await redis.get(op_key)
|
||||
|
||||
if task_id:
|
||||
task_id_str = task_id.decode() if isinstance(task_id, bytes) else task_id
|
||||
# Check if task is in memory
|
||||
if task_id_str in _active_tasks:
|
||||
return _active_tasks[task_id_str]
|
||||
|
||||
# Load metadata from Redis
|
||||
meta_key = _get_task_meta_key(task_id_str)
|
||||
meta: dict[Any, Any] = await redis.hgetall(meta_key) # type: ignore[misc]
|
||||
|
||||
if meta:
|
||||
# Reconstruct task object (not fully active, but has metadata)
|
||||
return ActiveTask(
|
||||
task_id=meta.get(b"task_id", b"").decode(),
|
||||
session_id=meta.get(b"session_id", b"").decode(),
|
||||
user_id=meta.get(b"user_id", b"").decode() or None,
|
||||
tool_call_id=meta.get(b"tool_call_id", b"").decode(),
|
||||
tool_name=meta.get(b"tool_name", b"").decode(),
|
||||
operation_id=operation_id,
|
||||
status=meta.get(b"status", b"running").decode(), # type: ignore
|
||||
)
|
||||
|
||||
return None
|
||||
|
||||
|
||||
async def get_task(task_id: str) -> ActiveTask | None:
|
||||
"""Get a task by its ID.
|
||||
|
||||
Args:
|
||||
task_id: Task ID to look up
|
||||
|
||||
Returns:
|
||||
ActiveTask if found, None otherwise
|
||||
"""
|
||||
# Check in-memory first
|
||||
if task_id in _active_tasks:
|
||||
return _active_tasks[task_id]
|
||||
|
||||
# Try Redis lookup
|
||||
redis = await get_redis_async()
|
||||
meta_key = _get_task_meta_key(task_id)
|
||||
meta: dict[Any, Any] = await redis.hgetall(meta_key) # type: ignore[misc]
|
||||
|
||||
if meta:
|
||||
return ActiveTask(
|
||||
task_id=meta.get(b"task_id", b"").decode(),
|
||||
session_id=meta.get(b"session_id", b"").decode(),
|
||||
user_id=meta.get(b"user_id", b"").decode() or None,
|
||||
tool_call_id=meta.get(b"tool_call_id", b"").decode(),
|
||||
tool_name=meta.get(b"tool_name", b"").decode(),
|
||||
operation_id=meta.get(b"operation_id", b"").decode(),
|
||||
status=meta.get(b"status", b"running").decode(), # type: ignore[arg-type]
|
||||
)
|
||||
|
||||
return None
|
||||
|
||||
|
||||
def _reconstruct_chunk(chunk_data: dict) -> StreamBaseResponse | None:
|
||||
"""Reconstruct a StreamBaseResponse from JSON data.
|
||||
|
||||
Args:
|
||||
chunk_data: Parsed JSON data from Redis
|
||||
|
||||
Returns:
|
||||
Reconstructed response object, or None if unknown type
|
||||
"""
|
||||
from .response_model import (
|
||||
ResponseType,
|
||||
StreamError,
|
||||
StreamFinish,
|
||||
StreamHeartbeat,
|
||||
StreamStart,
|
||||
StreamTextDelta,
|
||||
StreamTextEnd,
|
||||
StreamTextStart,
|
||||
StreamToolInputAvailable,
|
||||
StreamToolInputStart,
|
||||
StreamToolOutputAvailable,
|
||||
StreamUsage,
|
||||
)
|
||||
|
||||
chunk_type = chunk_data.get("type")
|
||||
|
||||
try:
|
||||
if chunk_type == ResponseType.START.value:
|
||||
return StreamStart(**chunk_data)
|
||||
elif chunk_type == ResponseType.FINISH.value:
|
||||
return StreamFinish(**chunk_data)
|
||||
elif chunk_type == ResponseType.TEXT_START.value:
|
||||
return StreamTextStart(**chunk_data)
|
||||
elif chunk_type == ResponseType.TEXT_DELTA.value:
|
||||
return StreamTextDelta(**chunk_data)
|
||||
elif chunk_type == ResponseType.TEXT_END.value:
|
||||
return StreamTextEnd(**chunk_data)
|
||||
elif chunk_type == ResponseType.TOOL_INPUT_START.value:
|
||||
return StreamToolInputStart(**chunk_data)
|
||||
elif chunk_type == ResponseType.TOOL_INPUT_AVAILABLE.value:
|
||||
return StreamToolInputAvailable(**chunk_data)
|
||||
elif chunk_type == ResponseType.TOOL_OUTPUT_AVAILABLE.value:
|
||||
return StreamToolOutputAvailable(**chunk_data)
|
||||
elif chunk_type == ResponseType.ERROR.value:
|
||||
return StreamError(**chunk_data)
|
||||
elif chunk_type == ResponseType.USAGE.value:
|
||||
return StreamUsage(**chunk_data)
|
||||
elif chunk_type == ResponseType.HEARTBEAT.value:
|
||||
return StreamHeartbeat(**chunk_data)
|
||||
else:
|
||||
logger.warning(f"Unknown chunk type: {chunk_type}")
|
||||
return None
|
||||
except Exception as e:
|
||||
logger.warning(f"Failed to reconstruct chunk of type {chunk_type}: {e}")
|
||||
return None
|
||||
|
||||
|
||||
async def set_task_asyncio_task(task_id: str, asyncio_task: asyncio.Task) -> None:
|
||||
"""Associate an asyncio.Task with an ActiveTask.
|
||||
|
||||
Args:
|
||||
task_id: Task ID
|
||||
asyncio_task: The asyncio Task to associate
|
||||
"""
|
||||
task = _active_tasks.get(task_id)
|
||||
if task:
|
||||
task.asyncio_task = asyncio_task
|
||||
|
||||
|
||||
async def unsubscribe_from_task(
|
||||
task_id: str,
|
||||
subscriber_queue: asyncio.Queue[StreamBaseResponse],
|
||||
) -> None:
|
||||
"""Unsubscribe a queue from a task's stream.
|
||||
|
||||
Should be called when a client disconnects to clean up resources.
|
||||
Also stops the pub/sub listener if there are no more local subscribers.
|
||||
|
||||
Args:
|
||||
task_id: Task ID to unsubscribe from
|
||||
subscriber_queue: The queue to remove from subscribers
|
||||
"""
|
||||
task = _active_tasks.get(task_id)
|
||||
if task:
|
||||
async with task.lock:
|
||||
task.subscribers.discard(subscriber_queue)
|
||||
remaining = len(task.subscribers)
|
||||
logger.debug(
|
||||
f"Unsubscribed from task {task_id}, "
|
||||
f"remaining subscribers: {remaining}"
|
||||
)
|
||||
# Stop pub/sub listener if no more local subscribers
|
||||
if remaining == 0:
|
||||
await stop_pubsub_listener(task_id)
|
||||
|
||||
|
||||
async def start_pubsub_listener(task_id: str) -> None:
|
||||
"""Start listening to Redis pub/sub for cross-pod delivery.
|
||||
|
||||
This enables real-time updates when another pod publishes chunks for a task
|
||||
that has local subscribers on this pod.
|
||||
|
||||
Args:
|
||||
task_id: Task ID to listen for
|
||||
"""
|
||||
if task_id in _pubsub_listeners:
|
||||
return # Already listening
|
||||
|
||||
task = _active_tasks.get(task_id)
|
||||
if not task:
|
||||
return
|
||||
|
||||
async def _listener():
|
||||
try:
|
||||
redis = await get_redis_async()
|
||||
pubsub = redis.pubsub()
|
||||
channel = _get_task_pubsub_channel(task_id)
|
||||
await pubsub.subscribe(channel)
|
||||
logger.debug(f"Started pub/sub listener for task {task_id}")
|
||||
|
||||
async for message in pubsub.listen():
|
||||
if message["type"] != "message":
|
||||
continue
|
||||
|
||||
try:
|
||||
chunk_data = orjson.loads(message["data"])
|
||||
chunk = _reconstruct_chunk(chunk_data)
|
||||
if chunk:
|
||||
# Deliver to local subscribers
|
||||
local_task = _active_tasks.get(task_id)
|
||||
if local_task:
|
||||
async with local_task.lock:
|
||||
for queue in local_task.subscribers:
|
||||
try:
|
||||
queue.put_nowait(chunk)
|
||||
except asyncio.QueueFull:
|
||||
pass
|
||||
# Stop listening if this was a finish event
|
||||
if isinstance(chunk, StreamFinish):
|
||||
break
|
||||
except Exception as e:
|
||||
logger.warning(f"Error processing pub/sub message: {e}")
|
||||
|
||||
await pubsub.unsubscribe(channel)
|
||||
await pubsub.close()
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
except Exception as e:
|
||||
logger.error(f"Pub/sub listener error for task {task_id}: {e}")
|
||||
finally:
|
||||
_pubsub_listeners.pop(task_id, None)
|
||||
logger.debug(f"Stopped pub/sub listener for task {task_id}")
|
||||
|
||||
listener_task = asyncio.create_task(_listener())
|
||||
_pubsub_listeners[task_id] = listener_task
|
||||
|
||||
|
||||
async def stop_pubsub_listener(task_id: str) -> None:
|
||||
"""Stop the pub/sub listener for a task.
|
||||
|
||||
Args:
|
||||
task_id: Task ID to stop listening for
|
||||
"""
|
||||
listener = _pubsub_listeners.pop(task_id, None)
|
||||
if listener and not listener.done():
|
||||
listener.cancel()
|
||||
try:
|
||||
await listener
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
logger.debug(f"Cancelled pub/sub listener for task {task_id}")
|
||||
@@ -2,52 +2,27 @@
|
||||
|
||||
from .core import (
|
||||
AgentGeneratorNotConfiguredError,
|
||||
AgentSummary,
|
||||
DecompositionResult,
|
||||
DecompositionStep,
|
||||
LibraryAgentSummary,
|
||||
MarketplaceAgentSummary,
|
||||
decompose_goal,
|
||||
enrich_library_agents_from_steps,
|
||||
extract_search_terms_from_steps,
|
||||
extract_uuids_from_text,
|
||||
generate_agent,
|
||||
generate_agent_patch,
|
||||
get_agent_as_json,
|
||||
get_all_relevant_agents_for_generation,
|
||||
get_library_agent_by_graph_id,
|
||||
get_library_agent_by_id,
|
||||
get_library_agents_for_generation,
|
||||
json_to_graph,
|
||||
save_agent_to_library,
|
||||
search_marketplace_agents_for_generation,
|
||||
)
|
||||
from .errors import get_user_message_for_error
|
||||
from .service import health_check as check_external_service_health
|
||||
from .service import is_external_service_configured
|
||||
|
||||
__all__ = [
|
||||
"AgentGeneratorNotConfiguredError",
|
||||
"AgentSummary",
|
||||
"DecompositionResult",
|
||||
"DecompositionStep",
|
||||
"LibraryAgentSummary",
|
||||
"MarketplaceAgentSummary",
|
||||
"check_external_service_health",
|
||||
# Core functions
|
||||
"decompose_goal",
|
||||
"enrich_library_agents_from_steps",
|
||||
"extract_search_terms_from_steps",
|
||||
"extract_uuids_from_text",
|
||||
"generate_agent",
|
||||
"generate_agent_patch",
|
||||
"get_agent_as_json",
|
||||
"get_all_relevant_agents_for_generation",
|
||||
"get_library_agent_by_graph_id",
|
||||
"get_library_agent_by_id",
|
||||
"get_library_agents_for_generation",
|
||||
"get_user_message_for_error",
|
||||
"is_external_service_configured",
|
||||
"json_to_graph",
|
||||
"save_agent_to_library",
|
||||
"search_marketplace_agents_for_generation",
|
||||
"get_agent_as_json",
|
||||
"json_to_graph",
|
||||
# Exceptions
|
||||
"AgentGeneratorNotConfiguredError",
|
||||
# Service
|
||||
"is_external_service_configured",
|
||||
"check_external_service_health",
|
||||
]
|
||||
|
||||
@@ -1,21 +1,11 @@
|
||||
"""Core agent generation functions."""
|
||||
|
||||
import logging
|
||||
import re
|
||||
import uuid
|
||||
from typing import Any, TypedDict
|
||||
from typing import Any
|
||||
|
||||
from backend.api.features.library import db as library_db
|
||||
from backend.api.features.store import db as store_db
|
||||
from backend.data.graph import (
|
||||
Graph,
|
||||
Link,
|
||||
Node,
|
||||
create_graph,
|
||||
get_graph,
|
||||
get_graph_all_versions,
|
||||
)
|
||||
from backend.util.exceptions import NotFoundError
|
||||
from backend.data.graph import Graph, Link, Node, create_graph
|
||||
|
||||
from .service import (
|
||||
decompose_goal_external,
|
||||
@@ -27,60 +17,6 @@ from .service import (
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class LibraryAgentSummary(TypedDict):
|
||||
"""Summary of a library agent for sub-agent composition."""
|
||||
|
||||
graph_id: str
|
||||
graph_version: int
|
||||
name: str
|
||||
description: str
|
||||
input_schema: dict[str, Any]
|
||||
output_schema: dict[str, Any]
|
||||
|
||||
|
||||
class MarketplaceAgentSummary(TypedDict):
|
||||
"""Summary of a marketplace agent for sub-agent composition."""
|
||||
|
||||
name: str
|
||||
description: str
|
||||
sub_heading: str
|
||||
creator: str
|
||||
is_marketplace_agent: bool
|
||||
|
||||
|
||||
class DecompositionStep(TypedDict, total=False):
|
||||
"""A single step in decomposed instructions."""
|
||||
|
||||
description: str
|
||||
action: str
|
||||
block_name: str
|
||||
tool: str
|
||||
name: str
|
||||
|
||||
|
||||
class DecompositionResult(TypedDict, total=False):
|
||||
"""Result from decompose_goal - can be instructions, questions, or error."""
|
||||
|
||||
type: str # "instructions", "clarifying_questions", "error", etc.
|
||||
steps: list[DecompositionStep]
|
||||
questions: list[dict[str, Any]]
|
||||
error: str
|
||||
error_type: str
|
||||
|
||||
|
||||
# Type alias for agent summaries (can be either library or marketplace)
|
||||
AgentSummary = LibraryAgentSummary | MarketplaceAgentSummary | dict[str, Any]
|
||||
|
||||
|
||||
def _to_dict_list(
|
||||
agents: list[AgentSummary] | list[dict[str, Any]] | None,
|
||||
) -> list[dict[str, Any]] | None:
|
||||
"""Convert typed agent summaries to plain dicts for external service calls."""
|
||||
if agents is None:
|
||||
return None
|
||||
return [dict(a) for a in agents]
|
||||
|
||||
|
||||
class AgentGeneratorNotConfiguredError(Exception):
|
||||
"""Raised when the external Agent Generator service is not configured."""
|
||||
|
||||
@@ -100,382 +36,15 @@ def _check_service_configured() -> None:
|
||||
)
|
||||
|
||||
|
||||
_UUID_PATTERN = re.compile(
|
||||
r"[a-f0-9]{8}-[a-f0-9]{4}-4[a-f0-9]{3}-[89ab][a-f0-9]{3}-[a-f0-9]{12}",
|
||||
re.IGNORECASE,
|
||||
)
|
||||
|
||||
|
||||
def extract_uuids_from_text(text: str) -> list[str]:
|
||||
"""Extract all UUID v4 strings from text.
|
||||
|
||||
Args:
|
||||
text: Text that may contain UUIDs (e.g., user's goal description)
|
||||
|
||||
Returns:
|
||||
List of unique UUIDs found in the text (lowercase)
|
||||
"""
|
||||
matches = _UUID_PATTERN.findall(text)
|
||||
return list({m.lower() for m in matches})
|
||||
|
||||
|
||||
async def get_library_agent_by_id(
|
||||
user_id: str, agent_id: str
|
||||
) -> LibraryAgentSummary | None:
|
||||
"""Fetch a specific library agent by its ID (library agent ID or graph_id).
|
||||
|
||||
This function tries multiple lookup strategies:
|
||||
1. First tries to find by graph_id (AgentGraph primary key)
|
||||
2. If not found, tries to find by library agent ID (LibraryAgent primary key)
|
||||
|
||||
This handles both cases:
|
||||
- User provides graph_id (e.g., from AgentExecutorBlock)
|
||||
- User provides library agent ID (e.g., from library URL)
|
||||
|
||||
Args:
|
||||
user_id: The user ID
|
||||
agent_id: The ID to look up (can be graph_id or library agent ID)
|
||||
|
||||
Returns:
|
||||
LibraryAgentSummary if found, None otherwise
|
||||
"""
|
||||
try:
|
||||
agent = await library_db.get_library_agent_by_graph_id(user_id, agent_id)
|
||||
if agent:
|
||||
logger.debug(f"Found library agent by graph_id: {agent.name}")
|
||||
return LibraryAgentSummary(
|
||||
graph_id=agent.graph_id,
|
||||
graph_version=agent.graph_version,
|
||||
name=agent.name,
|
||||
description=agent.description,
|
||||
input_schema=agent.input_schema,
|
||||
output_schema=agent.output_schema,
|
||||
)
|
||||
except Exception as e:
|
||||
logger.debug(f"Could not fetch library agent by graph_id {agent_id}: {e}")
|
||||
|
||||
try:
|
||||
agent = await library_db.get_library_agent(agent_id, user_id)
|
||||
if agent:
|
||||
logger.debug(f"Found library agent by library_id: {agent.name}")
|
||||
return LibraryAgentSummary(
|
||||
graph_id=agent.graph_id,
|
||||
graph_version=agent.graph_version,
|
||||
name=agent.name,
|
||||
description=agent.description,
|
||||
input_schema=agent.input_schema,
|
||||
output_schema=agent.output_schema,
|
||||
)
|
||||
except NotFoundError:
|
||||
logger.debug(f"Library agent not found by library_id: {agent_id}")
|
||||
except Exception as e:
|
||||
logger.debug(f"Could not fetch library agent by library_id {agent_id}: {e}")
|
||||
|
||||
return None
|
||||
|
||||
|
||||
# Alias for backward compatibility
|
||||
get_library_agent_by_graph_id = get_library_agent_by_id
|
||||
|
||||
|
||||
async def get_library_agents_for_generation(
|
||||
user_id: str,
|
||||
search_query: str | None = None,
|
||||
exclude_graph_id: str | None = None,
|
||||
max_results: int = 15,
|
||||
) -> list[LibraryAgentSummary]:
|
||||
"""Fetch user's library agents formatted for Agent Generator.
|
||||
|
||||
Uses search-based fetching to return relevant agents instead of all agents.
|
||||
This is more scalable for users with large libraries.
|
||||
|
||||
Args:
|
||||
user_id: The user ID
|
||||
search_query: Optional search term to find relevant agents (user's goal/description)
|
||||
exclude_graph_id: Optional graph ID to exclude (prevents circular references)
|
||||
max_results: Maximum number of agents to return (default 15)
|
||||
|
||||
Returns:
|
||||
List of LibraryAgentSummary with schemas for sub-agent composition
|
||||
"""
|
||||
try:
|
||||
response = await library_db.list_library_agents(
|
||||
user_id=user_id,
|
||||
search_term=search_query,
|
||||
page=1,
|
||||
page_size=max_results,
|
||||
)
|
||||
|
||||
results: list[LibraryAgentSummary] = []
|
||||
for agent in response.agents:
|
||||
# Exclude the agent being generated/edited to prevent circular references
|
||||
if exclude_graph_id is not None and agent.graph_id == exclude_graph_id:
|
||||
continue
|
||||
|
||||
results.append(
|
||||
LibraryAgentSummary(
|
||||
graph_id=agent.graph_id,
|
||||
graph_version=agent.graph_version,
|
||||
name=agent.name,
|
||||
description=agent.description,
|
||||
input_schema=agent.input_schema,
|
||||
output_schema=agent.output_schema,
|
||||
)
|
||||
)
|
||||
return results
|
||||
except Exception as e:
|
||||
logger.warning(f"Failed to fetch library agents: {e}")
|
||||
return []
|
||||
|
||||
|
||||
async def search_marketplace_agents_for_generation(
|
||||
search_query: str,
|
||||
max_results: int = 10,
|
||||
) -> list[MarketplaceAgentSummary]:
|
||||
"""Search marketplace agents formatted for Agent Generator.
|
||||
|
||||
Note: This returns basic agent info. Full input/output schemas would require
|
||||
additional graph fetches and is a potential future enhancement.
|
||||
|
||||
Args:
|
||||
search_query: Search term to find relevant public agents
|
||||
max_results: Maximum number of agents to return (default 10)
|
||||
|
||||
Returns:
|
||||
List of MarketplaceAgentSummary (without detailed schemas for now)
|
||||
"""
|
||||
try:
|
||||
response = await store_db.get_store_agents(
|
||||
search_query=search_query,
|
||||
page=1,
|
||||
page_size=max_results,
|
||||
)
|
||||
|
||||
results: list[MarketplaceAgentSummary] = []
|
||||
for agent in response.agents:
|
||||
results.append(
|
||||
MarketplaceAgentSummary(
|
||||
name=agent.agent_name,
|
||||
description=agent.description,
|
||||
sub_heading=agent.sub_heading,
|
||||
creator=agent.creator,
|
||||
is_marketplace_agent=True,
|
||||
)
|
||||
)
|
||||
return results
|
||||
except Exception as e:
|
||||
logger.warning(f"Failed to search marketplace agents: {e}")
|
||||
return []
|
||||
|
||||
|
||||
async def get_all_relevant_agents_for_generation(
|
||||
user_id: str,
|
||||
search_query: str | None = None,
|
||||
exclude_graph_id: str | None = None,
|
||||
include_marketplace: bool = True,
|
||||
max_library_results: int = 15,
|
||||
max_marketplace_results: int = 10,
|
||||
) -> list[AgentSummary]:
|
||||
"""Fetch relevant agents from library and optionally marketplace.
|
||||
|
||||
Combines search results from user's library and public marketplace,
|
||||
with library agents taking priority (they have full schemas).
|
||||
|
||||
Also extracts UUIDs from the search_query and fetches those agents
|
||||
directly to ensure explicitly referenced agents are included.
|
||||
|
||||
Args:
|
||||
user_id: The user ID
|
||||
search_query: Search term to find relevant agents (user's goal/description)
|
||||
exclude_graph_id: Optional graph ID to exclude (prevents circular references)
|
||||
include_marketplace: Whether to also search marketplace (default True)
|
||||
max_library_results: Max library agents to return (default 15)
|
||||
max_marketplace_results: Max marketplace agents to return (default 10)
|
||||
|
||||
Returns:
|
||||
List of AgentSummary, library agents first (with full schemas),
|
||||
then marketplace agents (basic info only)
|
||||
"""
|
||||
agents: list[AgentSummary] = []
|
||||
seen_graph_ids: set[str] = set()
|
||||
|
||||
if search_query:
|
||||
mentioned_uuids = extract_uuids_from_text(search_query)
|
||||
for graph_id in mentioned_uuids:
|
||||
if graph_id == exclude_graph_id:
|
||||
continue
|
||||
agent = await get_library_agent_by_graph_id(user_id, graph_id)
|
||||
if agent and agent["graph_id"] not in seen_graph_ids:
|
||||
agents.append(agent)
|
||||
seen_graph_ids.add(agent["graph_id"])
|
||||
logger.debug(f"Found explicitly mentioned agent: {agent['name']}")
|
||||
|
||||
library_agents = await get_library_agents_for_generation(
|
||||
user_id=user_id,
|
||||
search_query=search_query,
|
||||
exclude_graph_id=exclude_graph_id,
|
||||
max_results=max_library_results,
|
||||
)
|
||||
for agent in library_agents:
|
||||
if agent["graph_id"] not in seen_graph_ids:
|
||||
agents.append(agent)
|
||||
seen_graph_ids.add(agent["graph_id"])
|
||||
|
||||
if include_marketplace and search_query:
|
||||
marketplace_agents = await search_marketplace_agents_for_generation(
|
||||
search_query=search_query,
|
||||
max_results=max_marketplace_results,
|
||||
)
|
||||
library_names = {a["name"].lower() for a in agents if a.get("name")}
|
||||
for agent in marketplace_agents:
|
||||
agent_name = agent.get("name")
|
||||
if agent_name and agent_name.lower() not in library_names:
|
||||
agents.append(agent)
|
||||
|
||||
return agents
|
||||
|
||||
|
||||
def extract_search_terms_from_steps(
|
||||
decomposition_result: DecompositionResult | dict[str, Any],
|
||||
) -> list[str]:
|
||||
"""Extract search terms from decomposed instruction steps.
|
||||
|
||||
Analyzes the decomposition result to extract relevant keywords
|
||||
for additional library agent searches.
|
||||
|
||||
Args:
|
||||
decomposition_result: Result from decompose_goal containing steps
|
||||
|
||||
Returns:
|
||||
List of unique search terms extracted from steps
|
||||
"""
|
||||
search_terms: list[str] = []
|
||||
|
||||
if decomposition_result.get("type") != "instructions":
|
||||
return search_terms
|
||||
|
||||
steps = decomposition_result.get("steps", [])
|
||||
if not steps:
|
||||
return search_terms
|
||||
|
||||
step_keys: list[str] = ["description", "action", "block_name", "tool", "name"]
|
||||
|
||||
for step in steps:
|
||||
for key in step_keys:
|
||||
value = step.get(key) # type: ignore[union-attr]
|
||||
if isinstance(value, str) and len(value) > 3:
|
||||
search_terms.append(value)
|
||||
|
||||
seen: set[str] = set()
|
||||
unique_terms: list[str] = []
|
||||
for term in search_terms:
|
||||
term_lower = term.lower()
|
||||
if term_lower not in seen:
|
||||
seen.add(term_lower)
|
||||
unique_terms.append(term)
|
||||
|
||||
return unique_terms
|
||||
|
||||
|
||||
async def enrich_library_agents_from_steps(
|
||||
user_id: str,
|
||||
decomposition_result: DecompositionResult | dict[str, Any],
|
||||
existing_agents: list[AgentSummary] | list[dict[str, Any]],
|
||||
exclude_graph_id: str | None = None,
|
||||
include_marketplace: bool = True,
|
||||
max_additional_results: int = 10,
|
||||
) -> list[AgentSummary] | list[dict[str, Any]]:
|
||||
"""Enrich library agents list with additional searches based on decomposed steps.
|
||||
|
||||
This implements two-phase search: after decomposition, we search for additional
|
||||
relevant agents based on the specific steps identified.
|
||||
|
||||
Args:
|
||||
user_id: The user ID
|
||||
decomposition_result: Result from decompose_goal containing steps
|
||||
existing_agents: Already fetched library agents from initial search
|
||||
exclude_graph_id: Optional graph ID to exclude
|
||||
include_marketplace: Whether to also search marketplace
|
||||
max_additional_results: Max additional agents per search term (default 10)
|
||||
|
||||
Returns:
|
||||
Combined list of library agents (existing + newly discovered)
|
||||
"""
|
||||
# Extract search terms from steps
|
||||
search_terms = extract_search_terms_from_steps(decomposition_result)
|
||||
|
||||
if not search_terms:
|
||||
return existing_agents
|
||||
|
||||
existing_ids: set[str] = set()
|
||||
existing_names: set[str] = set()
|
||||
|
||||
for agent in existing_agents:
|
||||
agent_name = agent.get("name", "")
|
||||
if agent_name:
|
||||
existing_names.add(agent_name.lower())
|
||||
graph_id = agent.get("graph_id") # type: ignore[call-overload]
|
||||
if graph_id:
|
||||
existing_ids.add(graph_id)
|
||||
|
||||
all_agents: list[AgentSummary] | list[dict[str, Any]] = list(existing_agents)
|
||||
|
||||
for term in search_terms[:3]:
|
||||
try:
|
||||
additional_agents = await get_all_relevant_agents_for_generation(
|
||||
user_id=user_id,
|
||||
search_query=term,
|
||||
exclude_graph_id=exclude_graph_id,
|
||||
include_marketplace=include_marketplace,
|
||||
max_library_results=max_additional_results,
|
||||
max_marketplace_results=5,
|
||||
)
|
||||
|
||||
for agent in additional_agents:
|
||||
agent_name = agent.get("name", "")
|
||||
if not agent_name:
|
||||
continue
|
||||
agent_name_lower = agent_name.lower()
|
||||
|
||||
if agent_name_lower in existing_names:
|
||||
continue
|
||||
|
||||
graph_id = agent.get("graph_id") # type: ignore[call-overload]
|
||||
if graph_id and graph_id in existing_ids:
|
||||
continue
|
||||
|
||||
all_agents.append(agent)
|
||||
existing_names.add(agent_name_lower)
|
||||
if graph_id:
|
||||
existing_ids.add(graph_id)
|
||||
|
||||
except Exception as e:
|
||||
logger.warning(
|
||||
f"Failed to search for additional agents with term '{term}': {e}"
|
||||
)
|
||||
|
||||
logger.debug(
|
||||
f"Enriched library agents: {len(existing_agents)} initial + "
|
||||
f"{len(all_agents) - len(existing_agents)} additional = {len(all_agents)} total"
|
||||
)
|
||||
|
||||
return all_agents
|
||||
|
||||
|
||||
async def decompose_goal(
|
||||
description: str,
|
||||
context: str = "",
|
||||
library_agents: list[AgentSummary] | None = None,
|
||||
) -> DecompositionResult | None:
|
||||
async def decompose_goal(description: str, context: str = "") -> dict[str, Any] | None:
|
||||
"""Break down a goal into steps or return clarifying questions.
|
||||
|
||||
Args:
|
||||
description: Natural language goal description
|
||||
context: Additional context (e.g., answers to previous questions)
|
||||
library_agents: User's library agents available for sub-agent composition
|
||||
|
||||
Returns:
|
||||
DecompositionResult with either:
|
||||
Dict with either:
|
||||
- {"type": "clarifying_questions", "questions": [...]}
|
||||
- {"type": "instructions", "steps": [...]}
|
||||
Or None on error
|
||||
@@ -485,41 +54,37 @@ async def decompose_goal(
|
||||
"""
|
||||
_check_service_configured()
|
||||
logger.info("Calling external Agent Generator service for decompose_goal")
|
||||
# Convert typed dicts to plain dicts for external service
|
||||
result = await decompose_goal_external(
|
||||
description, context, _to_dict_list(library_agents)
|
||||
)
|
||||
# Cast the result to DecompositionResult (external service returns dict)
|
||||
return result # type: ignore[return-value]
|
||||
return await decompose_goal_external(description, context)
|
||||
|
||||
|
||||
async def generate_agent(
|
||||
instructions: DecompositionResult | dict[str, Any],
|
||||
library_agents: list[AgentSummary] | list[dict[str, Any]] | None = None,
|
||||
instructions: dict[str, Any],
|
||||
operation_id: str | None = None,
|
||||
task_id: str | None = None,
|
||||
) -> dict[str, Any] | None:
|
||||
"""Generate agent JSON from instructions.
|
||||
|
||||
Args:
|
||||
instructions: Structured instructions from decompose_goal
|
||||
library_agents: User's library agents available for sub-agent composition
|
||||
operation_id: Operation ID for async processing (enables RabbitMQ callback)
|
||||
task_id: Task ID for async processing (enables RabbitMQ callback)
|
||||
|
||||
Returns:
|
||||
Agent JSON dict, error dict {"type": "error", ...}, or None on error
|
||||
Agent JSON dict, {"status": "accepted"} for async, or None on error
|
||||
|
||||
Raises:
|
||||
AgentGeneratorNotConfiguredError: If the external service is not configured.
|
||||
"""
|
||||
_check_service_configured()
|
||||
logger.info("Calling external Agent Generator service for generate_agent")
|
||||
# Convert typed dicts to plain dicts for external service
|
||||
result = await generate_agent_external(
|
||||
dict(instructions), _to_dict_list(library_agents)
|
||||
)
|
||||
result = await generate_agent_external(instructions, operation_id, task_id)
|
||||
|
||||
# Don't modify async response
|
||||
if result and result.get("status") == "accepted":
|
||||
return result
|
||||
|
||||
if result:
|
||||
# Check if it's an error response - pass through as-is
|
||||
if isinstance(result, dict) and result.get("type") == "error":
|
||||
return result
|
||||
# Ensure required fields for successful agent generation
|
||||
# Ensure required fields
|
||||
if "id" not in result:
|
||||
result["id"] = str(uuid.uuid4())
|
||||
if "version" not in result:
|
||||
@@ -605,6 +170,8 @@ async def save_agent_to_library(
|
||||
Returns:
|
||||
Tuple of (created Graph, LibraryAgent)
|
||||
"""
|
||||
from backend.data.graph import get_graph_all_versions
|
||||
|
||||
graph = json_to_graph(agent_json)
|
||||
|
||||
if is_update:
|
||||
@@ -641,28 +208,21 @@ async def save_agent_to_library(
|
||||
|
||||
|
||||
async def get_agent_as_json(
|
||||
agent_id: str, user_id: str | None
|
||||
graph_id: str, user_id: str | None
|
||||
) -> dict[str, Any] | None:
|
||||
"""Fetch an agent and convert to JSON format for editing.
|
||||
|
||||
Args:
|
||||
agent_id: Graph ID or library agent ID
|
||||
graph_id: Graph ID or library agent ID
|
||||
user_id: User ID
|
||||
|
||||
Returns:
|
||||
Agent as JSON dict or None if not found
|
||||
"""
|
||||
graph = await get_graph(agent_id, version=None, user_id=user_id)
|
||||
|
||||
if not graph and user_id:
|
||||
try:
|
||||
library_agent = await library_db.get_library_agent(agent_id, user_id)
|
||||
graph = await get_graph(
|
||||
library_agent.graph_id, version=None, user_id=user_id
|
||||
)
|
||||
except NotFoundError:
|
||||
pass
|
||||
from backend.data.graph import get_graph
|
||||
|
||||
# Try to get the graph (version=None gets the active version)
|
||||
graph = await get_graph(graph_id, version=None, user_id=user_id)
|
||||
if not graph:
|
||||
return None
|
||||
|
||||
@@ -706,7 +266,8 @@ async def get_agent_as_json(
|
||||
async def generate_agent_patch(
|
||||
update_request: str,
|
||||
current_agent: dict[str, Any],
|
||||
library_agents: list[AgentSummary] | None = None,
|
||||
operation_id: str | None = None,
|
||||
task_id: str | None = None,
|
||||
) -> dict[str, Any] | None:
|
||||
"""Update an existing agent using natural language.
|
||||
|
||||
@@ -718,18 +279,17 @@ async def generate_agent_patch(
|
||||
Args:
|
||||
update_request: Natural language description of changes
|
||||
current_agent: Current agent JSON
|
||||
library_agents: User's library agents available for sub-agent composition
|
||||
operation_id: Operation ID for async processing (enables RabbitMQ callback)
|
||||
task_id: Task ID for async processing (enables RabbitMQ callback)
|
||||
|
||||
Returns:
|
||||
Updated agent JSON, clarifying questions dict {"type": "clarifying_questions", ...},
|
||||
error dict {"type": "error", ...}, or None on unexpected error
|
||||
Updated agent JSON, clarifying questions dict, {"status": "accepted"} for async, or None on error
|
||||
|
||||
Raises:
|
||||
AgentGeneratorNotConfiguredError: If the external service is not configured.
|
||||
"""
|
||||
_check_service_configured()
|
||||
logger.info("Calling external Agent Generator service for generate_agent_patch")
|
||||
# Convert typed dicts to plain dicts for external service
|
||||
return await generate_agent_patch_external(
|
||||
update_request, current_agent, _to_dict_list(library_agents)
|
||||
update_request, current_agent, operation_id, task_id
|
||||
)
|
||||
|
||||
@@ -1,66 +0,0 @@
|
||||
"""Error handling utilities for agent generator."""
|
||||
|
||||
|
||||
def get_user_message_for_error(
|
||||
error_type: str,
|
||||
operation: str = "process the request",
|
||||
llm_parse_message: str | None = None,
|
||||
validation_message: str | None = None,
|
||||
error_details: str | None = None,
|
||||
) -> str:
|
||||
"""Get a user-friendly error message based on error type.
|
||||
|
||||
This function maps internal error types to user-friendly messages,
|
||||
providing a consistent experience across different agent operations.
|
||||
|
||||
Args:
|
||||
error_type: The error type from the external service
|
||||
(e.g., "llm_parse_error", "timeout", "rate_limit")
|
||||
operation: Description of what operation failed, used in the default
|
||||
message (e.g., "analyze the goal", "generate the agent")
|
||||
llm_parse_message: Custom message for llm_parse_error type
|
||||
validation_message: Custom message for validation_error type
|
||||
error_details: Optional additional details about the error
|
||||
|
||||
Returns:
|
||||
User-friendly error message suitable for display to the user
|
||||
"""
|
||||
base_message = ""
|
||||
|
||||
if error_type == "llm_parse_error":
|
||||
base_message = (
|
||||
llm_parse_message
|
||||
or "The AI had trouble processing this request. Please try again."
|
||||
)
|
||||
elif error_type == "validation_error":
|
||||
base_message = (
|
||||
validation_message
|
||||
or "The generated agent failed validation. "
|
||||
"This usually happens when the agent structure doesn't match "
|
||||
"what the platform expects. Please try simplifying your goal "
|
||||
"or breaking it into smaller parts."
|
||||
)
|
||||
elif error_type == "patch_error":
|
||||
base_message = (
|
||||
"Failed to apply the changes. The modification couldn't be "
|
||||
"validated. Please try a different approach or simplify the change."
|
||||
)
|
||||
elif error_type in ("timeout", "llm_timeout"):
|
||||
base_message = (
|
||||
"The request took too long to process. This can happen with "
|
||||
"complex agents. Please try again or simplify your goal."
|
||||
)
|
||||
elif error_type in ("rate_limit", "llm_rate_limit"):
|
||||
base_message = "The service is currently busy. Please try again in a moment."
|
||||
else:
|
||||
base_message = f"Failed to {operation}. Please try again."
|
||||
|
||||
# Add error details if provided (for debugging, truncated)
|
||||
if error_details:
|
||||
# Truncate long error details
|
||||
details = (
|
||||
error_details[:200] + "..." if len(error_details) > 200 else error_details
|
||||
)
|
||||
base_message += f"\n\nTechnical details: {details}"
|
||||
|
||||
return base_message
|
||||
@@ -14,70 +14,6 @@ from backend.util.settings import Settings
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def _create_error_response(
|
||||
error_message: str,
|
||||
error_type: str = "unknown",
|
||||
details: dict[str, Any] | None = None,
|
||||
) -> dict[str, Any]:
|
||||
"""Create a standardized error response dict.
|
||||
|
||||
Args:
|
||||
error_message: Human-readable error message
|
||||
error_type: Machine-readable error type
|
||||
details: Optional additional error details
|
||||
|
||||
Returns:
|
||||
Error dict with type="error" and error details
|
||||
"""
|
||||
response: dict[str, Any] = {
|
||||
"type": "error",
|
||||
"error": error_message,
|
||||
"error_type": error_type,
|
||||
}
|
||||
if details:
|
||||
response["details"] = details
|
||||
return response
|
||||
|
||||
|
||||
def _classify_http_error(e: httpx.HTTPStatusError) -> tuple[str, str]:
|
||||
"""Classify an HTTP error into error_type and message.
|
||||
|
||||
Args:
|
||||
e: The HTTP status error
|
||||
|
||||
Returns:
|
||||
Tuple of (error_type, error_message)
|
||||
"""
|
||||
status = e.response.status_code
|
||||
if status == 429:
|
||||
return "rate_limit", f"Agent Generator rate limited: {e}"
|
||||
elif status == 503:
|
||||
return "service_unavailable", f"Agent Generator unavailable: {e}"
|
||||
elif status == 504 or status == 408:
|
||||
return "timeout", f"Agent Generator timed out: {e}"
|
||||
else:
|
||||
return "http_error", f"HTTP error calling Agent Generator: {e}"
|
||||
|
||||
|
||||
def _classify_request_error(e: httpx.RequestError) -> tuple[str, str]:
|
||||
"""Classify a request error into error_type and message.
|
||||
|
||||
Args:
|
||||
e: The request error
|
||||
|
||||
Returns:
|
||||
Tuple of (error_type, error_message)
|
||||
"""
|
||||
error_str = str(e).lower()
|
||||
if "timeout" in error_str or "timed out" in error_str:
|
||||
return "timeout", f"Agent Generator request timed out: {e}"
|
||||
elif "connect" in error_str:
|
||||
return "connection_error", f"Could not connect to Agent Generator: {e}"
|
||||
else:
|
||||
return "request_error", f"Request error calling Agent Generator: {e}"
|
||||
|
||||
|
||||
_client: httpx.AsyncClient | None = None
|
||||
_settings: Settings | None = None
|
||||
|
||||
@@ -117,16 +53,13 @@ def _get_client() -> httpx.AsyncClient:
|
||||
|
||||
|
||||
async def decompose_goal_external(
|
||||
description: str,
|
||||
context: str = "",
|
||||
library_agents: list[dict[str, Any]] | None = None,
|
||||
description: str, context: str = ""
|
||||
) -> dict[str, Any] | None:
|
||||
"""Call the external service to decompose a goal.
|
||||
|
||||
Args:
|
||||
description: Natural language goal description
|
||||
context: Additional context (e.g., answers to previous questions)
|
||||
library_agents: User's library agents available for sub-agent composition
|
||||
|
||||
Returns:
|
||||
Dict with either:
|
||||
@@ -134,8 +67,7 @@ async def decompose_goal_external(
|
||||
- {"type": "instructions", "steps": [...]}
|
||||
- {"type": "unachievable_goal", ...}
|
||||
- {"type": "vague_goal", ...}
|
||||
- {"type": "error", "error": "...", "error_type": "..."} on error
|
||||
Or None on unexpected error
|
||||
Or None on error
|
||||
"""
|
||||
client = _get_client()
|
||||
|
||||
@@ -144,8 +76,6 @@ async def decompose_goal_external(
|
||||
if context:
|
||||
# The external service uses user_instruction for additional context
|
||||
payload["user_instruction"] = context
|
||||
if library_agents:
|
||||
payload["library_agents"] = library_agents
|
||||
|
||||
try:
|
||||
response = await client.post("/api/decompose-description", json=payload)
|
||||
@@ -153,13 +83,8 @@ async def decompose_goal_external(
|
||||
data = response.json()
|
||||
|
||||
if not data.get("success"):
|
||||
error_msg = data.get("error", "Unknown error from Agent Generator")
|
||||
error_type = data.get("error_type", "unknown")
|
||||
logger.error(
|
||||
f"Agent Generator decomposition failed: {error_msg} "
|
||||
f"(type: {error_type})"
|
||||
)
|
||||
return _create_error_response(error_msg, error_type)
|
||||
logger.error(f"External service returned error: {data.get('error')}")
|
||||
return None
|
||||
|
||||
# Map the response to the expected format
|
||||
response_type = data.get("type")
|
||||
@@ -181,121 +106,122 @@ async def decompose_goal_external(
|
||||
"type": "vague_goal",
|
||||
"suggested_goal": data.get("suggested_goal"),
|
||||
}
|
||||
elif response_type == "error":
|
||||
# Pass through error from the service
|
||||
return _create_error_response(
|
||||
data.get("error", "Unknown error"),
|
||||
data.get("error_type", "unknown"),
|
||||
)
|
||||
else:
|
||||
logger.error(
|
||||
f"Unknown response type from external service: {response_type}"
|
||||
)
|
||||
return _create_error_response(
|
||||
f"Unknown response type from Agent Generator: {response_type}",
|
||||
"invalid_response",
|
||||
)
|
||||
return None
|
||||
|
||||
except httpx.HTTPStatusError as e:
|
||||
error_type, error_msg = _classify_http_error(e)
|
||||
logger.error(error_msg)
|
||||
return _create_error_response(error_msg, error_type)
|
||||
logger.error(f"HTTP error calling external agent generator: {e}")
|
||||
return None
|
||||
except httpx.RequestError as e:
|
||||
error_type, error_msg = _classify_request_error(e)
|
||||
logger.error(error_msg)
|
||||
return _create_error_response(error_msg, error_type)
|
||||
logger.error(f"Request error calling external agent generator: {e}")
|
||||
return None
|
||||
except Exception as e:
|
||||
error_msg = f"Unexpected error calling Agent Generator: {e}"
|
||||
logger.error(error_msg)
|
||||
return _create_error_response(error_msg, "unexpected_error")
|
||||
logger.error(f"Unexpected error calling external agent generator: {e}")
|
||||
return None
|
||||
|
||||
|
||||
async def generate_agent_external(
|
||||
instructions: dict[str, Any],
|
||||
library_agents: list[dict[str, Any]] | None = None,
|
||||
operation_id: str | None = None,
|
||||
task_id: str | None = None,
|
||||
) -> dict[str, Any] | None:
|
||||
"""Call the external service to generate an agent from instructions.
|
||||
|
||||
Args:
|
||||
instructions: Structured instructions from decompose_goal
|
||||
library_agents: User's library agents available for sub-agent composition
|
||||
operation_id: Operation ID for async processing (enables RabbitMQ callback)
|
||||
task_id: Task ID for async processing (enables RabbitMQ callback)
|
||||
|
||||
Returns:
|
||||
Agent JSON dict on success, or error dict {"type": "error", ...} on error
|
||||
Agent JSON dict, or {"status": "accepted"} for async, or None on error
|
||||
"""
|
||||
client = _get_client()
|
||||
|
||||
# Build request payload
|
||||
payload: dict[str, Any] = {"instructions": instructions}
|
||||
if library_agents:
|
||||
payload["library_agents"] = library_agents
|
||||
if operation_id and task_id:
|
||||
payload["operation_id"] = operation_id
|
||||
payload["task_id"] = task_id
|
||||
|
||||
try:
|
||||
response = await client.post("/api/generate-agent", json=payload)
|
||||
|
||||
# Handle 202 Accepted for async processing
|
||||
if response.status_code == 202:
|
||||
logger.info(
|
||||
f"Agent Generator accepted async request "
|
||||
f"(operation_id={operation_id}, task_id={task_id})"
|
||||
)
|
||||
return {"status": "accepted", "operation_id": operation_id, "task_id": task_id}
|
||||
|
||||
response.raise_for_status()
|
||||
data = response.json()
|
||||
|
||||
if not data.get("success"):
|
||||
error_msg = data.get("error", "Unknown error from Agent Generator")
|
||||
error_type = data.get("error_type", "unknown")
|
||||
logger.error(
|
||||
f"Agent Generator generation failed: {error_msg} "
|
||||
f"(type: {error_type})"
|
||||
)
|
||||
return _create_error_response(error_msg, error_type)
|
||||
logger.error(f"External service returned error: {data.get('error')}")
|
||||
return None
|
||||
|
||||
return data.get("agent_json")
|
||||
|
||||
except httpx.HTTPStatusError as e:
|
||||
error_type, error_msg = _classify_http_error(e)
|
||||
logger.error(error_msg)
|
||||
return _create_error_response(error_msg, error_type)
|
||||
logger.error(f"HTTP error calling external agent generator: {e}")
|
||||
return None
|
||||
except httpx.RequestError as e:
|
||||
error_type, error_msg = _classify_request_error(e)
|
||||
logger.error(error_msg)
|
||||
return _create_error_response(error_msg, error_type)
|
||||
logger.error(f"Request error calling external agent generator: {e}")
|
||||
return None
|
||||
except Exception as e:
|
||||
error_msg = f"Unexpected error calling Agent Generator: {e}"
|
||||
logger.error(error_msg)
|
||||
return _create_error_response(error_msg, "unexpected_error")
|
||||
logger.error(f"Unexpected error calling external agent generator: {e}")
|
||||
return None
|
||||
|
||||
|
||||
async def generate_agent_patch_external(
|
||||
update_request: str,
|
||||
current_agent: dict[str, Any],
|
||||
library_agents: list[dict[str, Any]] | None = None,
|
||||
operation_id: str | None = None,
|
||||
task_id: str | None = None,
|
||||
) -> dict[str, Any] | None:
|
||||
"""Call the external service to generate a patch for an existing agent.
|
||||
|
||||
Args:
|
||||
update_request: Natural language description of changes
|
||||
current_agent: Current agent JSON
|
||||
library_agents: User's library agents available for sub-agent composition
|
||||
operation_id: Operation ID for async processing (enables RabbitMQ callback)
|
||||
task_id: Task ID for async processing (enables RabbitMQ callback)
|
||||
|
||||
Returns:
|
||||
Updated agent JSON, clarifying questions dict, or error dict on error
|
||||
Updated agent JSON, clarifying questions dict, {"status": "accepted"} for async, or None on error
|
||||
"""
|
||||
client = _get_client()
|
||||
|
||||
# Build request payload
|
||||
payload: dict[str, Any] = {
|
||||
"update_request": update_request,
|
||||
"current_agent_json": current_agent,
|
||||
}
|
||||
if library_agents:
|
||||
payload["library_agents"] = library_agents
|
||||
if operation_id and task_id:
|
||||
payload["operation_id"] = operation_id
|
||||
payload["task_id"] = task_id
|
||||
|
||||
try:
|
||||
response = await client.post("/api/update-agent", json=payload)
|
||||
|
||||
# Handle 202 Accepted for async processing
|
||||
if response.status_code == 202:
|
||||
logger.info(
|
||||
f"Agent Generator accepted async update request "
|
||||
f"(operation_id={operation_id}, task_id={task_id})"
|
||||
)
|
||||
return {"status": "accepted", "operation_id": operation_id, "task_id": task_id}
|
||||
|
||||
response.raise_for_status()
|
||||
data = response.json()
|
||||
|
||||
if not data.get("success"):
|
||||
error_msg = data.get("error", "Unknown error from Agent Generator")
|
||||
error_type = data.get("error_type", "unknown")
|
||||
logger.error(
|
||||
f"Agent Generator patch generation failed: {error_msg} "
|
||||
f"(type: {error_type})"
|
||||
)
|
||||
return _create_error_response(error_msg, error_type)
|
||||
logger.error(f"External service returned error: {data.get('error')}")
|
||||
return None
|
||||
|
||||
# Check if it's clarifying questions
|
||||
if data.get("type") == "clarifying_questions":
|
||||
@@ -304,28 +230,18 @@ async def generate_agent_patch_external(
|
||||
"questions": data.get("questions", []),
|
||||
}
|
||||
|
||||
# Check if it's an error passed through
|
||||
if data.get("type") == "error":
|
||||
return _create_error_response(
|
||||
data.get("error", "Unknown error"),
|
||||
data.get("error_type", "unknown"),
|
||||
)
|
||||
|
||||
# Otherwise return the updated agent JSON
|
||||
return data.get("agent_json")
|
||||
|
||||
except httpx.HTTPStatusError as e:
|
||||
error_type, error_msg = _classify_http_error(e)
|
||||
logger.error(error_msg)
|
||||
return _create_error_response(error_msg, error_type)
|
||||
logger.error(f"HTTP error calling external agent generator: {e}")
|
||||
return None
|
||||
except httpx.RequestError as e:
|
||||
error_type, error_msg = _classify_request_error(e)
|
||||
logger.error(error_msg)
|
||||
return _create_error_response(error_msg, error_type)
|
||||
logger.error(f"Request error calling external agent generator: {e}")
|
||||
return None
|
||||
except Exception as e:
|
||||
error_msg = f"Unexpected error calling Agent Generator: {e}"
|
||||
logger.error(error_msg)
|
||||
return _create_error_response(error_msg, "unexpected_error")
|
||||
logger.error(f"Unexpected error calling external agent generator: {e}")
|
||||
return None
|
||||
|
||||
|
||||
async def get_blocks_external() -> list[dict[str, Any]] | None:
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
"""Shared agent search functionality for find_agent and find_library_agent tools."""
|
||||
|
||||
import logging
|
||||
import re
|
||||
from typing import Literal
|
||||
|
||||
from backend.api.features.library import db as library_db
|
||||
@@ -20,86 +19,6 @@ logger = logging.getLogger(__name__)
|
||||
|
||||
SearchSource = Literal["marketplace", "library"]
|
||||
|
||||
# UUID v4 pattern for direct agent ID lookup
|
||||
_UUID_PATTERN = re.compile(
|
||||
r"^[a-f0-9]{8}-[a-f0-9]{4}-4[a-f0-9]{3}-[89ab][a-f0-9]{3}-[a-f0-9]{12}$",
|
||||
re.IGNORECASE,
|
||||
)
|
||||
|
||||
|
||||
def _is_uuid(text: str) -> bool:
|
||||
"""Check if text is a valid UUID v4."""
|
||||
return bool(_UUID_PATTERN.match(text.strip()))
|
||||
|
||||
|
||||
async def _get_library_agent_by_id(user_id: str, agent_id: str) -> AgentInfo | None:
|
||||
"""Fetch a library agent by ID (library agent ID or graph_id).
|
||||
|
||||
Tries multiple lookup strategies:
|
||||
1. First by graph_id (AgentGraph primary key)
|
||||
2. Then by library agent ID (LibraryAgent primary key)
|
||||
|
||||
Args:
|
||||
user_id: The user ID
|
||||
agent_id: The ID to look up (can be graph_id or library agent ID)
|
||||
|
||||
Returns:
|
||||
AgentInfo if found, None otherwise
|
||||
"""
|
||||
try:
|
||||
agent = await library_db.get_library_agent_by_graph_id(user_id, agent_id)
|
||||
if agent:
|
||||
logger.debug(f"Found library agent by graph_id: {agent.name}")
|
||||
return AgentInfo(
|
||||
id=agent.id,
|
||||
name=agent.name,
|
||||
description=agent.description or "",
|
||||
source="library",
|
||||
in_library=True,
|
||||
creator=agent.creator_name,
|
||||
status=agent.status.value,
|
||||
can_access_graph=agent.can_access_graph,
|
||||
has_external_trigger=agent.has_external_trigger,
|
||||
new_output=agent.new_output,
|
||||
graph_id=agent.graph_id,
|
||||
)
|
||||
except DatabaseError:
|
||||
raise
|
||||
except Exception as e:
|
||||
logger.warning(
|
||||
f"Could not fetch library agent by graph_id {agent_id}: {e}",
|
||||
exc_info=True,
|
||||
)
|
||||
|
||||
try:
|
||||
agent = await library_db.get_library_agent(agent_id, user_id)
|
||||
if agent:
|
||||
logger.debug(f"Found library agent by library_id: {agent.name}")
|
||||
return AgentInfo(
|
||||
id=agent.id,
|
||||
name=agent.name,
|
||||
description=agent.description or "",
|
||||
source="library",
|
||||
in_library=True,
|
||||
creator=agent.creator_name,
|
||||
status=agent.status.value,
|
||||
can_access_graph=agent.can_access_graph,
|
||||
has_external_trigger=agent.has_external_trigger,
|
||||
new_output=agent.new_output,
|
||||
graph_id=agent.graph_id,
|
||||
)
|
||||
except NotFoundError:
|
||||
logger.debug(f"Library agent not found by library_id: {agent_id}")
|
||||
except DatabaseError:
|
||||
raise
|
||||
except Exception as e:
|
||||
logger.warning(
|
||||
f"Could not fetch library agent by library_id {agent_id}: {e}",
|
||||
exc_info=True,
|
||||
)
|
||||
|
||||
return None
|
||||
|
||||
|
||||
async def search_agents(
|
||||
query: str,
|
||||
@@ -151,38 +70,28 @@ async def search_agents(
|
||||
)
|
||||
)
|
||||
else: # library
|
||||
# If query looks like a UUID, try direct lookup first
|
||||
if _is_uuid(query):
|
||||
logger.info(f"Query looks like UUID, trying direct lookup: {query}")
|
||||
agent = await _get_library_agent_by_id(user_id, query) # type: ignore[arg-type]
|
||||
if agent:
|
||||
agents.append(agent)
|
||||
logger.info(f"Found agent by direct ID lookup: {agent.name}")
|
||||
|
||||
# If no results from UUID lookup, do text search
|
||||
if not agents:
|
||||
logger.info(f"Searching user library for: {query}")
|
||||
results = await library_db.list_library_agents(
|
||||
user_id=user_id, # type: ignore[arg-type]
|
||||
search_term=query,
|
||||
page_size=10,
|
||||
)
|
||||
for agent in results.agents:
|
||||
agents.append(
|
||||
AgentInfo(
|
||||
id=agent.id,
|
||||
name=agent.name,
|
||||
description=agent.description or "",
|
||||
source="library",
|
||||
in_library=True,
|
||||
creator=agent.creator_name,
|
||||
status=agent.status.value,
|
||||
can_access_graph=agent.can_access_graph,
|
||||
has_external_trigger=agent.has_external_trigger,
|
||||
new_output=agent.new_output,
|
||||
graph_id=agent.graph_id,
|
||||
)
|
||||
logger.info(f"Searching user library for: {query}")
|
||||
results = await library_db.list_library_agents(
|
||||
user_id=user_id, # type: ignore[arg-type]
|
||||
search_term=query,
|
||||
page_size=10,
|
||||
)
|
||||
for agent in results.agents:
|
||||
agents.append(
|
||||
AgentInfo(
|
||||
id=agent.id,
|
||||
name=agent.name,
|
||||
description=agent.description or "",
|
||||
source="library",
|
||||
in_library=True,
|
||||
creator=agent.creator_name,
|
||||
status=agent.status.value,
|
||||
can_access_graph=agent.can_access_graph,
|
||||
has_external_trigger=agent.has_external_trigger,
|
||||
new_output=agent.new_output,
|
||||
graph_id=agent.graph_id,
|
||||
)
|
||||
)
|
||||
logger.info(f"Found {len(agents)} agents in {source}")
|
||||
except NotFoundError:
|
||||
pass
|
||||
|
||||
@@ -8,16 +8,14 @@ from backend.api.features.chat.model import ChatSession
|
||||
from .agent_generator import (
|
||||
AgentGeneratorNotConfiguredError,
|
||||
decompose_goal,
|
||||
enrich_library_agents_from_steps,
|
||||
generate_agent,
|
||||
get_all_relevant_agents_for_generation,
|
||||
get_user_message_for_error,
|
||||
save_agent_to_library,
|
||||
)
|
||||
from .base import BaseTool
|
||||
from .models import (
|
||||
AgentPreviewResponse,
|
||||
AgentSavedResponse,
|
||||
AsyncProcessingResponse,
|
||||
ClarificationNeededResponse,
|
||||
ClarifyingQuestion,
|
||||
ErrorResponse,
|
||||
@@ -98,6 +96,10 @@ class CreateAgentTool(BaseTool):
|
||||
save = kwargs.get("save", True)
|
||||
session_id = session.session_id if session else None
|
||||
|
||||
# Extract async processing params (passed by long-running tool handler)
|
||||
operation_id = kwargs.get("_operation_id")
|
||||
task_id = kwargs.get("_task_id")
|
||||
|
||||
if not description:
|
||||
return ErrorResponse(
|
||||
message="Please provide a description of what the agent should do.",
|
||||
@@ -105,27 +107,9 @@ class CreateAgentTool(BaseTool):
|
||||
session_id=session_id,
|
||||
)
|
||||
|
||||
# Fetch relevant library and marketplace agents for sub-agent composition
|
||||
library_agents = None
|
||||
if user_id:
|
||||
try:
|
||||
library_agents = await get_all_relevant_agents_for_generation(
|
||||
user_id=user_id,
|
||||
search_query=description, # Use goal as search term
|
||||
include_marketplace=True,
|
||||
)
|
||||
logger.debug(
|
||||
f"Found {len(library_agents)} relevant agents for sub-agent composition"
|
||||
)
|
||||
except Exception as e:
|
||||
# Log but don't fail - agent generation can work without sub-agents
|
||||
logger.warning(f"Failed to fetch library agents: {e}")
|
||||
|
||||
# Step 1: Decompose goal into steps
|
||||
try:
|
||||
decomposition_result = await decompose_goal(
|
||||
description, context, library_agents
|
||||
)
|
||||
decomposition_result = await decompose_goal(description, context)
|
||||
except AgentGeneratorNotConfiguredError:
|
||||
return ErrorResponse(
|
||||
message=(
|
||||
@@ -138,29 +122,11 @@ class CreateAgentTool(BaseTool):
|
||||
|
||||
if decomposition_result is None:
|
||||
return ErrorResponse(
|
||||
message="Failed to analyze the goal. The agent generation service may be unavailable. Please try again.",
|
||||
message="Failed to analyze the goal. The agent generation service may be unavailable or timed out. Please try again.",
|
||||
error="decomposition_failed",
|
||||
details={"description": description[:100]},
|
||||
session_id=session_id,
|
||||
)
|
||||
|
||||
# Check if the result is an error from the external service
|
||||
if decomposition_result.get("type") == "error":
|
||||
error_msg = decomposition_result.get("error", "Unknown error")
|
||||
error_type = decomposition_result.get("error_type", "unknown")
|
||||
user_message = get_user_message_for_error(
|
||||
error_type,
|
||||
operation="analyze the goal",
|
||||
llm_parse_message="The AI had trouble understanding this request. Please try rephrasing your goal.",
|
||||
)
|
||||
return ErrorResponse(
|
||||
message=user_message,
|
||||
error=f"decomposition_failed:{error_type}",
|
||||
details={
|
||||
"description": description[:100],
|
||||
"service_error": error_msg,
|
||||
"error_type": error_type,
|
||||
},
|
||||
"description": description[:100]
|
||||
}, # Include context for debugging
|
||||
session_id=session_id,
|
||||
)
|
||||
|
||||
@@ -210,26 +176,13 @@ class CreateAgentTool(BaseTool):
|
||||
session_id=session_id,
|
||||
)
|
||||
|
||||
# Step 1.5: Enrich library agents with step-based search (two-phase search)
|
||||
# After decomposition, search for additional relevant agents based on the steps
|
||||
if user_id and library_agents is not None:
|
||||
try:
|
||||
library_agents = await enrich_library_agents_from_steps(
|
||||
user_id=user_id,
|
||||
decomposition_result=decomposition_result,
|
||||
existing_agents=library_agents,
|
||||
include_marketplace=True,
|
||||
)
|
||||
logger.debug(
|
||||
f"After enrichment: {len(library_agents)} total agents for sub-agent composition"
|
||||
)
|
||||
except Exception as e:
|
||||
# Log but don't fail - continue with existing agents
|
||||
logger.warning(f"Failed to enrich library agents from steps: {e}")
|
||||
|
||||
# Step 2: Generate agent JSON (external service handles fixing and validation)
|
||||
try:
|
||||
agent_json = await generate_agent(decomposition_result, library_agents)
|
||||
agent_json = await generate_agent(
|
||||
decomposition_result,
|
||||
operation_id=operation_id,
|
||||
task_id=task_id,
|
||||
)
|
||||
except AgentGeneratorNotConfiguredError:
|
||||
return ErrorResponse(
|
||||
message=(
|
||||
@@ -242,35 +195,24 @@ class CreateAgentTool(BaseTool):
|
||||
|
||||
if agent_json is None:
|
||||
return ErrorResponse(
|
||||
message="Failed to generate the agent. The agent generation service may be unavailable. Please try again.",
|
||||
message="Failed to generate the agent. The agent generation service may be unavailable or timed out. Please try again.",
|
||||
error="generation_failed",
|
||||
details={"description": description[:100]},
|
||||
details={
|
||||
"description": description[:100]
|
||||
}, # Include context for debugging
|
||||
session_id=session_id,
|
||||
)
|
||||
|
||||
# Check if the result is an error from the external service
|
||||
if isinstance(agent_json, dict) and agent_json.get("type") == "error":
|
||||
error_msg = agent_json.get("error", "Unknown error")
|
||||
error_type = agent_json.get("error_type", "unknown")
|
||||
user_message = get_user_message_for_error(
|
||||
error_type,
|
||||
operation="generate the agent",
|
||||
llm_parse_message="The AI had trouble generating the agent. Please try again or simplify your goal.",
|
||||
validation_message=(
|
||||
"I wasn't able to create a valid agent for this request. "
|
||||
"The generated workflow had some structural issues. "
|
||||
"Please try simplifying your goal or breaking it into smaller steps."
|
||||
),
|
||||
error_details=error_msg if error_type == "validation_error" else None,
|
||||
# Check if Agent Generator accepted for async processing
|
||||
if agent_json.get("status") == "accepted":
|
||||
logger.info(
|
||||
f"Agent generation delegated to async processing "
|
||||
f"(operation_id={operation_id}, task_id={task_id})"
|
||||
)
|
||||
return ErrorResponse(
|
||||
message=user_message,
|
||||
error=f"generation_failed:{error_type}",
|
||||
details={
|
||||
"description": description[:100],
|
||||
"service_error": error_msg,
|
||||
"error_type": error_type,
|
||||
},
|
||||
return AsyncProcessingResponse(
|
||||
message="Agent generation started. You'll be notified when it's complete.",
|
||||
operation_id=operation_id,
|
||||
task_id=task_id,
|
||||
session_id=session_id,
|
||||
)
|
||||
|
||||
@@ -312,7 +254,7 @@ class CreateAgentTool(BaseTool):
|
||||
agent_id=created_graph.id,
|
||||
agent_name=created_graph.name,
|
||||
library_agent_id=library_agent.id,
|
||||
library_agent_link=f"/library/agents/{library_agent.id}",
|
||||
library_agent_link=f"/library/{library_agent.id}",
|
||||
agent_page_link=f"/build?flowID={created_graph.id}",
|
||||
session_id=session_id,
|
||||
)
|
||||
|
||||
@@ -9,14 +9,13 @@ from .agent_generator import (
|
||||
AgentGeneratorNotConfiguredError,
|
||||
generate_agent_patch,
|
||||
get_agent_as_json,
|
||||
get_all_relevant_agents_for_generation,
|
||||
get_user_message_for_error,
|
||||
save_agent_to_library,
|
||||
)
|
||||
from .base import BaseTool
|
||||
from .models import (
|
||||
AgentPreviewResponse,
|
||||
AgentSavedResponse,
|
||||
AsyncProcessingResponse,
|
||||
ClarificationNeededResponse,
|
||||
ClarifyingQuestion,
|
||||
ErrorResponse,
|
||||
@@ -104,6 +103,10 @@ class EditAgentTool(BaseTool):
|
||||
save = kwargs.get("save", True)
|
||||
session_id = session.session_id if session else None
|
||||
|
||||
# Extract async processing params (passed by long-running tool handler)
|
||||
operation_id = kwargs.get("_operation_id")
|
||||
task_id = kwargs.get("_task_id")
|
||||
|
||||
if not agent_id:
|
||||
return ErrorResponse(
|
||||
message="Please provide the agent ID to edit.",
|
||||
@@ -128,26 +131,6 @@ class EditAgentTool(BaseTool):
|
||||
session_id=session_id,
|
||||
)
|
||||
|
||||
# Fetch relevant library and marketplace agents for sub-agent composition
|
||||
library_agents = None
|
||||
if user_id:
|
||||
try:
|
||||
# Use the actual graph ID from current_agent to properly exclude
|
||||
# the agent being edited (agent_id might be a library agent ID)
|
||||
exclude_id = current_agent.get("id") or agent_id
|
||||
library_agents = await get_all_relevant_agents_for_generation(
|
||||
user_id=user_id,
|
||||
search_query=changes, # Use changes as search term
|
||||
exclude_graph_id=exclude_id, # Don't include the agent being edited
|
||||
include_marketplace=True,
|
||||
)
|
||||
logger.debug(
|
||||
f"Found {len(library_agents)} relevant agents for sub-agent composition"
|
||||
)
|
||||
except Exception as e:
|
||||
# Log but don't fail - agent editing can work without sub-agents
|
||||
logger.warning(f"Failed to fetch library agents: {e}")
|
||||
|
||||
# Build the update request with context
|
||||
update_request = changes
|
||||
if context:
|
||||
@@ -156,7 +139,10 @@ class EditAgentTool(BaseTool):
|
||||
# Step 2: Generate updated agent (external service handles fixing and validation)
|
||||
try:
|
||||
result = await generate_agent_patch(
|
||||
update_request, current_agent, library_agents
|
||||
update_request,
|
||||
current_agent,
|
||||
operation_id=operation_id,
|
||||
task_id=task_id,
|
||||
)
|
||||
except AgentGeneratorNotConfiguredError:
|
||||
return ErrorResponse(
|
||||
@@ -176,25 +162,16 @@ class EditAgentTool(BaseTool):
|
||||
session_id=session_id,
|
||||
)
|
||||
|
||||
# Check if the result is an error from the external service
|
||||
if isinstance(result, dict) and result.get("type") == "error":
|
||||
error_msg = result.get("error", "Unknown error")
|
||||
error_type = result.get("error_type", "unknown")
|
||||
user_message = get_user_message_for_error(
|
||||
error_type,
|
||||
operation="generate the changes",
|
||||
llm_parse_message="The AI had trouble generating the changes. Please try again or simplify your request.",
|
||||
validation_message="The generated changes failed validation. Please try rephrasing your request.",
|
||||
# Check if Agent Generator accepted for async processing
|
||||
if result.get("status") == "accepted":
|
||||
logger.info(
|
||||
f"Agent edit delegated to async processing "
|
||||
f"(operation_id={operation_id}, task_id={task_id})"
|
||||
)
|
||||
return ErrorResponse(
|
||||
message=user_message,
|
||||
error=f"update_generation_failed:{error_type}",
|
||||
details={
|
||||
"agent_id": agent_id,
|
||||
"changes": changes[:100],
|
||||
"service_error": error_msg,
|
||||
"error_type": error_type,
|
||||
},
|
||||
return AsyncProcessingResponse(
|
||||
message="Agent edit started. You'll be notified when it's complete.",
|
||||
operation_id=operation_id,
|
||||
task_id=task_id,
|
||||
session_id=session_id,
|
||||
)
|
||||
|
||||
@@ -259,7 +236,7 @@ class EditAgentTool(BaseTool):
|
||||
agent_id=created_graph.id,
|
||||
agent_name=created_graph.name,
|
||||
library_agent_id=library_agent.id,
|
||||
library_agent_link=f"/library/agents/{library_agent.id}",
|
||||
library_agent_link=f"/library/{library_agent.id}",
|
||||
agent_page_link=f"/build?flowID={created_graph.id}",
|
||||
session_id=session_id,
|
||||
)
|
||||
|
||||
@@ -352,11 +352,15 @@ class OperationStartedResponse(ToolResponseBase):
|
||||
|
||||
This is returned immediately to the client while the operation continues
|
||||
to execute. The user can close the tab and check back later.
|
||||
|
||||
The task_id can be used to reconnect to the SSE stream via
|
||||
GET /chat/tasks/{task_id}/stream?last_idx=0
|
||||
"""
|
||||
|
||||
type: ResponseType = ResponseType.OPERATION_STARTED
|
||||
operation_id: str
|
||||
tool_name: str
|
||||
task_id: str | None = None # For SSE reconnection
|
||||
|
||||
|
||||
class OperationPendingResponse(ToolResponseBase):
|
||||
@@ -380,3 +384,20 @@ class OperationInProgressResponse(ToolResponseBase):
|
||||
|
||||
type: ResponseType = ResponseType.OPERATION_IN_PROGRESS
|
||||
tool_call_id: str
|
||||
|
||||
|
||||
class AsyncProcessingResponse(ToolResponseBase):
|
||||
"""Response when an operation has been delegated to async processing.
|
||||
|
||||
This is returned by tools when the external service accepts the request
|
||||
for async processing (HTTP 202 Accepted). The RabbitMQ completion consumer
|
||||
will handle the result when the external service completes.
|
||||
|
||||
The status field is specifically "accepted" to allow the long-running tool
|
||||
handler to detect this response and skip LLM continuation.
|
||||
"""
|
||||
|
||||
type: ResponseType = ResponseType.OPERATION_STARTED
|
||||
status: str = "accepted" # Must be "accepted" for detection
|
||||
operation_id: str | None = None
|
||||
task_id: str | None = None
|
||||
|
||||
@@ -77,32 +77,21 @@ async def list_library_agents(
|
||||
}
|
||||
|
||||
# Build search filter if applicable
|
||||
# Split into words and match ANY word in name or description
|
||||
if search_term:
|
||||
words = [w.strip() for w in search_term.split() if len(w.strip()) >= 3]
|
||||
if words:
|
||||
or_conditions: list[prisma.types.LibraryAgentWhereInput] = []
|
||||
for word in words:
|
||||
or_conditions.append(
|
||||
{
|
||||
"AgentGraph": {
|
||||
"is": {"name": {"contains": word, "mode": "insensitive"}}
|
||||
}
|
||||
where_clause["OR"] = [
|
||||
{
|
||||
"AgentGraph": {
|
||||
"is": {"name": {"contains": search_term, "mode": "insensitive"}}
|
||||
}
|
||||
},
|
||||
{
|
||||
"AgentGraph": {
|
||||
"is": {
|
||||
"description": {"contains": search_term, "mode": "insensitive"}
|
||||
}
|
||||
)
|
||||
or_conditions.append(
|
||||
{
|
||||
"AgentGraph": {
|
||||
"is": {
|
||||
"description": {
|
||||
"contains": word,
|
||||
"mode": "insensitive",
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
)
|
||||
where_clause["OR"] = or_conditions
|
||||
}
|
||||
},
|
||||
]
|
||||
|
||||
# Determine sorting
|
||||
order_by: prisma.types.LibraryAgentOrderByInput | None = None
|
||||
|
||||
@@ -22,6 +22,10 @@ import backend.api.features.admin.store_admin_routes
|
||||
import backend.api.features.builder
|
||||
import backend.api.features.builder.routes
|
||||
import backend.api.features.chat.routes as chat_routes
|
||||
from backend.api.features.chat.completion_consumer import (
|
||||
start_completion_consumer,
|
||||
stop_completion_consumer,
|
||||
)
|
||||
import backend.api.features.executions.review.routes
|
||||
import backend.api.features.library.db
|
||||
import backend.api.features.library.model
|
||||
@@ -118,9 +122,21 @@ async def lifespan_context(app: fastapi.FastAPI):
|
||||
await backend.data.graph.migrate_llm_models(DEFAULT_LLM_MODEL)
|
||||
await backend.integrations.webhooks.utils.migrate_legacy_triggered_graphs()
|
||||
|
||||
# Start chat completion consumer for RabbitMQ notifications
|
||||
try:
|
||||
await start_completion_consumer()
|
||||
except Exception as e:
|
||||
logger.warning(f"Could not start chat completion consumer: {e}")
|
||||
|
||||
with launch_darkly_context():
|
||||
yield
|
||||
|
||||
# Stop chat completion consumer
|
||||
try:
|
||||
await stop_completion_consumer()
|
||||
except Exception as e:
|
||||
logger.warning(f"Error stopping chat completion consumer: {e}")
|
||||
|
||||
try:
|
||||
await shutdown_cloud_storage_handler()
|
||||
except Exception as e:
|
||||
|
||||
@@ -57,8 +57,7 @@ class TestDecomposeGoal:
|
||||
|
||||
result = await core.decompose_goal("Build a chatbot")
|
||||
|
||||
# library_agents defaults to None
|
||||
mock_external.assert_called_once_with("Build a chatbot", "", None)
|
||||
mock_external.assert_called_once_with("Build a chatbot", "")
|
||||
assert result == expected_result
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@@ -75,8 +74,7 @@ class TestDecomposeGoal:
|
||||
|
||||
await core.decompose_goal("Build a chatbot", "Use Python")
|
||||
|
||||
# library_agents defaults to None
|
||||
mock_external.assert_called_once_with("Build a chatbot", "Use Python", None)
|
||||
mock_external.assert_called_once_with("Build a chatbot", "Use Python")
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_returns_none_on_service_failure(self):
|
||||
@@ -111,8 +109,7 @@ class TestGenerateAgent:
|
||||
instructions = {"type": "instructions", "steps": ["Step 1"]}
|
||||
result = await core.generate_agent(instructions)
|
||||
|
||||
# library_agents defaults to None
|
||||
mock_external.assert_called_once_with(instructions, None)
|
||||
mock_external.assert_called_once_with(instructions)
|
||||
# Result should have id, version, is_active added if not present
|
||||
assert result is not None
|
||||
assert result["name"] == "Test Agent"
|
||||
@@ -177,8 +174,7 @@ class TestGenerateAgentPatch:
|
||||
current_agent = {"nodes": [], "links": []}
|
||||
result = await core.generate_agent_patch("Add a node", current_agent)
|
||||
|
||||
# library_agents defaults to None
|
||||
mock_external.assert_called_once_with("Add a node", current_agent, None)
|
||||
mock_external.assert_called_once_with("Add a node", current_agent)
|
||||
assert result == expected_result
|
||||
|
||||
@pytest.mark.asyncio
|
||||
|
||||
@@ -1,838 +0,0 @@
|
||||
"""
|
||||
Tests for library agent fetching functionality in agent generator.
|
||||
|
||||
This test suite verifies the search-based library agent fetching,
|
||||
including the combination of library and marketplace agents.
|
||||
"""
|
||||
|
||||
from unittest.mock import AsyncMock, MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
from backend.api.features.chat.tools.agent_generator import core
|
||||
|
||||
|
||||
class TestGetLibraryAgentsForGeneration:
|
||||
"""Test get_library_agents_for_generation function."""
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_fetches_agents_with_search_term(self):
|
||||
"""Test that search_term is passed to the library db."""
|
||||
# Create a mock agent with proper attribute values
|
||||
mock_agent = MagicMock()
|
||||
mock_agent.graph_id = "agent-123"
|
||||
mock_agent.graph_version = 1
|
||||
mock_agent.name = "Email Agent"
|
||||
mock_agent.description = "Sends emails"
|
||||
mock_agent.input_schema = {"properties": {}}
|
||||
mock_agent.output_schema = {"properties": {}}
|
||||
|
||||
mock_response = MagicMock()
|
||||
mock_response.agents = [mock_agent]
|
||||
|
||||
with patch.object(
|
||||
core.library_db,
|
||||
"list_library_agents",
|
||||
new_callable=AsyncMock,
|
||||
return_value=mock_response,
|
||||
) as mock_list:
|
||||
result = await core.get_library_agents_for_generation(
|
||||
user_id="user-123",
|
||||
search_query="send email",
|
||||
)
|
||||
|
||||
# Verify search_term was passed
|
||||
mock_list.assert_called_once_with(
|
||||
user_id="user-123",
|
||||
search_term="send email",
|
||||
page=1,
|
||||
page_size=15,
|
||||
)
|
||||
|
||||
# Verify result format
|
||||
assert len(result) == 1
|
||||
assert result[0]["graph_id"] == "agent-123"
|
||||
assert result[0]["name"] == "Email Agent"
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_excludes_specified_graph_id(self):
|
||||
"""Test that agents with excluded graph_id are filtered out."""
|
||||
mock_response = MagicMock()
|
||||
mock_response.agents = [
|
||||
MagicMock(
|
||||
graph_id="agent-123",
|
||||
graph_version=1,
|
||||
name="Agent 1",
|
||||
description="First agent",
|
||||
input_schema={},
|
||||
output_schema={},
|
||||
),
|
||||
MagicMock(
|
||||
graph_id="agent-456",
|
||||
graph_version=1,
|
||||
name="Agent 2",
|
||||
description="Second agent",
|
||||
input_schema={},
|
||||
output_schema={},
|
||||
),
|
||||
]
|
||||
|
||||
with patch.object(
|
||||
core.library_db,
|
||||
"list_library_agents",
|
||||
new_callable=AsyncMock,
|
||||
return_value=mock_response,
|
||||
):
|
||||
result = await core.get_library_agents_for_generation(
|
||||
user_id="user-123",
|
||||
exclude_graph_id="agent-123",
|
||||
)
|
||||
|
||||
# Verify the excluded agent is not in results
|
||||
assert len(result) == 1
|
||||
assert result[0]["graph_id"] == "agent-456"
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_respects_max_results(self):
|
||||
"""Test that max_results parameter limits the page_size."""
|
||||
mock_response = MagicMock()
|
||||
mock_response.agents = []
|
||||
|
||||
with patch.object(
|
||||
core.library_db,
|
||||
"list_library_agents",
|
||||
new_callable=AsyncMock,
|
||||
return_value=mock_response,
|
||||
) as mock_list:
|
||||
await core.get_library_agents_for_generation(
|
||||
user_id="user-123",
|
||||
max_results=5,
|
||||
)
|
||||
|
||||
# Verify page_size was set to max_results
|
||||
mock_list.assert_called_once_with(
|
||||
user_id="user-123",
|
||||
search_term=None,
|
||||
page=1,
|
||||
page_size=5,
|
||||
)
|
||||
|
||||
|
||||
class TestSearchMarketplaceAgentsForGeneration:
|
||||
"""Test search_marketplace_agents_for_generation function."""
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_searches_marketplace_with_query(self):
|
||||
"""Test that marketplace is searched with the query."""
|
||||
mock_response = MagicMock()
|
||||
mock_response.agents = [
|
||||
MagicMock(
|
||||
agent_name="Public Agent",
|
||||
description="A public agent",
|
||||
sub_heading="Does something useful",
|
||||
creator="creator-1",
|
||||
)
|
||||
]
|
||||
|
||||
# The store_db is dynamically imported, so patch the import path
|
||||
with patch(
|
||||
"backend.api.features.store.db.get_store_agents",
|
||||
new_callable=AsyncMock,
|
||||
return_value=mock_response,
|
||||
) as mock_search:
|
||||
result = await core.search_marketplace_agents_for_generation(
|
||||
search_query="automation",
|
||||
max_results=10,
|
||||
)
|
||||
|
||||
mock_search.assert_called_once_with(
|
||||
search_query="automation",
|
||||
page=1,
|
||||
page_size=10,
|
||||
)
|
||||
|
||||
assert len(result) == 1
|
||||
assert result[0]["name"] == "Public Agent"
|
||||
assert result[0]["is_marketplace_agent"] is True
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_handles_marketplace_error_gracefully(self):
|
||||
"""Test that marketplace errors don't crash the function."""
|
||||
with patch(
|
||||
"backend.api.features.store.db.get_store_agents",
|
||||
new_callable=AsyncMock,
|
||||
side_effect=Exception("Marketplace unavailable"),
|
||||
):
|
||||
result = await core.search_marketplace_agents_for_generation(
|
||||
search_query="test"
|
||||
)
|
||||
|
||||
# Should return empty list, not raise exception
|
||||
assert result == []
|
||||
|
||||
|
||||
class TestGetAllRelevantAgentsForGeneration:
|
||||
"""Test get_all_relevant_agents_for_generation function."""
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_combines_library_and_marketplace_agents(self):
|
||||
"""Test that agents from both sources are combined."""
|
||||
library_agents = [
|
||||
{
|
||||
"graph_id": "lib-123",
|
||||
"graph_version": 1,
|
||||
"name": "Library Agent",
|
||||
"description": "From library",
|
||||
"input_schema": {},
|
||||
"output_schema": {},
|
||||
}
|
||||
]
|
||||
|
||||
marketplace_agents = [
|
||||
{
|
||||
"name": "Market Agent",
|
||||
"description": "From marketplace",
|
||||
"sub_heading": "Sub heading",
|
||||
"creator": "creator-1",
|
||||
"is_marketplace_agent": True,
|
||||
}
|
||||
]
|
||||
|
||||
with patch.object(
|
||||
core,
|
||||
"get_library_agents_for_generation",
|
||||
new_callable=AsyncMock,
|
||||
return_value=library_agents,
|
||||
):
|
||||
with patch.object(
|
||||
core,
|
||||
"search_marketplace_agents_for_generation",
|
||||
new_callable=AsyncMock,
|
||||
return_value=marketplace_agents,
|
||||
):
|
||||
result = await core.get_all_relevant_agents_for_generation(
|
||||
user_id="user-123",
|
||||
search_query="test query",
|
||||
include_marketplace=True,
|
||||
)
|
||||
|
||||
# Library agents should come first
|
||||
assert len(result) == 2
|
||||
assert result[0]["name"] == "Library Agent"
|
||||
assert result[1]["name"] == "Market Agent"
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_deduplicates_by_name(self):
|
||||
"""Test that marketplace agents with same name as library are excluded."""
|
||||
library_agents = [
|
||||
{
|
||||
"graph_id": "lib-123",
|
||||
"graph_version": 1,
|
||||
"name": "Shared Agent",
|
||||
"description": "From library",
|
||||
"input_schema": {},
|
||||
"output_schema": {},
|
||||
}
|
||||
]
|
||||
|
||||
marketplace_agents = [
|
||||
{
|
||||
"name": "Shared Agent", # Same name, should be deduplicated
|
||||
"description": "From marketplace",
|
||||
"sub_heading": "Sub heading",
|
||||
"creator": "creator-1",
|
||||
"is_marketplace_agent": True,
|
||||
},
|
||||
{
|
||||
"name": "Unique Agent",
|
||||
"description": "Only in marketplace",
|
||||
"sub_heading": "Sub heading",
|
||||
"creator": "creator-2",
|
||||
"is_marketplace_agent": True,
|
||||
},
|
||||
]
|
||||
|
||||
with patch.object(
|
||||
core,
|
||||
"get_library_agents_for_generation",
|
||||
new_callable=AsyncMock,
|
||||
return_value=library_agents,
|
||||
):
|
||||
with patch.object(
|
||||
core,
|
||||
"search_marketplace_agents_for_generation",
|
||||
new_callable=AsyncMock,
|
||||
return_value=marketplace_agents,
|
||||
):
|
||||
result = await core.get_all_relevant_agents_for_generation(
|
||||
user_id="user-123",
|
||||
search_query="test",
|
||||
include_marketplace=True,
|
||||
)
|
||||
|
||||
# Shared Agent from marketplace should be excluded
|
||||
assert len(result) == 2
|
||||
names = [a["name"] for a in result]
|
||||
assert "Shared Agent" in names
|
||||
assert "Unique Agent" in names
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_skips_marketplace_when_disabled(self):
|
||||
"""Test that marketplace is not searched when include_marketplace=False."""
|
||||
library_agents = [
|
||||
{
|
||||
"graph_id": "lib-123",
|
||||
"graph_version": 1,
|
||||
"name": "Library Agent",
|
||||
"description": "From library",
|
||||
"input_schema": {},
|
||||
"output_schema": {},
|
||||
}
|
||||
]
|
||||
|
||||
with patch.object(
|
||||
core,
|
||||
"get_library_agents_for_generation",
|
||||
new_callable=AsyncMock,
|
||||
return_value=library_agents,
|
||||
):
|
||||
with patch.object(
|
||||
core,
|
||||
"search_marketplace_agents_for_generation",
|
||||
new_callable=AsyncMock,
|
||||
) as mock_marketplace:
|
||||
result = await core.get_all_relevant_agents_for_generation(
|
||||
user_id="user-123",
|
||||
search_query="test",
|
||||
include_marketplace=False,
|
||||
)
|
||||
|
||||
# Marketplace should not be called
|
||||
mock_marketplace.assert_not_called()
|
||||
assert len(result) == 1
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_skips_marketplace_when_no_search_query(self):
|
||||
"""Test that marketplace is not searched without a search query."""
|
||||
library_agents = [
|
||||
{
|
||||
"graph_id": "lib-123",
|
||||
"graph_version": 1,
|
||||
"name": "Library Agent",
|
||||
"description": "From library",
|
||||
"input_schema": {},
|
||||
"output_schema": {},
|
||||
}
|
||||
]
|
||||
|
||||
with patch.object(
|
||||
core,
|
||||
"get_library_agents_for_generation",
|
||||
new_callable=AsyncMock,
|
||||
return_value=library_agents,
|
||||
):
|
||||
with patch.object(
|
||||
core,
|
||||
"search_marketplace_agents_for_generation",
|
||||
new_callable=AsyncMock,
|
||||
) as mock_marketplace:
|
||||
result = await core.get_all_relevant_agents_for_generation(
|
||||
user_id="user-123",
|
||||
search_query=None, # No search query
|
||||
include_marketplace=True,
|
||||
)
|
||||
|
||||
# Marketplace should not be called without search query
|
||||
mock_marketplace.assert_not_called()
|
||||
assert len(result) == 1
|
||||
|
||||
|
||||
class TestExtractSearchTermsFromSteps:
|
||||
"""Test extract_search_terms_from_steps function."""
|
||||
|
||||
def test_extracts_terms_from_instructions_type(self):
|
||||
"""Test extraction from valid instructions decomposition result."""
|
||||
decomposition_result = {
|
||||
"type": "instructions",
|
||||
"steps": [
|
||||
{
|
||||
"description": "Send an email notification",
|
||||
"block_name": "GmailSendBlock",
|
||||
},
|
||||
{"description": "Fetch weather data", "action": "Get weather API"},
|
||||
],
|
||||
}
|
||||
|
||||
result = core.extract_search_terms_from_steps(decomposition_result)
|
||||
|
||||
assert "Send an email notification" in result
|
||||
assert "GmailSendBlock" in result
|
||||
assert "Fetch weather data" in result
|
||||
assert "Get weather API" in result
|
||||
|
||||
def test_returns_empty_for_non_instructions_type(self):
|
||||
"""Test that non-instructions types return empty list."""
|
||||
decomposition_result = {
|
||||
"type": "clarifying_questions",
|
||||
"questions": [{"question": "What email?"}],
|
||||
}
|
||||
|
||||
result = core.extract_search_terms_from_steps(decomposition_result)
|
||||
|
||||
assert result == []
|
||||
|
||||
def test_deduplicates_terms_case_insensitively(self):
|
||||
"""Test that duplicate terms are removed (case-insensitive)."""
|
||||
decomposition_result = {
|
||||
"type": "instructions",
|
||||
"steps": [
|
||||
{"description": "Send Email", "name": "send email"},
|
||||
{"description": "Other task"},
|
||||
],
|
||||
}
|
||||
|
||||
result = core.extract_search_terms_from_steps(decomposition_result)
|
||||
|
||||
# Should only have one "send email" variant
|
||||
email_terms = [t for t in result if "email" in t.lower()]
|
||||
assert len(email_terms) == 1
|
||||
|
||||
def test_filters_short_terms(self):
|
||||
"""Test that terms with 3 or fewer characters are filtered out."""
|
||||
decomposition_result = {
|
||||
"type": "instructions",
|
||||
"steps": [
|
||||
{"description": "ab", "action": "xyz"}, # Both too short
|
||||
{"description": "Valid term here"},
|
||||
],
|
||||
}
|
||||
|
||||
result = core.extract_search_terms_from_steps(decomposition_result)
|
||||
|
||||
assert "ab" not in result
|
||||
assert "xyz" not in result
|
||||
assert "Valid term here" in result
|
||||
|
||||
def test_handles_empty_steps(self):
|
||||
"""Test handling of empty steps list."""
|
||||
decomposition_result = {
|
||||
"type": "instructions",
|
||||
"steps": [],
|
||||
}
|
||||
|
||||
result = core.extract_search_terms_from_steps(decomposition_result)
|
||||
|
||||
assert result == []
|
||||
|
||||
|
||||
class TestEnrichLibraryAgentsFromSteps:
|
||||
"""Test enrich_library_agents_from_steps function."""
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_enriches_with_additional_agents(self):
|
||||
"""Test that additional agents are found based on steps."""
|
||||
existing_agents = [
|
||||
{
|
||||
"graph_id": "existing-123",
|
||||
"graph_version": 1,
|
||||
"name": "Existing Agent",
|
||||
"description": "Already fetched",
|
||||
"input_schema": {},
|
||||
"output_schema": {},
|
||||
}
|
||||
]
|
||||
|
||||
additional_agents = [
|
||||
{
|
||||
"graph_id": "new-456",
|
||||
"graph_version": 1,
|
||||
"name": "Email Agent",
|
||||
"description": "For sending emails",
|
||||
"input_schema": {},
|
||||
"output_schema": {},
|
||||
}
|
||||
]
|
||||
|
||||
decomposition_result = {
|
||||
"type": "instructions",
|
||||
"steps": [
|
||||
{"description": "Send email notification"},
|
||||
],
|
||||
}
|
||||
|
||||
with patch.object(
|
||||
core,
|
||||
"get_all_relevant_agents_for_generation",
|
||||
new_callable=AsyncMock,
|
||||
return_value=additional_agents,
|
||||
):
|
||||
result = await core.enrich_library_agents_from_steps(
|
||||
user_id="user-123",
|
||||
decomposition_result=decomposition_result,
|
||||
existing_agents=existing_agents,
|
||||
)
|
||||
|
||||
# Should have both existing and new agents
|
||||
assert len(result) == 2
|
||||
names = [a["name"] for a in result]
|
||||
assert "Existing Agent" in names
|
||||
assert "Email Agent" in names
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_deduplicates_by_graph_id(self):
|
||||
"""Test that agents with same graph_id are not duplicated."""
|
||||
existing_agents = [
|
||||
{
|
||||
"graph_id": "agent-123",
|
||||
"graph_version": 1,
|
||||
"name": "Existing Agent",
|
||||
"description": "Already fetched",
|
||||
"input_schema": {},
|
||||
"output_schema": {},
|
||||
}
|
||||
]
|
||||
|
||||
# Additional search returns same agent
|
||||
additional_agents = [
|
||||
{
|
||||
"graph_id": "agent-123", # Same ID
|
||||
"graph_version": 1,
|
||||
"name": "Existing Agent Copy",
|
||||
"description": "Same agent different name",
|
||||
"input_schema": {},
|
||||
"output_schema": {},
|
||||
}
|
||||
]
|
||||
|
||||
decomposition_result = {
|
||||
"type": "instructions",
|
||||
"steps": [{"description": "Some action"}],
|
||||
}
|
||||
|
||||
with patch.object(
|
||||
core,
|
||||
"get_all_relevant_agents_for_generation",
|
||||
new_callable=AsyncMock,
|
||||
return_value=additional_agents,
|
||||
):
|
||||
result = await core.enrich_library_agents_from_steps(
|
||||
user_id="user-123",
|
||||
decomposition_result=decomposition_result,
|
||||
existing_agents=existing_agents,
|
||||
)
|
||||
|
||||
# Should not duplicate
|
||||
assert len(result) == 1
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_deduplicates_by_name(self):
|
||||
"""Test that agents with same name are not duplicated."""
|
||||
existing_agents = [
|
||||
{
|
||||
"graph_id": "agent-123",
|
||||
"graph_version": 1,
|
||||
"name": "Email Agent",
|
||||
"description": "Already fetched",
|
||||
"input_schema": {},
|
||||
"output_schema": {},
|
||||
}
|
||||
]
|
||||
|
||||
# Additional search returns agent with same name but different ID
|
||||
additional_agents = [
|
||||
{
|
||||
"graph_id": "agent-456", # Different ID
|
||||
"graph_version": 1,
|
||||
"name": "Email Agent", # Same name
|
||||
"description": "Different agent same name",
|
||||
"input_schema": {},
|
||||
"output_schema": {},
|
||||
}
|
||||
]
|
||||
|
||||
decomposition_result = {
|
||||
"type": "instructions",
|
||||
"steps": [{"description": "Send email"}],
|
||||
}
|
||||
|
||||
with patch.object(
|
||||
core,
|
||||
"get_all_relevant_agents_for_generation",
|
||||
new_callable=AsyncMock,
|
||||
return_value=additional_agents,
|
||||
):
|
||||
result = await core.enrich_library_agents_from_steps(
|
||||
user_id="user-123",
|
||||
decomposition_result=decomposition_result,
|
||||
existing_agents=existing_agents,
|
||||
)
|
||||
|
||||
# Should not duplicate by name
|
||||
assert len(result) == 1
|
||||
assert result[0].get("graph_id") == "agent-123" # Original kept
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_returns_existing_when_no_steps(self):
|
||||
"""Test that existing agents are returned when no search terms extracted."""
|
||||
existing_agents = [
|
||||
{
|
||||
"graph_id": "existing-123",
|
||||
"graph_version": 1,
|
||||
"name": "Existing Agent",
|
||||
"description": "Already fetched",
|
||||
"input_schema": {},
|
||||
"output_schema": {},
|
||||
}
|
||||
]
|
||||
|
||||
decomposition_result = {
|
||||
"type": "clarifying_questions", # Not instructions type
|
||||
"questions": [],
|
||||
}
|
||||
|
||||
result = await core.enrich_library_agents_from_steps(
|
||||
user_id="user-123",
|
||||
decomposition_result=decomposition_result,
|
||||
existing_agents=existing_agents,
|
||||
)
|
||||
|
||||
# Should return existing unchanged
|
||||
assert result == existing_agents
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_limits_search_terms_to_three(self):
|
||||
"""Test that only first 3 search terms are used."""
|
||||
existing_agents = []
|
||||
|
||||
decomposition_result = {
|
||||
"type": "instructions",
|
||||
"steps": [
|
||||
{"description": "First action"},
|
||||
{"description": "Second action"},
|
||||
{"description": "Third action"},
|
||||
{"description": "Fourth action"},
|
||||
{"description": "Fifth action"},
|
||||
],
|
||||
}
|
||||
|
||||
call_count = 0
|
||||
|
||||
async def mock_get_agents(*args, **kwargs):
|
||||
nonlocal call_count
|
||||
call_count += 1
|
||||
return []
|
||||
|
||||
with patch.object(
|
||||
core,
|
||||
"get_all_relevant_agents_for_generation",
|
||||
side_effect=mock_get_agents,
|
||||
):
|
||||
await core.enrich_library_agents_from_steps(
|
||||
user_id="user-123",
|
||||
decomposition_result=decomposition_result,
|
||||
existing_agents=existing_agents,
|
||||
)
|
||||
|
||||
# Should only make 3 calls (limited to first 3 terms)
|
||||
assert call_count == 3
|
||||
|
||||
|
||||
class TestExtractUuidsFromText:
|
||||
"""Test extract_uuids_from_text function."""
|
||||
|
||||
def test_extracts_single_uuid(self):
|
||||
"""Test extraction of a single UUID from text."""
|
||||
text = "Use my agent 46631191-e8a8-486f-ad90-84f89738321d for this task"
|
||||
result = core.extract_uuids_from_text(text)
|
||||
assert len(result) == 1
|
||||
assert "46631191-e8a8-486f-ad90-84f89738321d" in result
|
||||
|
||||
def test_extracts_multiple_uuids(self):
|
||||
"""Test extraction of multiple UUIDs from text."""
|
||||
text = (
|
||||
"Combine agents 11111111-1111-4111-8111-111111111111 "
|
||||
"and 22222222-2222-4222-9222-222222222222"
|
||||
)
|
||||
result = core.extract_uuids_from_text(text)
|
||||
assert len(result) == 2
|
||||
assert "11111111-1111-4111-8111-111111111111" in result
|
||||
assert "22222222-2222-4222-9222-222222222222" in result
|
||||
|
||||
def test_deduplicates_uuids(self):
|
||||
"""Test that duplicate UUIDs are deduplicated."""
|
||||
text = (
|
||||
"Use 46631191-e8a8-486f-ad90-84f89738321d twice: "
|
||||
"46631191-e8a8-486f-ad90-84f89738321d"
|
||||
)
|
||||
result = core.extract_uuids_from_text(text)
|
||||
assert len(result) == 1
|
||||
|
||||
def test_normalizes_to_lowercase(self):
|
||||
"""Test that UUIDs are normalized to lowercase."""
|
||||
text = "Use 46631191-E8A8-486F-AD90-84F89738321D"
|
||||
result = core.extract_uuids_from_text(text)
|
||||
assert result[0] == "46631191-e8a8-486f-ad90-84f89738321d"
|
||||
|
||||
def test_returns_empty_for_no_uuids(self):
|
||||
"""Test that empty list is returned when no UUIDs found."""
|
||||
text = "Create an email agent that sends notifications"
|
||||
result = core.extract_uuids_from_text(text)
|
||||
assert result == []
|
||||
|
||||
def test_ignores_invalid_uuids(self):
|
||||
"""Test that invalid UUID-like strings are ignored."""
|
||||
text = "Not a valid UUID: 12345678-1234-1234-1234-123456789abc"
|
||||
result = core.extract_uuids_from_text(text)
|
||||
# UUID v4 requires specific patterns (4 in third group, 8/9/a/b in fourth)
|
||||
assert len(result) == 0
|
||||
|
||||
|
||||
class TestGetLibraryAgentById:
|
||||
"""Test get_library_agent_by_id function (and its alias get_library_agent_by_graph_id)."""
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_returns_agent_when_found_by_graph_id(self):
|
||||
"""Test that agent is returned when found by graph_id."""
|
||||
mock_agent = MagicMock()
|
||||
mock_agent.graph_id = "agent-123"
|
||||
mock_agent.graph_version = 1
|
||||
mock_agent.name = "Test Agent"
|
||||
mock_agent.description = "Test description"
|
||||
mock_agent.input_schema = {"properties": {}}
|
||||
mock_agent.output_schema = {"properties": {}}
|
||||
|
||||
with patch.object(
|
||||
core.library_db,
|
||||
"get_library_agent_by_graph_id",
|
||||
new_callable=AsyncMock,
|
||||
return_value=mock_agent,
|
||||
):
|
||||
result = await core.get_library_agent_by_id("user-123", "agent-123")
|
||||
|
||||
assert result is not None
|
||||
assert result["graph_id"] == "agent-123"
|
||||
assert result["name"] == "Test Agent"
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_falls_back_to_library_agent_id(self):
|
||||
"""Test that lookup falls back to library agent ID when graph_id not found."""
|
||||
mock_agent = MagicMock()
|
||||
mock_agent.graph_id = "graph-456" # Different from the lookup ID
|
||||
mock_agent.graph_version = 1
|
||||
mock_agent.name = "Library Agent"
|
||||
mock_agent.description = "Found by library ID"
|
||||
mock_agent.input_schema = {"properties": {}}
|
||||
mock_agent.output_schema = {"properties": {}}
|
||||
|
||||
with (
|
||||
patch.object(
|
||||
core.library_db,
|
||||
"get_library_agent_by_graph_id",
|
||||
new_callable=AsyncMock,
|
||||
return_value=None, # Not found by graph_id
|
||||
),
|
||||
patch.object(
|
||||
core.library_db,
|
||||
"get_library_agent",
|
||||
new_callable=AsyncMock,
|
||||
return_value=mock_agent, # Found by library ID
|
||||
),
|
||||
):
|
||||
result = await core.get_library_agent_by_id("user-123", "library-id-123")
|
||||
|
||||
assert result is not None
|
||||
assert result["graph_id"] == "graph-456"
|
||||
assert result["name"] == "Library Agent"
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_returns_none_when_not_found_by_either_method(self):
|
||||
"""Test that None is returned when agent not found by either method."""
|
||||
with (
|
||||
patch.object(
|
||||
core.library_db,
|
||||
"get_library_agent_by_graph_id",
|
||||
new_callable=AsyncMock,
|
||||
return_value=None,
|
||||
),
|
||||
patch.object(
|
||||
core.library_db,
|
||||
"get_library_agent",
|
||||
new_callable=AsyncMock,
|
||||
side_effect=core.NotFoundError("Not found"),
|
||||
),
|
||||
):
|
||||
result = await core.get_library_agent_by_id("user-123", "nonexistent")
|
||||
|
||||
assert result is None
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_returns_none_on_exception(self):
|
||||
"""Test that None is returned when exception occurs in both lookups."""
|
||||
with (
|
||||
patch.object(
|
||||
core.library_db,
|
||||
"get_library_agent_by_graph_id",
|
||||
new_callable=AsyncMock,
|
||||
side_effect=Exception("Database error"),
|
||||
),
|
||||
patch.object(
|
||||
core.library_db,
|
||||
"get_library_agent",
|
||||
new_callable=AsyncMock,
|
||||
side_effect=Exception("Database error"),
|
||||
),
|
||||
):
|
||||
result = await core.get_library_agent_by_id("user-123", "agent-123")
|
||||
|
||||
assert result is None
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_alias_works(self):
|
||||
"""Test that get_library_agent_by_graph_id is an alias for get_library_agent_by_id."""
|
||||
assert core.get_library_agent_by_graph_id is core.get_library_agent_by_id
|
||||
|
||||
|
||||
class TestGetAllRelevantAgentsWithUuids:
|
||||
"""Test UUID extraction in get_all_relevant_agents_for_generation."""
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_fetches_explicitly_mentioned_agents(self):
|
||||
"""Test that agents mentioned by UUID are fetched directly."""
|
||||
mock_agent = MagicMock()
|
||||
mock_agent.graph_id = "46631191-e8a8-486f-ad90-84f89738321d"
|
||||
mock_agent.graph_version = 1
|
||||
mock_agent.name = "Mentioned Agent"
|
||||
mock_agent.description = "Explicitly mentioned"
|
||||
mock_agent.input_schema = {}
|
||||
mock_agent.output_schema = {}
|
||||
|
||||
mock_response = MagicMock()
|
||||
mock_response.agents = []
|
||||
|
||||
with (
|
||||
patch.object(
|
||||
core.library_db,
|
||||
"get_library_agent_by_graph_id",
|
||||
new_callable=AsyncMock,
|
||||
return_value=mock_agent,
|
||||
),
|
||||
patch.object(
|
||||
core.library_db,
|
||||
"list_library_agents",
|
||||
new_callable=AsyncMock,
|
||||
return_value=mock_response,
|
||||
),
|
||||
):
|
||||
result = await core.get_all_relevant_agents_for_generation(
|
||||
user_id="user-123",
|
||||
search_query="Use agent 46631191-e8a8-486f-ad90-84f89738321d",
|
||||
include_marketplace=False,
|
||||
)
|
||||
|
||||
assert len(result) == 1
|
||||
assert result[0].get("graph_id") == "46631191-e8a8-486f-ad90-84f89738321d"
|
||||
assert result[0].get("name") == "Mentioned Agent"
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
pytest.main([__file__, "-v"])
|
||||
@@ -151,20 +151,15 @@ class TestDecomposeGoalExternal:
|
||||
@pytest.mark.asyncio
|
||||
async def test_decompose_goal_handles_http_error(self):
|
||||
"""Test decomposition handles HTTP errors gracefully."""
|
||||
mock_response = MagicMock()
|
||||
mock_response.status_code = 500
|
||||
mock_client = AsyncMock()
|
||||
mock_client.post.side_effect = httpx.HTTPStatusError(
|
||||
"Server error", request=MagicMock(), response=mock_response
|
||||
"Server error", request=MagicMock(), response=MagicMock()
|
||||
)
|
||||
|
||||
with patch.object(service, "_get_client", return_value=mock_client):
|
||||
result = await service.decompose_goal_external("Build a chatbot")
|
||||
|
||||
assert result is not None
|
||||
assert result.get("type") == "error"
|
||||
assert result.get("error_type") == "http_error"
|
||||
assert "Server error" in result.get("error", "")
|
||||
assert result is None
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_decompose_goal_handles_request_error(self):
|
||||
@@ -175,10 +170,7 @@ class TestDecomposeGoalExternal:
|
||||
with patch.object(service, "_get_client", return_value=mock_client):
|
||||
result = await service.decompose_goal_external("Build a chatbot")
|
||||
|
||||
assert result is not None
|
||||
assert result.get("type") == "error"
|
||||
assert result.get("error_type") == "connection_error"
|
||||
assert "Connection failed" in result.get("error", "")
|
||||
assert result is None
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_decompose_goal_handles_service_error(self):
|
||||
@@ -187,7 +179,6 @@ class TestDecomposeGoalExternal:
|
||||
mock_response.json.return_value = {
|
||||
"success": False,
|
||||
"error": "Internal error",
|
||||
"error_type": "internal_error",
|
||||
}
|
||||
mock_response.raise_for_status = MagicMock()
|
||||
|
||||
@@ -197,10 +188,7 @@ class TestDecomposeGoalExternal:
|
||||
with patch.object(service, "_get_client", return_value=mock_client):
|
||||
result = await service.decompose_goal_external("Build a chatbot")
|
||||
|
||||
assert result is not None
|
||||
assert result.get("type") == "error"
|
||||
assert result.get("error") == "Internal error"
|
||||
assert result.get("error_type") == "internal_error"
|
||||
assert result is None
|
||||
|
||||
|
||||
class TestGenerateAgentExternal:
|
||||
@@ -248,10 +236,7 @@ class TestGenerateAgentExternal:
|
||||
with patch.object(service, "_get_client", return_value=mock_client):
|
||||
result = await service.generate_agent_external({"steps": []})
|
||||
|
||||
assert result is not None
|
||||
assert result.get("type") == "error"
|
||||
assert result.get("error_type") == "connection_error"
|
||||
assert "Connection failed" in result.get("error", "")
|
||||
assert result is None
|
||||
|
||||
|
||||
class TestGenerateAgentPatchExternal:
|
||||
@@ -433,139 +418,5 @@ class TestGetBlocksExternal:
|
||||
assert result is None
|
||||
|
||||
|
||||
class TestLibraryAgentsPassthrough:
|
||||
"""Test that library_agents are passed correctly in all requests."""
|
||||
|
||||
def setup_method(self):
|
||||
"""Reset client singleton before each test."""
|
||||
service._settings = None
|
||||
service._client = None
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_decompose_goal_passes_library_agents(self):
|
||||
"""Test that library_agents are included in decompose goal payload."""
|
||||
library_agents = [
|
||||
{
|
||||
"graph_id": "agent-123",
|
||||
"graph_version": 1,
|
||||
"name": "Email Sender",
|
||||
"description": "Sends emails",
|
||||
"input_schema": {"properties": {"to": {"type": "string"}}},
|
||||
"output_schema": {"properties": {"sent": {"type": "boolean"}}},
|
||||
},
|
||||
]
|
||||
|
||||
mock_response = MagicMock()
|
||||
mock_response.json.return_value = {
|
||||
"success": True,
|
||||
"type": "instructions",
|
||||
"steps": ["Step 1"],
|
||||
}
|
||||
mock_response.raise_for_status = MagicMock()
|
||||
|
||||
mock_client = AsyncMock()
|
||||
mock_client.post.return_value = mock_response
|
||||
|
||||
with patch.object(service, "_get_client", return_value=mock_client):
|
||||
await service.decompose_goal_external(
|
||||
"Send an email",
|
||||
library_agents=library_agents,
|
||||
)
|
||||
|
||||
# Verify library_agents was passed in the payload
|
||||
call_args = mock_client.post.call_args
|
||||
assert call_args[1]["json"]["library_agents"] == library_agents
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_generate_agent_passes_library_agents(self):
|
||||
"""Test that library_agents are included in generate agent payload."""
|
||||
library_agents = [
|
||||
{
|
||||
"graph_id": "agent-456",
|
||||
"graph_version": 2,
|
||||
"name": "Data Fetcher",
|
||||
"description": "Fetches data from API",
|
||||
"input_schema": {"properties": {"url": {"type": "string"}}},
|
||||
"output_schema": {"properties": {"data": {"type": "object"}}},
|
||||
},
|
||||
]
|
||||
|
||||
mock_response = MagicMock()
|
||||
mock_response.json.return_value = {
|
||||
"success": True,
|
||||
"agent_json": {"name": "Test Agent", "nodes": []},
|
||||
}
|
||||
mock_response.raise_for_status = MagicMock()
|
||||
|
||||
mock_client = AsyncMock()
|
||||
mock_client.post.return_value = mock_response
|
||||
|
||||
with patch.object(service, "_get_client", return_value=mock_client):
|
||||
await service.generate_agent_external(
|
||||
{"steps": ["Step 1"]},
|
||||
library_agents=library_agents,
|
||||
)
|
||||
|
||||
# Verify library_agents was passed in the payload
|
||||
call_args = mock_client.post.call_args
|
||||
assert call_args[1]["json"]["library_agents"] == library_agents
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_generate_agent_patch_passes_library_agents(self):
|
||||
"""Test that library_agents are included in patch generation payload."""
|
||||
library_agents = [
|
||||
{
|
||||
"graph_id": "agent-789",
|
||||
"graph_version": 1,
|
||||
"name": "Slack Notifier",
|
||||
"description": "Sends Slack messages",
|
||||
"input_schema": {"properties": {"message": {"type": "string"}}},
|
||||
"output_schema": {"properties": {"success": {"type": "boolean"}}},
|
||||
},
|
||||
]
|
||||
|
||||
mock_response = MagicMock()
|
||||
mock_response.json.return_value = {
|
||||
"success": True,
|
||||
"agent_json": {"name": "Updated Agent", "nodes": []},
|
||||
}
|
||||
mock_response.raise_for_status = MagicMock()
|
||||
|
||||
mock_client = AsyncMock()
|
||||
mock_client.post.return_value = mock_response
|
||||
|
||||
with patch.object(service, "_get_client", return_value=mock_client):
|
||||
await service.generate_agent_patch_external(
|
||||
"Add error handling",
|
||||
{"name": "Original Agent", "nodes": []},
|
||||
library_agents=library_agents,
|
||||
)
|
||||
|
||||
# Verify library_agents was passed in the payload
|
||||
call_args = mock_client.post.call_args
|
||||
assert call_args[1]["json"]["library_agents"] == library_agents
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_decompose_goal_without_library_agents(self):
|
||||
"""Test that decompose goal works without library_agents."""
|
||||
mock_response = MagicMock()
|
||||
mock_response.json.return_value = {
|
||||
"success": True,
|
||||
"type": "instructions",
|
||||
"steps": ["Step 1"],
|
||||
}
|
||||
mock_response.raise_for_status = MagicMock()
|
||||
|
||||
mock_client = AsyncMock()
|
||||
mock_client.post.return_value = mock_response
|
||||
|
||||
with patch.object(service, "_get_client", return_value=mock_client):
|
||||
await service.decompose_goal_external("Build a workflow")
|
||||
|
||||
# Verify library_agents was NOT passed when not provided
|
||||
call_args = mock_client.post.call_args
|
||||
assert "library_agents" not in call_args[1]["json"]
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
pytest.main([__file__, "-v"])
|
||||
|
||||
@@ -1,76 +0,0 @@
|
||||
# CLAUDE.md - Frontend
|
||||
|
||||
This file provides guidance to Claude Code when working with the frontend.
|
||||
|
||||
## Essential Commands
|
||||
|
||||
```bash
|
||||
# Install dependencies
|
||||
pnpm i
|
||||
|
||||
# Generate API client from OpenAPI spec
|
||||
pnpm generate:api
|
||||
|
||||
# Start development server
|
||||
pnpm dev
|
||||
|
||||
# Run E2E tests
|
||||
pnpm test
|
||||
|
||||
# Run Storybook for component development
|
||||
pnpm storybook
|
||||
|
||||
# Build production
|
||||
pnpm build
|
||||
|
||||
# Format and lint
|
||||
pnpm format
|
||||
|
||||
# Type checking
|
||||
pnpm types
|
||||
```
|
||||
|
||||
### Code Style
|
||||
|
||||
- Fully capitalize acronyms in symbols, e.g. `graphID`, `useBackendAPI`
|
||||
- Use function declarations (not arrow functions) for components/handlers
|
||||
|
||||
## Architecture
|
||||
|
||||
- **Framework**: Next.js 15 App Router (client-first approach)
|
||||
- **Data Fetching**: Type-safe generated API hooks via Orval + React Query
|
||||
- **State Management**: React Query for server state, co-located UI state in components/hooks
|
||||
- **Component Structure**: Separate render logic (`.tsx`) from business logic (`use*.ts` hooks)
|
||||
- **Workflow Builder**: Visual graph editor using @xyflow/react
|
||||
- **UI Components**: shadcn/ui (Radix UI primitives) with Tailwind CSS styling
|
||||
- **Icons**: Phosphor Icons only
|
||||
- **Feature Flags**: LaunchDarkly integration
|
||||
- **Error Handling**: ErrorCard for render errors, toast for mutations, Sentry for exceptions
|
||||
- **Testing**: Playwright for E2E, Storybook for component development
|
||||
|
||||
## Environment Configuration
|
||||
|
||||
`.env.default` (defaults) → `.env` (user overrides)
|
||||
|
||||
## Feature Development
|
||||
|
||||
See @CONTRIBUTING.md for complete patterns. Quick reference:
|
||||
|
||||
1. **Pages**: Create in `src/app/(platform)/feature-name/page.tsx`
|
||||
- Extract component logic into custom hooks grouped by concern, not by component. Each hook should represent a cohesive domain of functionality (e.g., useSearch, useFilters, usePagination) rather than bundling all state into one useComponentState hook.
|
||||
- Put each hook in its own `.ts` file
|
||||
- Put sub-components in local `components/` folder
|
||||
- Component props should be `type Props = { ... }` (not exported) unless it needs to be used outside the component
|
||||
2. **Components**: Structure as `ComponentName/ComponentName.tsx` + `useComponentName.ts` + `helpers.ts`
|
||||
- Use design system components from `src/components/` (atoms, molecules, organisms)
|
||||
- Never use `src/components/__legacy__/*`
|
||||
3. **Data fetching**: Use generated API hooks from `@/app/api/__generated__/endpoints/`
|
||||
- Regenerate with `pnpm generate:api`
|
||||
- Pattern: `use{Method}{Version}{OperationName}`
|
||||
4. **Styling**: Tailwind CSS only, use design tokens, Phosphor Icons only
|
||||
5. **Testing**: Add Storybook stories for new components, Playwright for E2E
|
||||
6. **Code conventions**:
|
||||
- Use function declarations (not arrow functions) for components/handlers
|
||||
- Do not use `useCallback` or `useMemo` unless asked to optimise a given function
|
||||
- Do not type hook returns, let Typescript infer as much as possible
|
||||
- Never type with `any` unless a variable/attribute can ACTUALLY be of any type
|
||||
@@ -30,9 +30,9 @@ export function getErrorMessage(result: unknown): string {
|
||||
}
|
||||
if (typeof result === "object" && result !== null) {
|
||||
const response = result as Record<string, unknown>;
|
||||
if (response.error) return stripInternalReasoning(String(response.error));
|
||||
if (response.message)
|
||||
return stripInternalReasoning(String(response.message));
|
||||
if (response.error) return stripInternalReasoning(String(response.error));
|
||||
}
|
||||
return "An error occurred";
|
||||
}
|
||||
@@ -363,8 +363,8 @@ export function formatToolResponse(result: unknown, toolName: string): string {
|
||||
|
||||
case "error":
|
||||
const errorMsg =
|
||||
(response.message as string) || response.error || "An error occurred";
|
||||
return String(errorMsg);
|
||||
(response.error as string) || response.message || "An error occurred";
|
||||
return `Error: ${errorMsg}`;
|
||||
|
||||
case "no_results":
|
||||
const suggestions = (response.suggestions as string[]) || [];
|
||||
|
||||
@@ -516,7 +516,7 @@ export type GraphValidationErrorResponse = {
|
||||
|
||||
/* *** LIBRARY *** */
|
||||
|
||||
/* Mirror of backend/api/features/library/model.py:LibraryAgent */
|
||||
/* Mirror of backend/server/v2/library/model.py:LibraryAgent */
|
||||
export type LibraryAgent = {
|
||||
id: LibraryAgentID;
|
||||
graph_id: GraphID;
|
||||
@@ -616,7 +616,7 @@ export enum LibraryAgentSortEnum {
|
||||
|
||||
/* *** CREDENTIALS *** */
|
||||
|
||||
/* Mirror of backend/api/features/integrations/router.py:CredentialsMetaResponse */
|
||||
/* Mirror of backend/server/integrations/router.py:CredentialsMetaResponse */
|
||||
export type CredentialsMetaResponse = {
|
||||
id: string;
|
||||
provider: CredentialsProviderName;
|
||||
@@ -628,13 +628,13 @@ export type CredentialsMetaResponse = {
|
||||
is_system?: boolean;
|
||||
};
|
||||
|
||||
/* Mirror of backend/api/features/integrations/router.py:CredentialsDeletionResponse */
|
||||
/* Mirror of backend/server/integrations/router.py:CredentialsDeletionResponse */
|
||||
export type CredentialsDeleteResponse = {
|
||||
deleted: true;
|
||||
revoked: boolean | null;
|
||||
};
|
||||
|
||||
/* Mirror of backend/api/features/integrations/router.py:CredentialsDeletionNeedsConfirmationResponse */
|
||||
/* Mirror of backend/server/integrations/router.py:CredentialsDeletionNeedsConfirmationResponse */
|
||||
export type CredentialsDeleteNeedConfirmationResponse = {
|
||||
deleted: false;
|
||||
need_confirmation: true;
|
||||
@@ -888,7 +888,7 @@ export type Schedule = {
|
||||
|
||||
export type ScheduleID = Brand<string, "ScheduleID">;
|
||||
|
||||
/* Mirror of backend/api/features/v1.py:ScheduleCreationRequest */
|
||||
/* Mirror of backend/server/routers/v1.py:ScheduleCreationRequest */
|
||||
export type ScheduleCreatable = {
|
||||
graph_id: GraphID;
|
||||
graph_version: number;
|
||||
|
||||
@@ -25,7 +25,7 @@ This document focuses on the **API Integration OAuth flow** used for connecting
|
||||
### 2. Backend API Trust Boundary
|
||||
- **Location**: Server-side FastAPI application
|
||||
- **Components**:
|
||||
- Integration router (`/backend/backend/api/features/integrations/router.py`)
|
||||
- Integration router (`/backend/backend/server/integrations/router.py`)
|
||||
- OAuth handlers (`/backend/backend/integrations/oauth/`)
|
||||
- Credentials store (`/backend/backend/integrations/credentials_store.py`)
|
||||
- **Trust Level**: Trusted - server-controlled environment
|
||||
|
||||
@@ -246,7 +246,7 @@ If you encounter any issues, verify that:
|
||||
```bash
|
||||
ollama pull llama3.2
|
||||
```
|
||||
- If using a custom model, ensure it's added to the model list in `backend/api/model.py`
|
||||
- If using a custom model, ensure it's added to the model list in `backend/server/model.py`
|
||||
|
||||
#### Docker Issues
|
||||
- Ensure Docker daemon is running:
|
||||
|
||||
Reference in New Issue
Block a user