mirror of
https://github.com/Significant-Gravitas/AutoGPT.git
synced 2026-02-09 22:35:54 -05:00
## Changes 🏗️
Adds Redis-based SSE reconnection support for long-running CoPilot
operations (like Agent Generator), enabling clients to reconnect and
resume receiving updates after disconnection.
### What this does:
- **Stream Registry** - Redis-backed task tracking with message
persistence via Redis Streams
- **SSE Reconnection** - Clients can reconnect to active tasks using
`task_id` and `last_message_id`
- **Duplicate Message Fix** - Filters out in-progress assistant messages
from session response when active stream exists
- **Completion Consumer** - Handles background task completion
notifications via Redis Streams
### Architecture:
```
1. User sends message → Backend creates task in Redis
2. SSE chunks written to Redis Stream for persistence
3. Client receives chunks via SSE subscription
4. If client disconnects → Task continues in background
5. Client reconnects → GET /sessions/{id} returns active_stream info
6. Client subscribes to /tasks/{task_id}/stream with last_message_id
7. Missed messages replayed from Redis Stream
```
### Key endpoints:
- `GET /sessions/{session_id}` - Returns `active_stream` info if task is
running
- `GET /tasks/{task_id}/stream?last_message_id=X` - SSE endpoint for
reconnection
- `GET /tasks/{task_id}` - Get task status
- `POST /operations/{op_id}/complete` - Webhook for external service
completion
### Duplicate message fix:
When `GET /sessions/{id}` detects an active stream:
1. Filters out the in-progress assistant message from response
2. Returns `last_message_id="0-0"` so client replays stream from
beginning
3. Client receives complete response only through SSE (single source of
truth)
### Frontend changes:
- Task persistence in localStorage for cross-tab reconnection
- Stream event dispatcher handles reconnection flow
- Deduplication logic prevents duplicate messages
### Testing:
- Manual testing of reconnection scenarios
- Verified duplicate message fix works correctly
## Related
- Resolves SSE timeout issues for Agent Generator
- Fixes duplicate message bug on reconnection
File stats: 541 lines · 17 KiB · Python
import contextlib
|
|
import logging
|
|
import platform
|
|
from enum import Enum
|
|
from typing import Any, Optional
|
|
|
|
import fastapi
|
|
import fastapi.responses
|
|
import pydantic
|
|
import starlette.middleware.cors
|
|
import uvicorn
|
|
from autogpt_libs.auth import add_auth_responses_to_openapi
|
|
from autogpt_libs.auth import verify_settings as verify_auth_settings
|
|
from fastapi.exceptions import RequestValidationError
|
|
from fastapi.middleware.gzip import GZipMiddleware
|
|
from fastapi.routing import APIRoute
|
|
from prisma.errors import PrismaError
|
|
|
|
import backend.api.features.admin.credit_admin_routes
|
|
import backend.api.features.admin.execution_analytics_routes
|
|
import backend.api.features.admin.store_admin_routes
|
|
import backend.api.features.builder
|
|
import backend.api.features.builder.routes
|
|
import backend.api.features.chat.routes as chat_routes
|
|
import backend.api.features.executions.review.routes
|
|
import backend.api.features.library.db
|
|
import backend.api.features.library.model
|
|
import backend.api.features.library.routes
|
|
import backend.api.features.oauth
|
|
import backend.api.features.otto.routes
|
|
import backend.api.features.postmark.postmark
|
|
import backend.api.features.store.model
|
|
import backend.api.features.store.routes
|
|
import backend.api.features.v1
|
|
import backend.api.features.workspace.routes as workspace_routes
|
|
import backend.data.block
|
|
import backend.data.db
|
|
import backend.data.graph
|
|
import backend.data.user
|
|
import backend.integrations.webhooks.utils
|
|
import backend.util.service
|
|
import backend.util.settings
|
|
from backend.api.features.chat.completion_consumer import (
|
|
start_completion_consumer,
|
|
stop_completion_consumer,
|
|
)
|
|
from backend.blocks.llm import DEFAULT_LLM_MODEL
|
|
from backend.data.model import Credentials
|
|
from backend.integrations.providers import ProviderName
|
|
from backend.monitoring.instrumentation import instrument_fastapi
|
|
from backend.util import json
|
|
from backend.util.cloud_storage import shutdown_cloud_storage_handler
|
|
from backend.util.exceptions import (
|
|
MissingConfigError,
|
|
NotAuthorizedError,
|
|
NotFoundError,
|
|
)
|
|
from backend.util.feature_flag import initialize_launchdarkly, shutdown_launchdarkly
|
|
from backend.util.service import UnhealthyServiceError
|
|
from backend.util.workspace_storage import shutdown_workspace_storage
|
|
|
|
from .external.fastapi_app import external_api
|
|
from .features.analytics import router as analytics_router
|
|
from .features.integrations.router import router as integrations_router
|
|
from .middleware.security import SecurityHeadersMiddleware
|
|
from .utils.cors import build_cors_params
|
|
from .utils.openapi import sort_openapi
|
|
|
|
# Application-wide settings, loaded once at import time and shared by all
# module-level wiring below (CORS, docs URL, instrumentation, uvicorn).
settings = backend.util.settings.Settings()

