mirror of
https://github.com/Significant-Gravitas/AutoGPT.git
synced 2026-04-08 03:00:28 -04:00
add other specs
This commit is contained in:
802
autogpt_platform/autogpt-rs/DATABASE_MANAGER.md
Normal file
802
autogpt_platform/autogpt-rs/DATABASE_MANAGER.md
Normal file
@@ -0,0 +1,802 @@
|
||||
# DatabaseManager Technical Specification
|
||||
|
||||
## Executive Summary
|
||||
|
||||
This document provides a complete technical specification for implementing a drop-in replacement for the AutoGPT Platform's DatabaseManager service. The replacement must maintain 100% API compatibility while preserving all functional behaviors, security requirements, and performance characteristics.
|
||||
|
||||
## 1. System Overview
|
||||
|
||||
### 1.1 Purpose
|
||||
The DatabaseManager is a centralized service that provides database access for the AutoGPT Platform's executor system. It encapsulates all database operations behind a service interface, enabling distributed execution while maintaining data consistency and security.
|
||||
|
||||
### 1.2 Architecture Pattern
|
||||
- **Service Type**: HTTP-based microservice using FastAPI
|
||||
- **Communication**: RPC-style over HTTP with JSON serialization
|
||||
- **Base Class**: Inherits from `AppService` (backend.util.service)
|
||||
- **Client Classes**: `DatabaseManagerClient` (sync) and `DatabaseManagerAsyncClient` (async)
|
||||
- **Port**: Configurable via `config.database_api_port`
|
||||
|
||||
### 1.3 Critical Requirements
|
||||
1. **API Compatibility**: All 40+ exposed methods must maintain exact signatures
|
||||
2. **Type Safety**: Full type preservation across service boundaries
|
||||
3. **User Isolation**: All operations must respect user_id boundaries
|
||||
4. **Transaction Support**: Maintain ACID properties for critical operations
|
||||
5. **Event Publishing**: Maintain Redis event bus integration for real-time updates
|
||||
|
||||
## 2. Service Implementation Requirements
|
||||
|
||||
### 2.1 Base Service Class
|
||||
|
||||
```python
|
||||
from backend.util.service import AppService, expose
|
||||
from backend.util.settings import Config
|
||||
from backend.data import db
|
||||
import logging
|
||||
|
||||
class DatabaseManager(AppService):
|
||||
"""
|
||||
REQUIRED: Inherit from AppService to get:
|
||||
- Automatic endpoint generation via @expose decorator
|
||||
- Built-in health checks at /health
|
||||
- Request/response serialization
|
||||
- Error handling and logging
|
||||
"""
|
||||
|
||||
def run_service(self) -> None:
|
||||
"""REQUIRED: Initialize database connection before starting service"""
|
||||
logger.info(f"[{self.service_name}] ⏳ Connecting to Database...")
|
||||
self.run_and_wait(db.connect()) # CRITICAL: Must connect to database
|
||||
super().run_service() # Start HTTP server
|
||||
|
||||
def cleanup(self):
|
||||
"""REQUIRED: Clean disconnect on shutdown"""
|
||||
super().cleanup()
|
||||
logger.info(f"[{self.service_name}] ⏳ Disconnecting Database...")
|
||||
self.run_and_wait(db.disconnect()) # CRITICAL: Must disconnect cleanly
|
||||
|
||||
@classmethod
|
||||
def get_port(cls) -> int:
|
||||
"""REQUIRED: Return configured port"""
|
||||
return config.database_api_port
|
||||
```
|
||||
|
||||
### 2.2 Method Exposure Pattern
|
||||
|
||||
```python
|
||||
@staticmethod
|
||||
def _(f: Callable[P, R], name: str | None = None) -> Callable[Concatenate[object, P], R]:
|
||||
"""
|
||||
REQUIRED: Helper to expose methods with proper signatures
|
||||
- Preserves function name for endpoint generation
|
||||
- Maintains type information
|
||||
- Adds 'self' parameter for instance binding
|
||||
"""
|
||||
if name is not None:
|
||||
f.__name__ = name
|
||||
return cast(Callable[Concatenate[object, P], R], expose(f))
|
||||
```
|
||||
|
||||
### 2.3 Database Connection Management
|
||||
|
||||
**REQUIRED: Use Prisma ORM with these exact configurations:**
|
||||
|
||||
```python
|
||||
from prisma import Prisma
|
||||
|
||||
prisma = Prisma(
|
||||
auto_register=True,
|
||||
http={"timeout": HTTP_TIMEOUT}, # Default: 120 seconds
|
||||
datasource={"url": DATABASE_URL}
|
||||
)
|
||||
|
||||
# Connection lifecycle
|
||||
async def connect():
|
||||
await prisma.connect()
|
||||
|
||||
async def disconnect():
|
||||
await prisma.disconnect()
|
||||
```
|
||||
|
||||
### 2.4 Transaction Support
|
||||
|
||||
**REQUIRED: Implement both regular and locked transactions:**
|
||||
|
||||
```python
|
||||
async def transaction(timeout: float | None = None):
|
||||
"""Regular database transaction"""
|
||||
async with prisma.tx(timeout=timeout) as tx:
|
||||
yield tx
|
||||
|
||||
async def locked_transaction(key: str, timeout: float | None = None):
|
||||
"""Transaction with PostgreSQL advisory lock"""
|
||||
lock_key = zlib.crc32(key.encode("utf-8"))
|
||||
async with transaction(timeout=timeout) as tx:
|
||||
await tx.execute_raw("SELECT pg_advisory_xact_lock($1)", lock_key)
|
||||
yield tx
|
||||
```
|
||||
|
||||
## 3. Complete API Specification
|
||||
|
||||
### 3.1 Execution Management APIs
|
||||
|
||||
#### get_graph_execution
|
||||
```python
|
||||
async def get_graph_execution(
|
||||
user_id: str,
|
||||
execution_id: str,
|
||||
*,
|
||||
include_node_executions: bool = False
|
||||
) -> GraphExecution | GraphExecutionWithNodes | None
|
||||
```
|
||||
**Behavior**:
|
||||
- Returns execution only if user_id matches
|
||||
- Optionally includes all node executions
|
||||
- Returns None if not found or unauthorized
|
||||
|
||||
#### get_graph_executions
|
||||
```python
|
||||
async def get_graph_executions(
|
||||
user_id: str,
|
||||
graph_id: str | None = None,
|
||||
*,
|
||||
limit: int = 50,
|
||||
graph_version: int | None = None,
|
||||
cursor: str | None = None,
|
||||
preset_id: str | None = None
|
||||
) -> tuple[list[GraphExecution], str | None]
|
||||
```
|
||||
**Behavior**:
|
||||
- Paginated results with cursor
|
||||
- Filter by graph_id, version, or preset_id
|
||||
- Returns (executions, next_cursor)
|
||||
|
||||
#### create_graph_execution
|
||||
```python
|
||||
async def create_graph_execution(
|
||||
graph_id: str,
|
||||
graph_version: int,
|
||||
starting_nodes_input: dict[str, dict[str, Any]],
|
||||
user_id: str,
|
||||
preset_id: str | None = None
|
||||
) -> GraphExecutionWithNodes
|
||||
```
|
||||
**Behavior**:
|
||||
- Creates execution with status "QUEUED"
|
||||
- Initializes all nodes with "PENDING" status
|
||||
- Publishes creation event to Redis
|
||||
- Uses locked transaction on graph_id
|
||||
|
||||
#### update_graph_execution_start_time
|
||||
```python
|
||||
async def update_graph_execution_start_time(
|
||||
graph_exec_id: str
|
||||
) -> None
|
||||
```
|
||||
**Behavior**:
|
||||
- Sets start_time to current timestamp
|
||||
- Only updates if currently NULL
|
||||
|
||||
#### update_graph_execution_stats
|
||||
```python
|
||||
async def update_graph_execution_stats(
|
||||
graph_exec_id: str,
|
||||
status: AgentExecutionStatus | None = None,
|
||||
stats: dict[str, Any] | None = None
|
||||
) -> GraphExecution | None
|
||||
```
|
||||
**Behavior**:
|
||||
- Updates status and/or stats atomically
|
||||
- Sets end_time if status is terminal (COMPLETED/FAILED)
|
||||
- Publishes update event to Redis
|
||||
- Returns updated execution
|
||||
|
||||
#### get_node_execution
|
||||
```python
|
||||
async def get_node_execution(
|
||||
node_exec_id: str
|
||||
) -> NodeExecutionResult | None
|
||||
```
|
||||
**Behavior**:
|
||||
- No user_id check (relies on graph execution security)
|
||||
- Includes all input/output data
|
||||
|
||||
#### get_node_executions
|
||||
```python
|
||||
async def get_node_executions(
|
||||
graph_exec_id: str
|
||||
) -> list[NodeExecutionResult]
|
||||
```
|
||||
**Behavior**:
|
||||
- Returns all node executions for graph
|
||||
- Ordered by creation time
|
||||
|
||||
#### get_latest_node_execution
|
||||
```python
|
||||
async def get_latest_node_execution(
|
||||
graph_exec_id: str,
|
||||
node_id: str
|
||||
) -> NodeExecutionResult | None
|
||||
```
|
||||
**Behavior**:
|
||||
- Returns most recent execution of specific node
|
||||
- Used for retry/rerun scenarios
|
||||
|
||||
#### update_node_execution_status
|
||||
```python
|
||||
async def update_node_execution_status(
|
||||
node_exec_id: str,
|
||||
status: AgentExecutionStatus,
|
||||
execution_data: dict[str, Any] | None = None,
|
||||
stats: dict[str, Any] | None = None
|
||||
) -> NodeExecutionResult
|
||||
```
|
||||
**Behavior**:
|
||||
- Updates status atomically
|
||||
- Sets end_time for terminal states
|
||||
- Optionally updates stats/data
|
||||
- Publishes event to Redis
|
||||
- Returns updated execution
|
||||
|
||||
#### update_node_execution_status_batch
|
||||
```python
|
||||
async def update_node_execution_status_batch(
|
||||
execution_updates: list[NodeExecutionUpdate]
|
||||
) -> list[NodeExecutionResult]
|
||||
```
|
||||
**Behavior**:
|
||||
- Batch update multiple nodes in single transaction
|
||||
- Each update can have different status/stats
|
||||
- Publishes events for all updates
|
||||
- Returns all updated executions
|
||||
|
||||
#### update_node_execution_stats
|
||||
```python
|
||||
async def update_node_execution_stats(
|
||||
node_exec_id: str,
|
||||
stats: dict[str, Any]
|
||||
) -> NodeExecutionResult
|
||||
```
|
||||
**Behavior**:
|
||||
- Updates only stats field
|
||||
- Merges with existing stats
|
||||
- Does not affect status
|
||||
|
||||
#### upsert_execution_input
|
||||
```python
|
||||
async def upsert_execution_input(
|
||||
node_id: str,
|
||||
graph_exec_id: str,
|
||||
input_name: str,
|
||||
input_data: Any,
|
||||
node_exec_id: str | None = None
|
||||
) -> tuple[str, BlockInput]
|
||||
```
|
||||
**Behavior**:
|
||||
- Creates or updates input data
|
||||
- If node_exec_id not provided, creates node execution
|
||||
- Serializes input_data to JSON
|
||||
- Returns (node_exec_id, input_object)
|
||||
|
||||
#### upsert_execution_output
|
||||
```python
|
||||
async def upsert_execution_output(
|
||||
node_exec_id: str,
|
||||
output_name: str,
|
||||
output_data: Any
|
||||
) -> None
|
||||
```
|
||||
**Behavior**:
|
||||
- Creates or updates output data
|
||||
- Serializes output_data to JSON
|
||||
- No return value
|
||||
|
||||
#### get_execution_kv_data
|
||||
```python
|
||||
async def get_execution_kv_data(
|
||||
user_id: str,
|
||||
key: str
|
||||
) -> Any | None
|
||||
```
|
||||
**Behavior**:
|
||||
- User-scoped key-value storage
|
||||
- Returns deserialized JSON data
|
||||
- Returns None if key not found
|
||||
|
||||
#### set_execution_kv_data
|
||||
```python
|
||||
async def set_execution_kv_data(
|
||||
user_id: str,
|
||||
node_exec_id: str,
|
||||
key: str,
|
||||
data: Any
|
||||
) -> Any | None
|
||||
```
|
||||
**Behavior**:
|
||||
- Sets user-scoped key-value data
|
||||
- Associates with node execution
|
||||
- Serializes data to JSON
|
||||
- Returns previous value or None
|
||||
|
||||
#### get_block_error_stats
|
||||
```python
|
||||
async def get_block_error_stats() -> list[BlockErrorStats]
|
||||
```
|
||||
**Behavior**:
|
||||
- Aggregates error counts by block_id
|
||||
- Last 7 days of data
|
||||
- Groups by error type
|
||||
|
||||
### 3.2 Graph Management APIs
|
||||
|
||||
#### get_node
|
||||
```python
|
||||
async def get_node(
|
||||
node_id: str
|
||||
) -> AgentNode | None
|
||||
```
|
||||
**Behavior**:
|
||||
- Returns node with block data
|
||||
- No user_id check (public blocks)
|
||||
|
||||
#### get_graph
|
||||
```python
|
||||
async def get_graph(
|
||||
graph_id: str,
|
||||
version: int | None = None,
|
||||
user_id: str | None = None,
|
||||
for_export: bool = False,
|
||||
include_subgraphs: bool = False
|
||||
) -> GraphModel | None
|
||||
```
|
||||
**Behavior**:
|
||||
- Returns latest version if version=None
|
||||
- Checks user_id for private graphs
|
||||
- for_export=True excludes internal fields
|
||||
- include_subgraphs=True loads nested graphs
|
||||
|
||||
#### get_connected_output_nodes
|
||||
```python
|
||||
async def get_connected_output_nodes(
|
||||
node_id: str,
|
||||
output_name: str
|
||||
) -> list[tuple[AgentNode, AgentNodeLink]]
|
||||
```
|
||||
**Behavior**:
|
||||
- Returns downstream nodes connected to output
|
||||
- Includes link metadata
|
||||
- Used for execution flow
|
||||
|
||||
#### get_graph_metadata
|
||||
```python
|
||||
async def get_graph_metadata(
|
||||
graph_id: str,
|
||||
user_id: str
|
||||
) -> GraphMetadata | None
|
||||
```
|
||||
**Behavior**:
|
||||
- Returns graph metadata without full definition
|
||||
- User must own or have access to graph
|
||||
|
||||
### 3.3 Credit System APIs
|
||||
|
||||
#### get_credits
|
||||
```python
|
||||
async def get_credits(
|
||||
user_id: str
|
||||
) -> int
|
||||
```
|
||||
**Behavior**:
|
||||
- Returns current credit balance
|
||||
- Always non-negative
|
||||
|
||||
#### spend_credits
|
||||
```python
|
||||
async def spend_credits(
|
||||
user_id: str,
|
||||
cost: int,
|
||||
metadata: UsageTransactionMetadata
|
||||
) -> int
|
||||
```
|
||||
**Behavior**:
|
||||
- Deducts credits atomically
|
||||
- Creates transaction record
|
||||
- Throws InsufficientCredits if balance too low
|
||||
- Returns new balance
|
||||
- metadata includes: block_id, node_exec_id, context
|
||||
|
||||
### 3.4 User Management APIs
|
||||
|
||||
#### get_user_metadata
|
||||
```python
|
||||
async def get_user_metadata(
|
||||
user_id: str
|
||||
) -> UserMetadata
|
||||
```
|
||||
**Behavior**:
|
||||
- Returns user preferences and settings
|
||||
- Creates default if not exists
|
||||
|
||||
#### update_user_metadata
|
||||
```python
|
||||
async def update_user_metadata(
|
||||
user_id: str,
|
||||
data: UserMetadataDTO
|
||||
) -> UserMetadata
|
||||
```
|
||||
**Behavior**:
|
||||
- Partial update of metadata
|
||||
- Validates against schema
|
||||
- Returns updated metadata
|
||||
|
||||
#### get_user_integrations
|
||||
```python
|
||||
async def get_user_integrations(
|
||||
user_id: str
|
||||
) -> UserIntegrations
|
||||
```
|
||||
**Behavior**:
|
||||
- Returns OAuth credentials
|
||||
- Decrypts sensitive data
|
||||
- Creates empty if not exists
|
||||
|
||||
#### update_user_integrations
|
||||
```python
|
||||
async def update_user_integrations(
|
||||
user_id: str,
|
||||
data: UserIntegrations
|
||||
) -> None
|
||||
```
|
||||
**Behavior**:
|
||||
- Updates integration credentials
|
||||
- Encrypts sensitive data
|
||||
- No return value
|
||||
|
||||
### 3.5 User Communication APIs
|
||||
|
||||
#### get_active_user_ids_in_timerange
|
||||
```python
|
||||
async def get_active_user_ids_in_timerange(
|
||||
start_time: datetime,
|
||||
end_time: datetime
|
||||
) -> list[str]
|
||||
```
|
||||
**Behavior**:
|
||||
- Returns users with graph executions in range
|
||||
- Used for analytics/notifications
|
||||
|
||||
#### get_user_email_by_id
|
||||
```python
|
||||
async def get_user_email_by_id(
|
||||
user_id: str
|
||||
) -> str | None
|
||||
```
|
||||
**Behavior**:
|
||||
- Returns user's email address
|
||||
- None if user not found
|
||||
|
||||
#### get_user_email_verification
|
||||
```python
|
||||
async def get_user_email_verification(
|
||||
user_id: str
|
||||
) -> UserEmailVerification
|
||||
```
|
||||
**Behavior**:
|
||||
- Returns email and verification status
|
||||
- Used for notification filtering
|
||||
|
||||
#### get_user_notification_preference
|
||||
```python
|
||||
async def get_user_notification_preference(
|
||||
user_id: str
|
||||
) -> NotificationPreference
|
||||
```
|
||||
**Behavior**:
|
||||
- Returns notification settings
|
||||
- Creates default if not exists
|
||||
|
||||
### 3.6 Notification APIs
|
||||
|
||||
#### create_or_add_to_user_notification_batch
|
||||
```python
|
||||
async def create_or_add_to_user_notification_batch(
|
||||
user_id: str,
|
||||
notification_type: NotificationType,
|
||||
notification_data: NotificationEvent
|
||||
) -> UserNotificationBatchDTO
|
||||
```
|
||||
**Behavior**:
|
||||
- Adds to existing batch or creates new
|
||||
- Batches by type for efficiency
|
||||
- Returns updated batch
|
||||
|
||||
#### empty_user_notification_batch
|
||||
```python
|
||||
async def empty_user_notification_batch(
|
||||
user_id: str,
|
||||
notification_type: NotificationType
|
||||
) -> None
|
||||
```
|
||||
**Behavior**:
|
||||
- Clears all notifications of type
|
||||
- Used after sending batch
|
||||
|
||||
#### get_all_batches_by_type
|
||||
```python
|
||||
async def get_all_batches_by_type(
|
||||
notification_type: NotificationType
|
||||
) -> list[UserNotificationBatchDTO]
|
||||
```
|
||||
**Behavior**:
|
||||
- Returns all user batches of type
|
||||
- Used by notification service
|
||||
|
||||
#### get_user_notification_batch
|
||||
```python
|
||||
async def get_user_notification_batch(
|
||||
user_id: str,
|
||||
notification_type: NotificationType
|
||||
) -> UserNotificationBatchDTO | None
|
||||
```
|
||||
**Behavior**:
|
||||
- Returns user's batch for type
|
||||
- None if no batch exists
|
||||
|
||||
#### get_user_notification_oldest_message_in_batch
|
||||
```python
|
||||
async def get_user_notification_oldest_message_in_batch(
|
||||
user_id: str,
|
||||
notification_type: NotificationType
|
||||
) -> NotificationEvent | None
|
||||
```
|
||||
**Behavior**:
|
||||
- Returns oldest notification in batch
|
||||
- Used for batch timing decisions
|
||||
|
||||
## 4. Client Implementation Requirements
|
||||
|
||||
### 4.1 Synchronous Client
|
||||
|
||||
```python
|
||||
class DatabaseManagerClient(AppServiceClient):
|
||||
"""
|
||||
REQUIRED: Synchronous client that:
|
||||
- Converts async methods to sync using endpoint_to_sync
|
||||
- Maintains exact method signatures
|
||||
- Handles connection pooling
|
||||
- Implements retry logic
|
||||
"""
|
||||
|
||||
@classmethod
|
||||
def get_service_type(cls):
|
||||
return DatabaseManager
|
||||
|
||||
# Example method mapping
|
||||
get_graph_execution = endpoint_to_sync(DatabaseManager.get_graph_execution)
|
||||
```
|
||||
|
||||
### 4.2 Asynchronous Client
|
||||
|
||||
```python
|
||||
class DatabaseManagerAsyncClient(AppServiceClient):
|
||||
"""
|
||||
REQUIRED: Async client that:
|
||||
- Directly references async methods
|
||||
- No conversion needed
|
||||
- Shares connection pool
|
||||
"""
|
||||
|
||||
@classmethod
|
||||
def get_service_type(cls):
|
||||
return DatabaseManager
|
||||
|
||||
# Direct method reference
|
||||
get_graph_execution = DatabaseManager.get_graph_execution
|
||||
```
|
||||
|
||||
## 5. Data Models
|
||||
|
||||
### 5.1 Core Enums
|
||||
|
||||
```python
|
||||
class AgentExecutionStatus(str, Enum):
|
||||
PENDING = "PENDING"
|
||||
QUEUED = "QUEUED"
|
||||
RUNNING = "RUNNING"
|
||||
COMPLETED = "COMPLETED"
|
||||
FAILED = "FAILED"
|
||||
CANCELED = "CANCELED"
|
||||
|
||||
class NotificationType(str, Enum):
|
||||
SYSTEM = "SYSTEM"
|
||||
REVIEW = "REVIEW"
|
||||
EXECUTION = "EXECUTION"
|
||||
MARKETING = "MARKETING"
|
||||
```
|
||||
|
||||
### 5.2 Key Data Models
|
||||
|
||||
All models must exactly match the Prisma schema definitions. Key models include:
|
||||
|
||||
- `GraphExecution`: Execution metadata with stats
|
||||
- `GraphExecutionWithNodes`: Includes all node executions
|
||||
- `NodeExecutionResult`: Node execution with I/O data
|
||||
- `GraphModel`: Complete graph definition
|
||||
- `UserIntegrations`: OAuth credentials
|
||||
- `UsageTransactionMetadata`: Credit usage context
|
||||
- `NotificationEvent`: Individual notification data
|
||||
|
||||
## 6. Security Requirements
|
||||
|
||||
### 6.1 User Isolation
|
||||
- **CRITICAL**: All user-scoped operations MUST filter by user_id
|
||||
- Never expose data across user boundaries
|
||||
- Use database-level row security where possible
|
||||
|
||||
### 6.2 Authentication
|
||||
- Service assumes authentication handled by API gateway
|
||||
- user_id parameter is trusted after authentication
|
||||
- No additional auth checks within service
|
||||
|
||||
### 6.3 Data Protection
|
||||
- Encrypt sensitive integration credentials
|
||||
- Use HMAC for unsubscribe tokens
|
||||
- Never log sensitive data
|
||||
|
||||
## 7. Performance Requirements
|
||||
|
||||
### 7.1 Connection Management
|
||||
- Maintain persistent database connection
|
||||
- Use connection pooling (default: 10 connections)
|
||||
- Implement exponential backoff for retries
|
||||
|
||||
### 7.2 Query Optimization
|
||||
- Use indexes for all WHERE clauses
|
||||
- Batch operations where possible
|
||||
- Limit default result sets (50 items)
|
||||
|
||||
### 7.3 Event Publishing
|
||||
- Publish events asynchronously
|
||||
- Don't block on event delivery
|
||||
- Use fire-and-forget pattern
|
||||
|
||||
## 8. Error Handling
|
||||
|
||||
### 8.1 Standard Exceptions
|
||||
```python
|
||||
class InsufficientCredits(Exception):
|
||||
"""Raised when user lacks credits"""
|
||||
|
||||
class NotFoundError(Exception):
|
||||
"""Raised when entity not found"""
|
||||
|
||||
class AuthorizationError(Exception):
|
||||
"""Raised when user lacks access"""
|
||||
```
|
||||
|
||||
### 8.2 Error Response Format
|
||||
```json
|
||||
{
|
||||
"error": "error_type",
|
||||
"message": "Human readable message",
|
||||
"details": {} // Optional additional context
|
||||
}
|
||||
```
|
||||
|
||||
## 9. Testing Requirements
|
||||
|
||||
### 9.1 Unit Tests
|
||||
- Test each method in isolation
|
||||
- Mock database calls
|
||||
- Verify user_id filtering
|
||||
|
||||
### 9.2 Integration Tests
|
||||
- Test with real database
|
||||
- Verify transaction boundaries
|
||||
- Test concurrent operations
|
||||
|
||||
### 9.3 Service Tests
|
||||
- Test HTTP endpoint generation
|
||||
- Verify serialization/deserialization
|
||||
- Test error handling
|
||||
|
||||
## 10. Implementation Checklist
|
||||
|
||||
### Phase 1: Core Service Setup
|
||||
- [ ] Create DatabaseManager class inheriting from AppService
|
||||
- [ ] Implement run_service() with database connection
|
||||
- [ ] Implement cleanup() with proper disconnect
|
||||
- [ ] Configure port from settings
|
||||
- [ ] Set up method exposure helper
|
||||
|
||||
### Phase 2: Execution APIs (15 methods)
|
||||
- [ ] get_graph_execution
|
||||
- [ ] get_graph_executions
|
||||
- [ ] get_graph_execution_meta
|
||||
- [ ] create_graph_execution
|
||||
- [ ] update_graph_execution_start_time
|
||||
- [ ] update_graph_execution_stats
|
||||
- [ ] get_node_execution
|
||||
- [ ] get_node_executions
|
||||
- [ ] get_latest_node_execution
|
||||
- [ ] update_node_execution_status
|
||||
- [ ] update_node_execution_status_batch
|
||||
- [ ] update_node_execution_stats
|
||||
- [ ] upsert_execution_input
|
||||
- [ ] upsert_execution_output
|
||||
- [ ] get_execution_kv_data
|
||||
- [ ] set_execution_kv_data
|
||||
- [ ] get_block_error_stats
|
||||
|
||||
### Phase 3: Graph APIs (4 methods)
|
||||
- [ ] get_node
|
||||
- [ ] get_graph
|
||||
- [ ] get_connected_output_nodes
|
||||
- [ ] get_graph_metadata
|
||||
|
||||
### Phase 4: Credit APIs (2 methods)
|
||||
- [ ] get_credits
|
||||
- [ ] spend_credits
|
||||
|
||||
### Phase 5: User APIs (4 methods)
|
||||
- [ ] get_user_metadata
|
||||
- [ ] update_user_metadata
|
||||
- [ ] get_user_integrations
|
||||
- [ ] update_user_integrations
|
||||
|
||||
### Phase 6: Communication APIs (4 methods)
|
||||
- [ ] get_active_user_ids_in_timerange
|
||||
- [ ] get_user_email_by_id
|
||||
- [ ] get_user_email_verification
|
||||
- [ ] get_user_notification_preference
|
||||
|
||||
### Phase 7: Notification APIs (5 methods)
|
||||
- [ ] create_or_add_to_user_notification_batch
|
||||
- [ ] empty_user_notification_batch
|
||||
- [ ] get_all_batches_by_type
|
||||
- [ ] get_user_notification_batch
|
||||
- [ ] get_user_notification_oldest_message_in_batch
|
||||
|
||||
### Phase 8: Client Implementation
|
||||
- [ ] Create DatabaseManagerClient with sync methods
|
||||
- [ ] Create DatabaseManagerAsyncClient with async methods
|
||||
- [ ] Test client method generation
|
||||
- [ ] Verify type preservation
|
||||
|
||||
### Phase 9: Integration Testing
|
||||
- [ ] Test all methods with real database
|
||||
- [ ] Verify user isolation
|
||||
- [ ] Test error scenarios
|
||||
- [ ] Performance testing
|
||||
- [ ] Event publishing verification
|
||||
|
||||
### Phase 10: Deployment Validation
|
||||
- [ ] Deploy to test environment
|
||||
- [ ] Run integration test suite
|
||||
- [ ] Verify backward compatibility
|
||||
- [ ] Performance benchmarking
|
||||
- [ ] Production deployment
|
||||
|
||||
## 11. Success Criteria
|
||||
|
||||
The implementation is successful when:
|
||||
|
||||
1. **All 40+ methods** produce identical outputs to the original
|
||||
2. **Performance** is within 10% of original implementation
|
||||
3. **All tests** pass without modification
|
||||
4. **No breaking changes** to any client code
|
||||
5. **Security boundaries** are maintained
|
||||
6. **Event publishing** works identically
|
||||
7. **Error handling** matches original behavior
|
||||
|
||||
## 12. Critical Implementation Notes
|
||||
|
||||
1. **DO NOT** modify any function signatures
|
||||
2. **DO NOT** change any return types
|
||||
3. **DO NOT** add new required parameters
|
||||
4. **DO NOT** remove any functionality
|
||||
5. **ALWAYS** maintain user_id isolation
|
||||
6. **ALWAYS** publish events for state changes
|
||||
7. **ALWAYS** use transactions for multi-step operations
|
||||
8. **ALWAYS** handle errors exactly as original
|
||||
|
||||
This specification, when implemented correctly, will produce a drop-in replacement for the DatabaseManager that maintains 100% compatibility with the existing system.
|
||||
474
autogpt_platform/autogpt-rs/SCHEDULER.md
Normal file
474
autogpt_platform/autogpt-rs/SCHEDULER.md
Normal file
@@ -0,0 +1,474 @@
|
||||
# AutoGPT Platform Scheduler Technical Specification
|
||||
|
||||
## Executive Summary
|
||||
|
||||
This document provides a comprehensive technical specification for the AutoGPT Platform Scheduler service. The scheduler is responsible for managing scheduled graph executions, system monitoring tasks, and periodic maintenance operations. This specification is designed to enable a complete reimplementation that maintains 100% compatibility with the existing system.
|
||||
|
||||
## Table of Contents
|
||||
|
||||
1. [System Architecture](#system-architecture)
|
||||
2. [Service Implementation](#service-implementation)
|
||||
3. [Data Models](#data-models)
|
||||
4. [API Endpoints](#api-endpoints)
|
||||
5. [Database Schema](#database-schema)
|
||||
6. [External Dependencies](#external-dependencies)
|
||||
7. [Authentication & Authorization](#authentication--authorization)
|
||||
8. [Process Management](#process-management)
|
||||
9. [Error Handling](#error-handling)
|
||||
10. [Configuration](#configuration)
|
||||
11. [Testing Strategy](#testing-strategy)
|
||||
|
||||
## System Architecture
|
||||
|
||||
### Overview
|
||||
|
||||
The scheduler operates as an independent microservice within the AutoGPT platform, implementing the `AppService` base class pattern. It runs on a dedicated port (default: 8003) and exposes HTTP/JSON-RPC endpoints for communication with other services.
|
||||
|
||||
### Core Components
|
||||
|
||||
1. **Scheduler Service** (`backend/executor/scheduler.py:156`)
|
||||
- Extends `AppService` base class
|
||||
- Manages APScheduler instance with multiple jobstores
|
||||
- Handles lifecycle management and graceful shutdown
|
||||
|
||||
2. **Scheduler Client** (`backend/executor/scheduler.py:354`)
|
||||
- Extends `AppServiceClient` base class
|
||||
- Provides async/sync method wrappers for RPC calls
|
||||
- Implements automatic retry and connection pooling
|
||||
|
||||
3. **Entry Points**
|
||||
- Main executable: `backend/scheduler.py`
|
||||
- Service launcher: `backend/app.py`
|
||||
|
||||
## Service Implementation
|
||||
|
||||
### Base Service Pattern
|
||||
|
||||
```python
|
||||
class Scheduler(AppService):
|
||||
scheduler: BlockingScheduler
|
||||
|
||||
def __init__(self, register_system_tasks: bool = True):
|
||||
self.register_system_tasks = register_system_tasks
|
||||
|
||||
@classmethod
|
||||
def get_port(cls) -> int:
|
||||
return config.execution_scheduler_port # Default: 8003
|
||||
|
||||
@classmethod
|
||||
def db_pool_size(cls) -> int:
|
||||
return config.scheduler_db_pool_size # Default: 3
|
||||
|
||||
def run_service(self):
|
||||
# Initialize scheduler with jobstores
|
||||
# Register system tasks if enabled
|
||||
# Start scheduler blocking loop
|
||||
|
||||
def cleanup(self):
|
||||
# Graceful shutdown of scheduler
|
||||
# Wait=False for immediate termination
|
||||
```
|
||||
|
||||
### Jobstore Configuration
|
||||
|
||||
The scheduler uses three distinct jobstores:
|
||||
|
||||
1. **EXECUTION** (`Jobstores.EXECUTION.value`)
|
||||
- Type: SQLAlchemyJobStore
|
||||
- Table: `apscheduler_jobs`
|
||||
- Purpose: Graph execution schedules
|
||||
- Persistence: Required
|
||||
|
||||
2. **BATCHED_NOTIFICATIONS** (`Jobstores.BATCHED_NOTIFICATIONS.value`)
|
||||
- Type: SQLAlchemyJobStore
|
||||
- Table: `apscheduler_jobs_batched_notifications`
|
||||
- Purpose: Batched notification processing
|
||||
- Persistence: Required
|
||||
|
||||
3. **WEEKLY_NOTIFICATIONS** (`Jobstores.WEEKLY_NOTIFICATIONS.value`)
|
||||
- Type: MemoryJobStore
|
||||
- Purpose: Weekly summary notifications
|
||||
- Persistence: Not required
|
||||
|
||||
### System Tasks
|
||||
|
||||
When `register_system_tasks=True`, the following monitoring tasks are registered:
|
||||
|
||||
1. **Weekly Summary Processing**
|
||||
- Job ID: `process_weekly_summary`
|
||||
- Schedule: `0 * * * *` (hourly)
|
||||
- Function: `monitoring.process_weekly_summary`
|
||||
- Jobstore: WEEKLY_NOTIFICATIONS
|
||||
|
||||
2. **Late Execution Monitoring**
|
||||
- Job ID: `report_late_executions`
|
||||
- Schedule: Interval (config.execution_late_notification_threshold_secs)
|
||||
- Function: `monitoring.report_late_executions`
|
||||
- Jobstore: EXECUTION
|
||||
|
||||
3. **Block Error Rate Monitoring**
|
||||
- Job ID: `report_block_error_rates`
|
||||
- Schedule: Interval (config.block_error_rate_check_interval_secs)
|
||||
- Function: `monitoring.report_block_error_rates`
|
||||
- Jobstore: EXECUTION
|
||||
|
||||
4. **Cloud Storage Cleanup**
|
||||
- Job ID: `cleanup_expired_files`
|
||||
- Schedule: Interval (config.cloud_storage_cleanup_interval_hours * 3600)
|
||||
- Function: `cleanup_expired_files`
|
||||
- Jobstore: EXECUTION
|
||||
|
||||
## Data Models
|
||||
|
||||
### GraphExecutionJobArgs
|
||||
|
||||
```python
|
||||
class GraphExecutionJobArgs(BaseModel):
|
||||
user_id: str
|
||||
graph_id: str
|
||||
graph_version: int
|
||||
cron: str
|
||||
input_data: BlockInput
|
||||
input_credentials: dict[str, CredentialsMetaInput] = Field(default_factory=dict)
|
||||
```
|
||||
|
||||
### GraphExecutionJobInfo
|
||||
|
||||
```python
|
||||
class GraphExecutionJobInfo(GraphExecutionJobArgs):
|
||||
id: str
|
||||
name: str
|
||||
next_run_time: str
|
||||
|
||||
@staticmethod
|
||||
def from_db(job_args: GraphExecutionJobArgs, job_obj: JobObj) -> "GraphExecutionJobInfo":
|
||||
return GraphExecutionJobInfo(
|
||||
id=job_obj.id,
|
||||
name=job_obj.name,
|
||||
next_run_time=job_obj.next_run_time.isoformat(),
|
||||
**job_args.model_dump(),
|
||||
)
|
||||
```
|
||||
|
||||
### NotificationJobArgs
|
||||
|
||||
```python
|
||||
class NotificationJobArgs(BaseModel):
|
||||
notification_types: list[NotificationType]
|
||||
cron: str
|
||||
```
|
||||
|
||||
### CredentialsMetaInput
|
||||
|
||||
```python
|
||||
class CredentialsMetaInput(BaseModel, Generic[CP, CT]):
|
||||
id: str
|
||||
title: Optional[str] = None
|
||||
provider: CP
|
||||
type: CT
|
||||
```
|
||||
|
||||
## API Endpoints
|
||||
|
||||
All endpoints are exposed via the `@expose` decorator and follow HTTP POST JSON-RPC pattern.
|
||||
|
||||
### 1. Add Graph Execution Schedule
|
||||
|
||||
**Endpoint**: `/add_graph_execution_schedule`
|
||||
|
||||
**Request Body**:
|
||||
```json
|
||||
{
|
||||
"user_id": "string",
|
||||
"graph_id": "string",
|
||||
"graph_version": "integer",
|
||||
"cron": "string (crontab format)",
|
||||
"input_data": {},
|
||||
"input_credentials": {},
|
||||
"name": "string (optional)"
|
||||
}
|
||||
```
|
||||
|
||||
**Response**: `GraphExecutionJobInfo`
|
||||
|
||||
**Behavior**:
|
||||
- Creates APScheduler job with CronTrigger
|
||||
- Uses job kwargs to store GraphExecutionJobArgs
|
||||
- Sets `replace_existing=True` to allow updates
|
||||
- Returns job info with generated ID and next run time
|
||||
|
||||
### 2. Delete Graph Execution Schedule
|
||||
|
||||
**Endpoint**: `/delete_graph_execution_schedule`
|
||||
|
||||
**Request Body**:
|
||||
```json
|
||||
{
|
||||
"schedule_id": "string",
|
||||
"user_id": "string"
|
||||
}
|
||||
```
|
||||
|
||||
**Response**: `GraphExecutionJobInfo`
|
||||
|
||||
**Behavior**:
|
||||
- Validates schedule exists in EXECUTION jobstore
|
||||
- Verifies user_id matches job's user_id
|
||||
- Removes job from scheduler
|
||||
- Returns deleted job info
|
||||
|
||||
**Errors**:
|
||||
- `NotFoundError`: If job doesn't exist
|
||||
- `NotAuthorizedError`: If user_id doesn't match
|
||||
|
||||
### 3. Get Graph Execution Schedules
|
||||
|
||||
**Endpoint**: `/get_graph_execution_schedules`
|
||||
|
||||
**Request Body**:
|
||||
```json
|
||||
{
|
||||
"graph_id": "string (optional)",
|
||||
"user_id": "string (optional)"
|
||||
}
|
||||
```
|
||||
|
||||
**Response**: `list[GraphExecutionJobInfo]`
|
||||
|
||||
**Behavior**:
|
||||
- Retrieves all jobs from EXECUTION jobstore
|
||||
- Filters by graph_id and/or user_id if provided
|
||||
- Validates job kwargs as GraphExecutionJobArgs
|
||||
- Skips invalid jobs (ValidationError)
|
||||
- Only returns jobs with next_run_time set
|
||||
|
||||
### 4. System Task Endpoints
|
||||
|
||||
- `/execute_process_existing_batches` - Trigger batch processing
|
||||
- `/execute_process_weekly_summary` - Trigger weekly summary
|
||||
- `/execute_report_late_executions` - Trigger late execution report
|
||||
- `/execute_report_block_error_rates` - Trigger error rate report
|
||||
- `/execute_cleanup_expired_files` - Trigger file cleanup
|
||||
|
||||
### 5. Health Check
|
||||
|
||||
**Endpoints**: `/health_check`, `/health_check_async`
|
||||
**Methods**: POST, GET
|
||||
**Response**: "OK"
|
||||
|
||||
## Database Schema
|
||||
|
||||
### APScheduler Tables
|
||||
|
||||
The scheduler relies on APScheduler's SQLAlchemy jobstore schema:
|
||||
|
||||
1. **apscheduler_jobs**
|
||||
- id: VARCHAR (PRIMARY KEY)
|
||||
- next_run_time: FLOAT
|
||||
- job_state: BLOB/BYTEA (pickled job data)
|
||||
|
||||
2. **apscheduler_jobs_batched_notifications**
|
||||
- Same schema as above
|
||||
- Separate table for notification jobs
|
||||
|
||||
### Database Configuration
|
||||
|
||||
- URL extraction from `DIRECT_URL` environment variable
|
||||
- Schema extraction from URL query parameter
|
||||
- Connection pooling: `pool_size=db_pool_size()`, `max_overflow=0`
|
||||
- Metadata schema binding for multi-schema support
|
||||
|
||||
## External Dependencies
|
||||
|
||||
### Required Services
|
||||
|
||||
1. **PostgreSQL Database**
|
||||
- Connection via `DIRECT_URL` environment variable
|
||||
- Schema support via URL parameter
|
||||
- APScheduler job persistence
|
||||
|
||||
2. **ExecutionManager** (via execution_utils)
|
||||
- Function: `add_graph_execution`
|
||||
- Called by: `execute_graph` job function
|
||||
- Purpose: Create graph execution entries
|
||||
|
||||
3. **NotificationManager** (via monitoring module)
|
||||
- Functions: `process_existing_batches`, `queue_weekly_summary`
|
||||
- Purpose: Notification processing
|
||||
|
||||
4. **Cloud Storage** (via util.cloud_storage)
|
||||
- Function: `cleanup_expired_files_async`
|
||||
- Purpose: File expiration management
|
||||
|
||||
### Python Dependencies
|
||||
|
||||
```
|
||||
apscheduler>=3.10.0
|
||||
sqlalchemy
|
||||
pydantic>=2.0
|
||||
httpx
|
||||
uvicorn
|
||||
fastapi
|
||||
python-dotenv
|
||||
tenacity
|
||||
```
|
||||
|
||||
## Authentication & Authorization
|
||||
|
||||
### Service-Level Authentication
|
||||
|
||||
- No authentication required between internal services
|
||||
- Services communicate via trusted internal network
|
||||
- Host/port configuration via environment variables
|
||||
|
||||
### User-Level Authorization
|
||||
|
||||
- Authorization check in `delete_graph_execution_schedule`:
|
||||
- Validates `user_id` matches job's `user_id`
|
||||
- Raises `NotAuthorizedError` on mismatch
|
||||
- No authorization for read operations (security consideration)
|
||||
|
||||
## Process Management
|
||||
|
||||
### Startup Sequence
|
||||
|
||||
1. Load environment variables via `dotenv.load_dotenv()`
|
||||
2. Extract database URL and schema
|
||||
3. Initialize BlockingScheduler with configured jobstores
|
||||
4. Register system tasks (if enabled)
|
||||
5. Add job execution listener
|
||||
6. Start scheduler (blocking)
|
||||
|
||||
### Shutdown Sequence
|
||||
|
||||
1. Receive SIGTERM/SIGINT signal
|
||||
2. Call `cleanup()` method
|
||||
3. Shutdown scheduler with `wait=False`
|
||||
4. Terminate process
|
||||
|
||||
### Multi-Process Architecture
|
||||
|
||||
- Runs as independent process via `AppProcess`
|
||||
- Started by `run_processes()` in app.py
|
||||
- Can run in foreground or background mode
|
||||
- Automatic signal handling for graceful shutdown
|
||||
|
||||
## Error Handling
|
||||
|
||||
### Job Execution Errors
|
||||
|
||||
- Listener on `EVENT_JOB_ERROR` logs failures
|
||||
- Errors in job functions are caught and logged
|
||||
- Jobs continue to run on schedule despite failures
|
||||
|
||||
### RPC Communication Errors
|
||||
|
||||
- Automatic retry via `@conn_retry` decorator
|
||||
- Configurable retry count and timeout
|
||||
- Connection pooling with self-healing
|
||||
|
||||
### Database Connection Errors
|
||||
|
||||
- APScheduler handles reconnection automatically
|
||||
- Pool exhaustion prevented by `max_overflow=0`
|
||||
- Connection errors logged but don't crash service
|
||||
|
||||
## Configuration
|
||||
|
||||
### Environment Variables
|
||||
|
||||
- `DIRECT_URL`: PostgreSQL connection string (required)
|
||||
- `{SERVICE_NAME}_HOST`: Override service host
|
||||
- Standard logging configuration
|
||||
|
||||
### Config Settings (via Config class)
|
||||
|
||||
```python
|
||||
execution_scheduler_port: int = 8003
|
||||
scheduler_db_pool_size: int = 3
|
||||
execution_late_notification_threshold_secs: int
|
||||
block_error_rate_check_interval_secs: int
|
||||
cloud_storage_cleanup_interval_hours: int
|
||||
pyro_host: str = "localhost"
|
||||
pyro_client_comm_timeout: float = 15
|
||||
pyro_client_comm_retry: int = 3
|
||||
rpc_client_call_timeout: int = 300
|
||||
```
|
||||
|
||||
## Testing Strategy
|
||||
|
||||
### Unit Tests
|
||||
|
||||
1. Mock APScheduler for job management tests
|
||||
2. Mock database connections
|
||||
3. Test each RPC endpoint independently
|
||||
4. Verify job serialization/deserialization
|
||||
|
||||
### Integration Tests
|
||||
|
||||
1. Test with real PostgreSQL instance
|
||||
2. Verify job persistence across restarts
|
||||
3. Test concurrent job execution
|
||||
4. Validate cron expression parsing
|
||||
|
||||
### Critical Test Cases
|
||||
|
||||
1. **Job Persistence**: Jobs survive scheduler restart
|
||||
2. **User Isolation**: Users can only delete their own jobs
|
||||
3. **Concurrent Access**: Multiple clients can add/remove jobs
|
||||
4. **Error Recovery**: Service recovers from database outages
|
||||
5. **Resource Cleanup**: No memory/connection leaks
|
||||
|
||||
## Implementation Notes
|
||||
|
||||
### Key Design Decisions
|
||||
|
||||
1. **BlockingScheduler vs AsyncIOScheduler**: Uses BlockingScheduler for simplicity and compatibility with multiprocessing architecture
|
||||
|
||||
2. **Job Storage**: All job arguments stored in kwargs, not in job name/id
|
||||
|
||||
3. **Separate Jobstores**: Isolation between execution and notification jobs
|
||||
|
||||
4. **No Authentication**: Relies on network isolation for security
|
||||
|
||||
### Migration Considerations
|
||||
|
||||
1. APScheduler job format must be preserved exactly
|
||||
2. Database schema cannot change without migration
|
||||
3. RPC protocol must maintain compatibility
|
||||
4. Environment variables must match existing deployment
|
||||
|
||||
### Performance Considerations
|
||||
|
||||
1. Database pool size limited to prevent exhaustion
|
||||
2. No job result storage (fire-and-forget pattern)
|
||||
3. Minimal logging in hot paths
|
||||
4. Connection reuse via pooling
|
||||
|
||||
## Appendix: Critical Implementation Details
|
||||
|
||||
### Event Loop Management
|
||||
|
||||
```python
|
||||
@thread_cached
|
||||
def get_event_loop():
|
||||
return asyncio.new_event_loop()
|
||||
|
||||
def execute_graph(**kwargs):
|
||||
get_event_loop().run_until_complete(_execute_graph(**kwargs))
|
||||
```
|
||||
|
||||
### Job Function Execution Context
|
||||
|
||||
- Jobs run in scheduler's process space
|
||||
- Each job gets fresh event loop
|
||||
- No shared state between job executions
|
||||
- Exceptions logged but don't affect scheduler
|
||||
|
||||
### Cron Expression Format
|
||||
|
||||
- Uses standard crontab format via `CronTrigger.from_crontab()`
|
||||
- Supports: minute hour day month day_of_week
|
||||
- Special strings: @yearly, @monthly, @weekly, @daily, @hourly
|
||||
|
||||
This specification provides all necessary details to reimplement the scheduler service while maintaining 100% compatibility with the existing system. Any deviation from these specifications may result in system incompatibility.
|
||||
Reference in New Issue
Block a user