# Module-level logger for this API entrypoint.
logger = logging.getLogger(__name__)

# Raise the shared library's log level to INFO to suppress DEBUG noise.
logging.getLogger("autogpt_libs").setLevel(logging.INFO)
|
|
|
|
|
|
@contextlib.contextmanager
def launch_darkly_context():
    """Bracket the app's lifetime with LaunchDarkly setup and teardown.

    In the LOCAL environment this is a no-op passthrough; in every other
    environment the client is initialized on entry and shut down on exit,
    even when the wrapped body raises.
    """
    running_locally = (
        settings.config.app_env == backend.util.settings.AppEnvironment.LOCAL
    )
    if running_locally:
        yield
        return

    initialize_launchdarkly()
    try:
        yield
    finally:
        shutdown_launchdarkly()
|
|
|
|
|
@contextlib.asynccontextmanager
async def lifespan_context(app: fastapi.FastAPI):
    """FastAPI lifespan: run startup checks/migrations, serve, then shut down.

    Startup order matters: auth settings are verified first, then the DB is
    connected before any migrations or block initialization run. Shutdown
    mirrors startup in reverse, with each step isolated so one failure does
    not block the rest.
    """
    # Fail fast on misconfigured auth before touching any other service.
    verify_auth_settings()

    await backend.data.db.connect()

    # Configure thread pool for FastAPI sync operation performance
    # CRITICAL: FastAPI automatically runs ALL sync functions in this thread pool:
    # - Any endpoint defined with 'def' (not async def)
    # - Any dependency function defined with 'def' (not async def)
    # - Manual run_in_threadpool() calls (like JWT decoding)
    # Default pool size is only 40 threads, causing bottlenecks under high concurrency
    config = backend.util.settings.Config()
    try:
        import anyio.to_thread

        anyio.to_thread.current_default_thread_limiter().total_tokens = (
            config.fastapi_thread_pool_size
        )
        logger.info(
            f"Thread pool size set to {config.fastapi_thread_pool_size} for sync endpoint/dependency performance"
        )
    except (ImportError, AttributeError) as e:
        logger.warning(f"Could not configure thread pool size: {e}")
        # Continue without thread pool configuration

    # Ensure SDK auto-registration is patched before initializing blocks
    from backend.sdk.registry import AutoRegistry

    AutoRegistry.patch_integrations()

    await backend.data.block.initialize_blocks()

    # One-off data migrations; each awaits completion before the next runs.
    await backend.data.user.migrate_and_encrypt_user_integrations()
    await backend.data.graph.fix_llm_provider_credentials()
    await backend.data.graph.migrate_llm_models(DEFAULT_LLM_MODEL)
    await backend.integrations.webhooks.utils.migrate_legacy_triggered_graphs()

    # Start chat completion consumer for Redis Streams notifications
    # (best-effort: the API still serves if the consumer cannot start).
    try:
        await start_completion_consumer()
    except Exception as e:
        logger.warning(f"Could not start chat completion consumer: {e}")

    # The app serves requests while suspended at this yield.
    with launch_darkly_context():
        yield

    # Stop chat completion consumer
    try:
        await stop_completion_consumer()
    except Exception as e:
        logger.warning(f"Error stopping chat completion consumer: {e}")

    try:
        await shutdown_cloud_storage_handler()
    except Exception as e:
        logger.warning(f"Error shutting down cloud storage handler: {e}")

    try:
        await shutdown_workspace_storage()
    except Exception as e:
        logger.warning(f"Error shutting down workspace storage: {e}")

    # Disconnect the DB last, after all consumers/handlers are stopped.
    await backend.data.db.disconnect()
|
|
|
|
|
|
def custom_generate_unique_id(route: APIRoute):
    """Generate clean operation IDs for the OpenAPI spec.

    Produces ``{method}{Tag}{Summary}`` where the tag and summary are
    converted from snake_case to CamelCase. Falls back to the bare route
    name when the route has no tags or no methods.
    """

    def _camel(text: str) -> str:
        # snake_case -> CamelCase, e.g. "list_agents" -> "ListAgents"
        return "".join(part.capitalize() for part in text.split("_"))

    if not route.tags or not route.methods:
        return f"{route.name}"

    http_method = next(iter(route.methods)).lower()

    primary_tag = route.tags[0]
    tag_text = primary_tag.name if isinstance(primary_tag, Enum) else str(primary_tag)
    tag = _camel(tag_text)  # v1/v2

    # Summary must be unique; a different version could share the same
    # summary, so fall back to the (unique) route name when it is empty.
    summary = _camel(str(route.summary or route.name))

    return f"{http_method}{tag}{summary}" if tag else f"{http_method}{summary}"
|
|
|
|
|
|
# Interactive API docs are exposed only in the LOCAL environment.
docs_url = (
    "/docs"
    if settings.config.app_env == backend.util.settings.AppEnvironment.LOCAL
    else None
)

app = fastapi.FastAPI(
    title="AutoGPT Agent Server",
    description=(
        "This server is used to execute agents that are created by the AutoGPT system."
    ),
    summary="AutoGPT Agent Server",
    version="0.1",
    lifespan=lifespan_context,
    docs_url=docs_url,
    generate_unique_id_function=custom_generate_unique_id,
)

# Security headers on every response (middleware defined in .middleware.security).
app.add_middleware(SecurityHeadersMiddleware)

# Add GZip compression middleware for large responses (like /api/blocks)
app.add_middleware(GZipMiddleware, minimum_size=50_000)  # 50KB threshold

# Add 401 responses to authenticated endpoints in OpenAPI spec
add_auth_responses_to_openapi(app)

# Sort OpenAPI schema to eliminate diff on refactors
sort_openapi(app)

# Add Prometheus instrumentation; the /metrics endpoint appears in the
# OpenAPI schema only when running locally.
instrument_fastapi(
    app,
    service_name="rest-api",
    expose_endpoint=True,
    endpoint="/metrics",
    include_in_schema=settings.config.app_env
    == backend.util.settings.AppEnvironment.LOCAL,
)
|
|
|
|
|
|
def handle_internal_http_error(status_code: int = 500, log_error: bool = True):
    """Build an exception handler that renders the error as a JSON response.

    Args:
        status_code: HTTP status code the handler responds with.
        log_error: When True, the handler logs the exception with traceback.

    Returns:
        A callable suitable for ``app.add_exception_handler``.
    """

    def handler(request: fastapi.Request, exc: Exception):
        if log_error:
            logger.exception(
                "%s %s failed. Investigate and resolve the underlying issue: %s",
                request.method,
                request.url.path,
                exc,
                exc_info=exc,
            )

        if status_code < 500:
            hint = "Adjust the request and retry."
        else:
            hint = "Check server logs and dependent services."

        # NOTE(review): str(exc) is echoed back to the client even for 5xx
        # responses — confirm this cannot leak sensitive internals.
        body = {
            "message": f"Failed to process {request.method} {request.url.path}",
            "detail": str(exc),
            "hint": hint,
        }
        return fastapi.responses.JSONResponse(content=body, status_code=status_code)

    return handler
|
|
|
|
|
|
async def validation_error_handler(
    request: fastapi.Request, exc: Exception
) -> fastapi.responses.Response:
    """Render request/model validation failures as a 422 JSON response."""
    logger.error(
        "Validation failed for %s %s: %s. Fix the request payload and try again.",
        request.method,
        request.url.path,
        exc,
    )

    # RequestValidationError and pydantic.ValidationError expose .errors();
    # any other exception type is reported via its string form.
    detail: list | str
    if hasattr(exc, "errors"):
        detail = exc.errors()  # type: ignore[call-arg]
    else:
        detail = str(exc)

    payload = {
        "message": f"Invalid data for {request.method} {request.url.path}",
        "detail": detail,
        "hint": "Ensure the request matches the API schema.",
    }

    # NOTE: `json` here is backend.util.json (see imports), not the stdlib.
    return fastapi.responses.Response(
        content=json.dumps(payload),
        status_code=422,
        media_type="application/json",
    )
|
|
|
|
|
|
# Map known exception types to HTTP statuses. The second argument to
# handle_internal_http_error disables traceback logging for expected
# client-side errors (404/403).
app.add_exception_handler(PrismaError, handle_internal_http_error(500))
app.add_exception_handler(NotFoundError, handle_internal_http_error(404, False))
app.add_exception_handler(NotAuthorizedError, handle_internal_http_error(403, False))
app.add_exception_handler(RequestValidationError, validation_error_handler)
app.add_exception_handler(pydantic.ValidationError, validation_error_handler)
app.add_exception_handler(MissingConfigError, handle_internal_http_error(503))
app.add_exception_handler(ValueError, handle_internal_http_error(400))
# Catch-all: any unhandled exception becomes a logged 500.
app.add_exception_handler(Exception, handle_internal_http_error(500))

# Feature routers. Tags feed custom_generate_unique_id, so they also shape
# the generated OpenAPI operation IDs.
app.include_router(backend.api.features.v1.v1_router, tags=["v1"], prefix="/api")
app.include_router(
    integrations_router,
    prefix="/api/integrations",
    tags=["v1", "integrations"],
)
app.include_router(
    analytics_router,
    prefix="/api/analytics",
    tags=["analytics"],
)
app.include_router(
    backend.api.features.store.routes.router, tags=["v2"], prefix="/api/store"
)
app.include_router(
    backend.api.features.builder.routes.router, tags=["v2"], prefix="/api/builder"
)
app.include_router(
    backend.api.features.admin.store_admin_routes.router,
    tags=["v2", "admin"],
    prefix="/api/store",
)
app.include_router(
    backend.api.features.admin.credit_admin_routes.router,
    tags=["v2", "admin"],
    prefix="/api/credits",
)
app.include_router(
    backend.api.features.admin.execution_analytics_routes.router,
    tags=["v2", "admin"],
    prefix="/api/executions",
)
app.include_router(
    backend.api.features.executions.review.routes.router,
    tags=["v2", "executions", "review"],
    prefix="/api/review",
)
app.include_router(
    backend.api.features.library.routes.router, tags=["v2"], prefix="/api/library"
)
app.include_router(
    backend.api.features.otto.routes.router, tags=["v2", "otto"], prefix="/api/otto"
)

app.include_router(
    backend.api.features.postmark.postmark.router,
    tags=["v1", "email"],
    prefix="/api/email",
)
app.include_router(
    chat_routes.router,
    tags=["v2", "chat"],
    prefix="/api/chat",
)
app.include_router(
    workspace_routes.router,
    tags=["workspace"],
    prefix="/api/workspace",
)
app.include_router(
    backend.api.features.oauth.router,
    tags=["oauth"],
    prefix="/api/oauth",
)

# The external/public API is a separate FastAPI app mounted as a sub-application.
app.mount("/external-api", external_api)
|
|
|
|
|
|
@app.get(path="/health", tags=["health"], dependencies=[])
async def health():
    """Health probe: report healthy only when the database is connected."""
    if not backend.data.db.is_connected():
        raise UnhealthyServiceError("Database is not connected")
    return {"status": "healthy"}
|
|
|
|
|
|
class AgentServer(backend.util.service.AppProcess):
    """Process wrapper that serves the module-level FastAPI ``app`` via uvicorn.

    The ``test_*`` static methods are thin pass-throughs to route handlers,
    letting the test suite exercise API behavior without going over HTTP.
    """

    def run(self):
        """Start uvicorn with CORS wrapping and environment-driven config."""
        cors_params = build_cors_params(
            settings.config.backend_cors_allow_origins,
            settings.config.app_env,
        )

        # Wrap the app in CORS middleware at the outermost layer.
        server_app = starlette.middleware.cors.CORSMiddleware(
            app=app,
            **cors_params,
            allow_credentials=True,
            allow_methods=["*"],  # Allows all methods
            allow_headers=["*"],  # Allows all headers
        )

        # Only add debug in local environment (not supported in all uvicorn versions)
        if settings.config.app_env == backend.util.settings.AppEnvironment.LOCAL:
            import os

            # Enable asyncio debug mode via environment variable
            os.environ["PYTHONASYNCIODEBUG"] = "1"

        # Configure uvicorn with performance optimizations from Kludex FastAPI tips
        uvicorn.run(
            app=server_app,
            host=settings.config.agent_api_host,
            port=settings.config.agent_api_port,
            log_config=None,
            # Use httptools for HTTP parsing (if available)
            http="httptools",
            # Only use uvloop on Unix-like systems (not supported on Windows)
            loop="uvloop" if platform.system() != "Windows" else "auto",
            # Disable WebSockets since this service doesn't have any WebSocket endpoints
            ws="none",
        )

    @staticmethod
    async def test_execute_graph(
        graph_id: str,
        user_id: str,
        graph_version: Optional[int] = None,
        node_input: Optional[dict[str, Any]] = None,
    ):
        """Execute a graph via the v1 handler, with no credentials inputs."""
        return await backend.api.features.v1.execute_graph(
            user_id=user_id,
            graph_id=graph_id,
            graph_version=graph_version,
            inputs=node_input or {},
            credentials_inputs={},
        )

    @staticmethod
    async def test_get_graph(
        graph_id: str,
        graph_version: int,
        user_id: str,
        for_export: bool = False,
    ):
        """Fetch a specific graph version via the v1 handler."""
        return await backend.api.features.v1.get_graph(
            graph_id, user_id, graph_version, for_export
        )

    @staticmethod
    async def test_create_graph(
        create_graph: backend.api.features.v1.CreateGraph,
        user_id: str,
    ):
        """Create a new graph via the v1 handler."""
        return await backend.api.features.v1.create_new_graph(create_graph, user_id)

    @staticmethod
    async def test_get_graph_run_status(graph_exec_id: str, user_id: str):
        """Return the status of a graph execution, or raise if it doesn't exist."""
        from backend.data.execution import get_graph_execution_meta

        execution = await get_graph_execution_meta(
            user_id=user_id, execution_id=graph_exec_id
        )
        if not execution:
            raise ValueError(f"Execution {graph_exec_id} not found")
        return execution.status

    @staticmethod
    async def test_delete_graph(graph_id: str, user_id: str):
        """Used for clean-up after a test run"""
        # Remove the library entry first so the graph delete doesn't orphan it.
        await backend.api.features.library.db.delete_library_agent_by_graph_id(
            graph_id=graph_id, user_id=user_id
        )
        return await backend.api.features.v1.delete_graph(graph_id, user_id)

    @staticmethod
    async def test_get_presets(user_id: str, page: int = 1, page_size: int = 10):
        """List a user's presets, paginated."""
        return await backend.api.features.library.routes.presets.list_presets(
            user_id=user_id, page=page, page_size=page_size
        )

    @staticmethod
    async def test_get_preset(preset_id: str, user_id: str):
        """Fetch a single preset by id."""
        return await backend.api.features.library.routes.presets.get_preset(
            preset_id=preset_id, user_id=user_id
        )

    @staticmethod
    async def test_create_preset(
        preset: backend.api.features.library.model.LibraryAgentPresetCreatable,
        user_id: str,
    ):
        """Create a preset via the library routes."""
        return await backend.api.features.library.routes.presets.create_preset(
            preset=preset, user_id=user_id
        )

    @staticmethod
    async def test_update_preset(
        preset_id: str,
        preset: backend.api.features.library.model.LibraryAgentPresetUpdatable,
        user_id: str,
    ):
        """Update an existing preset via the library routes."""
        return await backend.api.features.library.routes.presets.update_preset(
            preset_id=preset_id, preset=preset, user_id=user_id
        )

    @staticmethod
    async def test_delete_preset(preset_id: str, user_id: str):
        """Delete a preset via the library routes."""
        return await backend.api.features.library.routes.presets.delete_preset(
            preset_id=preset_id, user_id=user_id
        )

    @staticmethod
    async def test_execute_preset(
        preset_id: str,
        user_id: str,
        inputs: Optional[dict[str, Any]] = None,
    ):
        """Execute a preset with optional inputs and no credential inputs."""
        return await backend.api.features.library.routes.presets.execute_preset(
            preset_id=preset_id,
            user_id=user_id,
            inputs=inputs or {},
            credential_inputs={},
        )

    @staticmethod
    async def test_create_store_listing(
        request: backend.api.features.store.model.StoreSubmissionRequest, user_id: str
    ):
        """Create a store submission via the store routes."""
        return await backend.api.features.store.routes.create_submission(
            request, user_id
        )

    ### ADMIN ###

    @staticmethod
    async def test_review_store_listing(
        request: backend.api.features.store.model.ReviewSubmissionRequest,
        user_id: str,
    ):
        """Review a store listing submission via the admin routes."""
        return await backend.api.features.admin.store_admin_routes.review_submission(
            request.store_listing_version_id, request, user_id
        )

    @staticmethod
    async def test_create_credentials(
        user_id: str,
        provider: ProviderName,
        credentials: Credentials,
    ) -> Credentials:
        """Create credentials, falling back to fetching them if creation fails.

        The fallback covers the case where the credentials already exist
        (e.g. created by a previous test run).
        """
        from .features.integrations.router import create_credentials, get_credential

        try:
            return await create_credentials(
                user_id=user_id, provider=provider, credentials=credentials
            )
        except Exception as e:
            logger.error(f"Error creating credentials: {e}")
            return await get_credential(
                provider=provider,
                user_id=user_id,
                cred_id=credentials.id,
            )

    def set_test_dependency_overrides(self, overrides: dict):
        """Install FastAPI dependency overrides on the module-level app (tests)."""
        app.dependency_overrides.update(overrides)