Merge branch 'dev' of github.com:Significant-Gravitas/AutoGPT into zamilmajdy/fix-static-output-resolve

This commit is contained in:
Zamil Majdy
2025-03-12 07:29:25 +07:00
32 changed files with 1117 additions and 477 deletions

View File

@@ -2,13 +2,23 @@ DB_USER=postgres
DB_PASS=your-super-secret-and-long-postgres-password
DB_NAME=postgres
DB_PORT=5432
DATABASE_URL="postgresql://${DB_USER}:${DB_PASS}@localhost:${DB_PORT}/${DB_NAME}?connect_timeout=60&schema=platform"
DB_HOST=localhost
DB_CONNECTION_LIMIT=12
DB_CONNECT_TIMEOUT=60
DB_POOL_TIMEOUT=300
DB_SCHEMA=platform
DATABASE_URL="postgresql://${DB_USER}:${DB_PASS}@${DB_HOST}:${DB_PORT}/${DB_NAME}?schema=${DB_SCHEMA}&connect_timeout=${DB_CONNECT_TIMEOUT}"
PRISMA_SCHEMA="postgres/schema.prisma"
# EXECUTOR
NUM_GRAPH_WORKERS=10
NUM_NODE_WORKERS=3
BACKEND_CORS_ALLOW_ORIGINS=["http://localhost:3000"]
# generate using `from cryptography.fernet import Fernet;Fernet.generate_key().decode()`
ENCRYPTION_KEY='dvziYgz0KSK8FENhju0ZYi8-fRTfAdlz6YLhdB_jhNw='
UNSUBSCRIBE_SECRET_KEY = 'HlP8ivStJjmbf6NKi78m_3FnOogut0t5ckzjsIqeaio='
REDIS_HOST=localhost
REDIS_PORT=6379

View File

@@ -1,75 +1 @@
# AutoGPT Agent Server Advanced set up
This guide walks you through a dockerized set up, with an external DB (postgres)
## Setup
We use the Poetry to manage the dependencies. To set up the project, follow these steps inside this directory:
0. Install Poetry
```sh
pip install poetry
```
1. Configure Poetry to use .venv in your project directory
```sh
poetry config virtualenvs.in-project true
```
2. Enter the poetry shell
```sh
poetry shell
```
3. Install dependencies
```sh
poetry install
```
4. Copy .env.example to .env
```sh
cp .env.example .env
```
5. Generate the Prisma client
```sh
poetry run prisma generate
```
> In case Prisma generates the client for the global Python installation instead of the virtual environment, the current mitigation is to just uninstall the global Prisma package:
>
> ```sh
> pip uninstall prisma
> ```
>
> Then run the generation again. The path *should* look something like this:
> `<some path>/pypoetry/virtualenvs/backend-TQIRSwR6-py3.12/bin/prisma`
6. Run the postgres database from the /rnd folder
```sh
cd autogpt_platform/
docker compose up -d
```
7. Run the migrations (from the backend folder)
```sh
cd ../backend
prisma migrate deploy
```
## Running The Server
### Starting the server directly
Run the following command:
```sh
poetry run app
```
[Advanced Setup (Dev Branch)](https://dev-docs.agpt.co/platform/advanced_setup/#autogpt_agent_server_advanced_set_up)

View File

@@ -1,210 +1 @@
# AutoGPT Agent Server
This is an initial project for creating the next generation of agent execution, which is an AutoGPT agent server.
The agent server will enable the creation of composite multi-agent systems that utilize AutoGPT agents and other non-agent components as its primitives.
## Docs
You can access the docs for the [AutoGPT Agent Server here](https://docs.agpt.co/server/setup).
## Setup
We use the Poetry to manage the dependencies. To set up the project, follow these steps inside this directory:
0. Install Poetry
```sh
pip install poetry
```
1. Configure Poetry to use .venv in your project directory
```sh
poetry config virtualenvs.in-project true
```
2. Enter the poetry shell
```sh
poetry shell
```
3. Install dependencies
```sh
poetry install
```
4. Copy .env.example to .env
```sh
cp .env.example .env
```
5. Generate the Prisma client
```sh
poetry run prisma generate
```
> In case Prisma generates the client for the global Python installation instead of the virtual environment, the current mitigation is to just uninstall the global Prisma package:
>
> ```sh
> pip uninstall prisma
> ```
>
> Then run the generation again. The path *should* look something like this:
> `<some path>/pypoetry/virtualenvs/backend-TQIRSwR6-py3.12/bin/prisma`
6. Migrate the database. Be careful because this deletes current data in the database.
```sh
docker compose up db -d
poetry run prisma migrate deploy
```
## Running The Server
### Starting the server without Docker
To run the server locally, start in the autogpt_platform folder:
```sh
cd ..
```
Run the following command to run database in docker but the application locally:
```sh
docker compose --profile local up deps --build --detach
cd backend
poetry run app
```
### Starting the server with Docker
Run the following command to build the dockerfiles:
```sh
docker compose build
```
Run the following command to run the app:
```sh
docker compose up
```
Run the following to automatically rebuild when code changes, in another terminal:
```sh
docker compose watch
```
Run the following command to shut down:
```sh
docker compose down
```
If you run into issues with dangling orphans, try:
```sh
docker compose down --volumes --remove-orphans && docker-compose up --force-recreate --renew-anon-volumes --remove-orphans
```
## Testing
To run the tests:
```sh
poetry run test
```
## Development
### Formatting & Linting
Auto formatter and linter are set up in the project. To run them:
Install:
```sh
poetry install --with dev
```
Format the code:
```sh
poetry run format
```
Lint the code:
```sh
poetry run lint
```
## Project Outline
The current project has the following main modules:
### **blocks**
This module stores all the Agent Blocks, which are reusable components to build a graph that represents the agent's behavior.
### **data**
This module stores the logical model that is persisted in the database.
It abstracts the database operations into functions that can be called by the service layer.
Any code that interacts with Prisma objects or the database should reside in this module.
The main models are:
* `block`: anything related to the block used in the graph
* `execution`: anything related to the execution graph execution
* `graph`: anything related to the graph, node, and its relations
### **execution**
This module stores the business logic of executing the graph.
It currently has the following main modules:
* `manager`: A service that consumes the queue of the graph execution and executes the graph. It contains both pieces of logic.
* `scheduler`: A service that triggers scheduled graph execution based on a cron expression. It pushes an execution request to the manager.
### **server**
This module stores the logic for the server API.
It contains all the logic used for the API that allows the client to create, execute, and monitor the graph and its execution.
This API service interacts with other services like those defined in `manager` and `scheduler`.
### **utils**
This module stores utility functions that are used across the project.
Currently, it has two main modules:
* `process`: A module that contains the logic to spawn a new process.
* `service`: A module that serves as a parent class for all the services in the project.
## Service Communication
Currently, there are only 3 active services:
- AgentServer (the API, defined in `server.py`)
- ExecutionManager (the executor, defined in `manager.py`)
- ExecutionScheduler (the scheduler, defined in `scheduler.py`)
The services run in independent Python processes and communicate through an IPC.
A communication layer (`service.py`) is created to decouple the communication library from the implementation.
Currently, the IPC is done using Pyro5 and abstracted in a way that allows a function decorated with `@expose` to be called from a different process.
By default the daemons run on the following ports:
Execution Manager Daemon: 8002
Execution Scheduler Daemon: 8003
Rest Server Daemon: 8004
## Adding a New Agent Block
To add a new agent block, you need to create a new class that inherits from `Block` and provides the following information:
* All the block code should live in the `blocks` (`backend.blocks`) module.
* `input_schema`: the schema of the input data, represented by a Pydantic object.
* `output_schema`: the schema of the output data, represented by a Pydantic object.
* `run` method: the main logic of the block.
* `test_input` & `test_output`: the sample input and output data for the block, which will be used to auto-test the block.
* You can mock the functions declared in the block using the `test_mock` field for your unit tests.
* Once you finish creating the block, you can test it by running `poetry run pytest -s test/block/test_block.py`.
[Getting Started (Released)](https://docs.agpt.co/platform/getting-started/#autogpt_agent_server)

View File

@@ -32,7 +32,7 @@ def main(**kwargs):
Run all the processes required for the AutoGPT-server (REST and WebSocket APIs).
"""
from backend.executor import DatabaseManager, ExecutionManager, ExecutionScheduler
from backend.executor import DatabaseManager, ExecutionManager, Scheduler
from backend.notifications import NotificationManager
from backend.server.rest_api import AgentServer
from backend.server.ws_api import WebsocketServer
@@ -40,7 +40,7 @@ def main(**kwargs):
run_processes(
DatabaseManager(),
ExecutionManager(),
ExecutionScheduler(),
Scheduler(),
NotificationManager(),
WebsocketServer(),
AgentServer(),

View File

@@ -1,7 +1,14 @@
import enum
from typing import Any, List
from typing import TYPE_CHECKING, Any, List
from backend.data.block import Block, BlockCategory, BlockOutput, BlockSchema, BlockType
from backend.data.block import (
Block,
BlockCategory,
BlockInput,
BlockOutput,
BlockSchema,
BlockType,
)
from backend.data.model import SchemaField
from backend.util import json
from backend.util.file import MediaFile, store_media_file
@@ -9,6 +16,9 @@ from backend.util.mock import MockObject
from backend.util.text import TextFormatter
from backend.util.type import convert
if TYPE_CHECKING:
from backend.data.graph import Link
formatter = TextFormatter()
@@ -456,6 +466,17 @@ class AddToListBlock(Block):
description="The position to insert the new entry. If not provided, the entry will be appended to the end of the list.",
)
@classmethod
def get_missing_links(cls, data: BlockInput, links: List["Link"]) -> set[str]:
return super().get_missing_links(
data,
[
link
for link in links
if link.sink_name != "list" or link.sink_id != link.source_id
],
)
class Output(BlockSchema):
updated_list: List[Any] = SchemaField(
description="The list with the new entry added."

View File

@@ -51,6 +51,7 @@ class ExaContentsBlock(Block):
description="List of document contents",
default=[],
)
error: str = SchemaField(description="Error message if the request failed")
def __init__(self):
super().__init__(

View File

@@ -8,6 +8,7 @@ from pydantic import BaseModel
from backend.data.block import Block, BlockCategory, BlockOutput, BlockSchema
from backend.data.model import SchemaField
from backend.util.settings import Settings
from ._auth import (
GOOGLE_OAUTH_IS_CONFIGURED,
@@ -150,8 +151,8 @@ class GmailReadBlock(Block):
else None
),
token_uri="https://oauth2.googleapis.com/token",
client_id=kwargs.get("client_id"),
client_secret=kwargs.get("client_secret"),
client_id=Settings().secrets.google_client_id,
client_secret=Settings().secrets.google_client_secret,
scopes=credentials.scopes,
)
return build("gmail", "v1", credentials=creds)

View File

@@ -3,6 +3,7 @@ from googleapiclient.discovery import build
from backend.data.block import Block, BlockCategory, BlockOutput, BlockSchema
from backend.data.model import SchemaField
from backend.util.settings import Settings
from ._auth import (
GOOGLE_OAUTH_IS_CONFIGURED,
@@ -86,8 +87,8 @@ class GoogleSheetsReadBlock(Block):
else None
),
token_uri="https://oauth2.googleapis.com/token",
client_id=kwargs.get("client_id"),
client_secret=kwargs.get("client_secret"),
client_id=Settings().secrets.google_client_id,
client_secret=Settings().secrets.google_client_secret,
scopes=credentials.scopes,
)
return build("sheets", "v4", credentials=creds)

View File

@@ -2,6 +2,7 @@ import logging
import os
import zlib
from contextlib import asynccontextmanager
from urllib.parse import parse_qsl, urlencode, urlparse, urlunparse
from uuid import uuid4
from dotenv import load_dotenv
@@ -15,7 +16,36 @@ load_dotenv()
PRISMA_SCHEMA = os.getenv("PRISMA_SCHEMA", "schema.prisma")
os.environ["PRISMA_SCHEMA_PATH"] = PRISMA_SCHEMA
prisma = Prisma(auto_register=True)
def add_param(url: str, key: str, value: str) -> str:
p = urlparse(url)
qs = dict(parse_qsl(p.query))
qs[key] = value
return urlunparse(p._replace(query=urlencode(qs)))
DATABASE_URL = os.getenv("DATABASE_URL", "postgresql://localhost:5432")
CONN_LIMIT = os.getenv("DB_CONNECTION_LIMIT")
if CONN_LIMIT:
DATABASE_URL = add_param(DATABASE_URL, "connection_limit", CONN_LIMIT)
CONN_TIMEOUT = os.getenv("DB_CONNECT_TIMEOUT")
if CONN_TIMEOUT:
DATABASE_URL = add_param(DATABASE_URL, "connect_timeout", CONN_TIMEOUT)
POOL_TIMEOUT = os.getenv("DB_POOL_TIMEOUT")
if POOL_TIMEOUT:
DATABASE_URL = add_param(DATABASE_URL, "pool_timeout", POOL_TIMEOUT)
HTTP_TIMEOUT = int(POOL_TIMEOUT) if POOL_TIMEOUT else None
prisma = Prisma(
auto_register=True,
http={"timeout": HTTP_TIMEOUT},
datasource={"url": DATABASE_URL},
)
logger = logging.getLogger(__name__)

View File

@@ -23,8 +23,8 @@ T_co = TypeVar("T_co", bound="BaseNotificationData", covariant=True)
class QueueType(Enum):
IMMEDIATE = "immediate" # Send right away (errors, critical notifications)
HOURLY = "hourly" # Batch for up to an hour (usage reports)
DAILY = "daily" # Daily digest (summary notifications)
BATCH = "batch" # Batch for up to an hour (usage reports)
SUMMARY = "summary" # Daily digest (summary notifications)
BACKOFF = "backoff" # Backoff strategy (exponential backoff)
ADMIN = "admin" # Admin notifications (errors, critical notifications)
@@ -196,15 +196,15 @@ class NotificationTypeOverride:
def strategy(self) -> QueueType:
BATCHING_RULES = {
# These are batched by the notification service
NotificationType.AGENT_RUN: QueueType.IMMEDIATE,
NotificationType.AGENT_RUN: QueueType.BATCH,
# These are batched by the notification service, but with a backoff strategy
NotificationType.ZERO_BALANCE: QueueType.BACKOFF,
NotificationType.LOW_BALANCE: QueueType.IMMEDIATE,
NotificationType.BLOCK_EXECUTION_FAILED: QueueType.BACKOFF,
NotificationType.CONTINUOUS_AGENT_ERROR: QueueType.BACKOFF,
NotificationType.DAILY_SUMMARY: QueueType.DAILY,
NotificationType.WEEKLY_SUMMARY: QueueType.DAILY,
NotificationType.MONTHLY_SUMMARY: QueueType.DAILY,
NotificationType.DAILY_SUMMARY: QueueType.SUMMARY,
NotificationType.WEEKLY_SUMMARY: QueueType.SUMMARY,
NotificationType.MONTHLY_SUMMARY: QueueType.SUMMARY,
NotificationType.REFUND_REQUEST: QueueType.ADMIN,
NotificationType.REFUND_PROCESSED: QueueType.ADMIN,
}
@@ -263,7 +263,7 @@ class NotificationPreference(BaseModel):
def get_batch_delay(notification_type: NotificationType) -> timedelta:
return {
NotificationType.AGENT_RUN: timedelta(seconds=1),
NotificationType.AGENT_RUN: timedelta(minutes=1),
NotificationType.ZERO_BALANCE: timedelta(minutes=60),
NotificationType.LOW_BALANCE: timedelta(minutes=60),
NotificationType.BLOCK_EXECUTION_FAILED: timedelta(minutes=60),
@@ -274,19 +274,15 @@ def get_batch_delay(notification_type: NotificationType) -> timedelta:
async def create_or_add_to_user_notification_batch(
user_id: str,
notification_type: NotificationType,
data: str, # type: 'NotificationEventModel'
) -> dict:
notification_data: NotificationEventModel,
) -> UserNotificationBatch:
try:
logger.info(
f"Creating or adding to notification batch for {user_id} with type {notification_type} and data {data}"
f"Creating or adding to notification batch for {user_id} with type {notification_type} and data {notification_data}"
)
notification_data = NotificationEventModel[
get_data_type(notification_type)
].model_validate_json(data)
# Serialize the data
json_data: Json = Json(notification_data.data.model_dump_json())
json_data: Json = Json(notification_data.data.model_dump())
# First try to find existing batch
existing_batch = await UserNotificationBatch.prisma().find_unique(
@@ -317,7 +313,7 @@ async def create_or_add_to_user_notification_batch(
},
include={"notifications": True},
)
return resp.model_dump()
return resp
else:
async with transaction() as tx:
notification_event = await tx.notificationevent.create(
@@ -339,27 +335,28 @@ async def create_or_add_to_user_notification_batch(
raise DatabaseError(
f"Failed to add notification event {notification_event.id} to existing batch {existing_batch.id}"
)
return resp.model_dump()
return resp
except Exception as e:
raise DatabaseError(
f"Failed to create or add to notification batch for user {user_id} and type {notification_type}: {e}"
) from e
async def get_user_notification_last_message_in_batch(
async def get_user_notification_oldest_message_in_batch(
user_id: str,
notification_type: NotificationType,
) -> NotificationEvent | None:
try:
batch = await UserNotificationBatch.prisma().find_first(
where={"userId": user_id, "type": notification_type},
order={"createdAt": "desc"},
include={"notifications": True},
)
if not batch:
return None
if not batch.notifications:
return None
return batch.notifications[-1]
sorted_notifications = sorted(batch.notifications, key=lambda x: x.createdAt)
return sorted_notifications[0]
except Exception as e:
raise DatabaseError(
f"Failed to get user notification last message in batch for user {user_id} and type {notification_type}: {e}"
@@ -404,3 +401,22 @@ async def get_user_notification_batch(
raise DatabaseError(
f"Failed to get user notification batch for user {user_id} and type {notification_type}: {e}"
) from e
async def get_all_batches_by_type(
notification_type: NotificationType,
) -> list[UserNotificationBatch]:
try:
return await UserNotificationBatch.prisma().find_many(
where={
"type": notification_type,
"notifications": {
"some": {} # Only return batches with at least one notification
},
},
include={"notifications": True},
)
except Exception as e:
raise DatabaseError(
f"Failed to get all batches by type {notification_type}: {e}"
) from e

View File

@@ -35,7 +35,7 @@ class BaseRedisEventBus(Generic[M], ABC):
def _serialize_message(self, item: M, channel_key: str) -> tuple[str, str]:
message = json.dumps(item.model_dump(), cls=DateTimeEncoder)
channel_name = f"{self.event_bus_name}/{channel_key}"
logger.info(f"[{channel_name}] Publishing an event to Redis {message}")
logger.debug(f"[{channel_name}] Publishing an event to Redis {message}")
return message, channel_name
def _deserialize_message(self, msg: Any, channel_key: str) -> M | None:
@@ -44,7 +44,7 @@ class BaseRedisEventBus(Generic[M], ABC):
return None
try:
data = json.loads(msg["data"])
logger.info(f"Consuming an event from Redis {data}")
logger.debug(f"Consuming an event from Redis {data}")
return self.Model(**data)
except Exception as e:
logger.error(f"Failed to parse event result from Redis {msg} {e}")

View File

@@ -1,6 +1,10 @@
import base64
import hashlib
import hmac
import logging
from datetime import datetime, timedelta
from typing import Optional, cast
from urllib.parse import quote_plus
from autogpt_libs.auth.models import DEFAULT_USER_ID
from fastapi import HTTPException
@@ -14,6 +18,7 @@ from backend.data.model import UserIntegrations, UserMetadata, UserMetadataRaw
from backend.data.notifications import NotificationPreference, NotificationPreferenceDTO
from backend.server.v2.store.exceptions import DatabaseError
from backend.util.encryption import JSONCryptor
from backend.util.settings import Settings
logger = logging.getLogger(__name__)
@@ -334,3 +339,59 @@ async def get_user_email_verification(user_id: str) -> bool:
raise DatabaseError(
f"Failed to get email verification status for user {user_id}: {e}"
) from e
def generate_unsubscribe_link(user_id: str) -> str:
"""Generate a link to unsubscribe from all notifications"""
# Create an HMAC using a secret key
secret_key = Settings().secrets.unsubscribe_secret_key
signature = hmac.new(
secret_key.encode("utf-8"), user_id.encode("utf-8"), hashlib.sha256
).digest()
# Create a token that combines the user_id and signature
token = base64.urlsafe_b64encode(
f"{user_id}:{signature.hex()}".encode("utf-8")
).decode("utf-8")
logger.info(f"Generating unsubscribe link for user {user_id}")
base_url = Settings().config.platform_base_url
return f"{base_url}/api/email/unsubscribe?token={quote_plus(token)}"
async def unsubscribe_user_by_token(token: str) -> None:
"""Unsubscribe a user from all notifications using the token"""
try:
# Decode the token
decoded = base64.urlsafe_b64decode(token).decode("utf-8")
user_id, received_signature_hex = decoded.split(":", 1)
# Verify the signature
secret_key = Settings().secrets.unsubscribe_secret_key
expected_signature = hmac.new(
secret_key.encode("utf-8"), user_id.encode("utf-8"), hashlib.sha256
).digest()
if not hmac.compare_digest(expected_signature.hex(), received_signature_hex):
raise ValueError("Invalid token signature")
user = await get_user_by_id(user_id)
await update_user_notification_preference(
user.id,
NotificationPreferenceDTO(
email=user.email,
daily_limit=0,
preferences={
NotificationType.AGENT_RUN: False,
NotificationType.ZERO_BALANCE: False,
NotificationType.LOW_BALANCE: False,
NotificationType.BLOCK_EXECUTION_FAILED: False,
NotificationType.CONTINUOUS_AGENT_ERROR: False,
NotificationType.DAILY_SUMMARY: False,
NotificationType.WEEKLY_SUMMARY: False,
NotificationType.MONTHLY_SUMMARY: False,
},
),
)
except Exception as e:
raise DatabaseError(f"Failed to unsubscribe user by token {token}: {e}") from e

View File

@@ -1,9 +1,9 @@
from .database import DatabaseManager
from .manager import ExecutionManager
from .scheduler import ExecutionScheduler
from .scheduler import Scheduler
__all__ = [
"DatabaseManager",
"ExecutionManager",
"ExecutionScheduler",
"Scheduler",
]

View File

@@ -109,7 +109,10 @@ class LogMetadata:
logger.exception(msg, extra={"json_fields": {**self.metadata, **extra}})
def _wrap(self, msg: str, **extra):
return f"{self.prefix} {msg} {extra or ''}"
extra_msg = str(extra or "")
if len(extra_msg) > 1000:
extra_msg = extra_msg[:1000] + "..."
return f"{self.prefix} {msg} {extra_msg}"
T = TypeVar("T")

View File

@@ -1,5 +1,6 @@
import logging
import os
from enum import Enum
from urllib.parse import parse_qs, urlencode, urlparse, urlunparse
from apscheduler.events import EVENT_JOB_ERROR, EVENT_JOB_EXECUTED
@@ -9,11 +10,13 @@ from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.triggers.cron import CronTrigger
from autogpt_libs.utils.cache import thread_cached
from dotenv import load_dotenv
from prisma.enums import NotificationType
from pydantic import BaseModel
from sqlalchemy import MetaData, create_engine
from backend.data.block import BlockInput
from backend.executor.manager import ExecutionManager
from backend.notifications.notifications import NotificationManager
from backend.util.service import AppService, expose, get_service_client
from backend.util.settings import Config
@@ -42,7 +45,7 @@ config = Config()
def log(msg, **kwargs):
logger.info("[ExecutionScheduler] " + msg, **kwargs)
logger.info("[Scheduler] " + msg, **kwargs)
def job_listener(event):
@@ -58,8 +61,15 @@ def get_execution_client() -> ExecutionManager:
return get_service_client(ExecutionManager)
@thread_cached
def get_notification_client():
from backend.notifications import NotificationManager
return get_service_client(NotificationManager)
def execute_graph(**kwargs):
args = JobArgs(**kwargs)
args = ExecutionJobArgs(**kwargs)
try:
log(f"Executing recurring job for graph #{args.graph_id}")
get_execution_client().add_execution(
@@ -72,7 +82,23 @@ def execute_graph(**kwargs):
logger.exception(f"Error executing graph {args.graph_id}: {e}")
class JobArgs(BaseModel):
def process_existing_batches(**kwargs):
args = NotificationJobArgs(**kwargs)
try:
log(
f"Processing existing batches for notification type {args.notification_types}"
)
get_notification_client().process_existing_batches(args.notification_types)
except Exception as e:
logger.exception(f"Error processing existing batches: {e}")
class Jobstores(Enum):
EXECUTION = "execution"
BATCHED_NOTIFICATIONS = "batched_notifications"
class ExecutionJobArgs(BaseModel):
graph_id: str
input_data: BlockInput
user_id: str
@@ -80,14 +106,14 @@ class JobArgs(BaseModel):
cron: str
class JobInfo(JobArgs):
class ExecutionJobInfo(ExecutionJobArgs):
id: str
name: str
next_run_time: str
@staticmethod
def from_db(job_args: JobArgs, job_obj: JobObj) -> "JobInfo":
return JobInfo(
def from_db(job_args: ExecutionJobArgs, job_obj: JobObj) -> "ExecutionJobInfo":
return ExecutionJobInfo(
id=job_obj.id,
name=job_obj.name,
next_run_time=job_obj.next_run_time.isoformat(),
@@ -95,7 +121,29 @@ class JobInfo(JobArgs):
)
class ExecutionScheduler(AppService):
class NotificationJobArgs(BaseModel):
notification_types: list[NotificationType]
cron: str
class NotificationJobInfo(NotificationJobArgs):
id: str
name: str
next_run_time: str
@staticmethod
def from_db(
job_args: NotificationJobArgs, job_obj: JobObj
) -> "NotificationJobInfo":
return NotificationJobInfo(
id=job_obj.id,
name=job_obj.name,
next_run_time=job_obj.next_run_time.isoformat(),
**job_args.model_dump(),
)
class Scheduler(AppService):
scheduler: BlockingScheduler
@classmethod
@@ -111,19 +159,36 @@ class ExecutionScheduler(AppService):
def execution_client(self) -> ExecutionManager:
return get_service_client(ExecutionManager)
@property
@thread_cached
def notification_client(self) -> NotificationManager:
return get_service_client(NotificationManager)
def run_service(self):
load_dotenv()
db_schema, db_url = _extract_schema_from_url(os.getenv("DATABASE_URL"))
self.scheduler = BlockingScheduler(
jobstores={
"default": SQLAlchemyJobStore(
Jobstores.EXECUTION.value: SQLAlchemyJobStore(
engine=create_engine(
url=db_url,
pool_size=self.db_pool_size(),
max_overflow=0,
),
metadata=MetaData(schema=db_schema),
)
# this one is pre-existing so it keeps the
# default table name.
tablename="apscheduler_jobs",
),
Jobstores.BATCHED_NOTIFICATIONS.value: SQLAlchemyJobStore(
engine=create_engine(
url=db_url,
pool_size=self.db_pool_size(),
max_overflow=0,
),
metadata=MetaData(schema=db_schema),
tablename="apscheduler_jobs_batched_notifications",
),
}
)
self.scheduler.add_listener(job_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)
@@ -137,8 +202,8 @@ class ExecutionScheduler(AppService):
cron: str,
input_data: BlockInput,
user_id: str,
) -> JobInfo:
job_args = JobArgs(
) -> ExecutionJobInfo:
job_args = ExecutionJobArgs(
graph_id=graph_id,
input_data=input_data,
user_id=user_id,
@@ -150,37 +215,59 @@ class ExecutionScheduler(AppService):
CronTrigger.from_crontab(cron),
kwargs=job_args.model_dump(),
replace_existing=True,
jobstore=Jobstores.EXECUTION.value,
)
log(f"Added job {job.id} with cron schedule '{cron}' input data: {input_data}")
return JobInfo.from_db(job_args, job)
return ExecutionJobInfo.from_db(job_args, job)
@expose
def delete_schedule(self, schedule_id: str, user_id: str) -> JobInfo:
job = self.scheduler.get_job(schedule_id)
def delete_schedule(self, schedule_id: str, user_id: str) -> ExecutionJobInfo:
job = self.scheduler.get_job(schedule_id, jobstore=Jobstores.EXECUTION.value)
if not job:
log(f"Job {schedule_id} not found.")
raise ValueError(f"Job #{schedule_id} not found.")
job_args = JobArgs(**job.kwargs)
job_args = ExecutionJobArgs(**job.kwargs)
if job_args.user_id != user_id:
raise ValueError("User ID does not match the job's user ID.")
log(f"Deleting job {schedule_id}")
job.remove()
return JobInfo.from_db(job_args, job)
return ExecutionJobInfo.from_db(job_args, job)
@expose
def get_execution_schedules(
self, graph_id: str | None = None, user_id: str | None = None
) -> list[JobInfo]:
) -> list[ExecutionJobInfo]:
schedules = []
for job in self.scheduler.get_jobs():
job_args = JobArgs(**job.kwargs)
for job in self.scheduler.get_jobs(jobstore=Jobstores.EXECUTION.value):
job_args = ExecutionJobArgs(**job.kwargs)
if (
job.next_run_time is not None
and (graph_id is None or job_args.graph_id == graph_id)
and (user_id is None or job_args.user_id == user_id)
):
schedules.append(JobInfo.from_db(job_args, job))
schedules.append(ExecutionJobInfo.from_db(job_args, job))
return schedules
@expose
def add_batched_notification_schedule(
self,
notification_types: list[NotificationType],
data: dict,
cron: str,
) -> NotificationJobInfo:
job_args = NotificationJobArgs(
notification_types=notification_types,
cron=cron,
)
job = self.scheduler.add_job(
process_existing_batches,
CronTrigger.from_crontab(cron),
kwargs=job_args.model_dump(),
replace_existing=True,
jobstore=Jobstores.BATCHED_NOTIFICATIONS.value,
)
log(f"Added job {job.id} with cron schedule '{cron}' input data: {data}")
return NotificationJobInfo.from_db(job_args, job)

View File

@@ -49,6 +49,7 @@ class EmailSender:
notification: NotificationType,
user_email: str,
data: NotificationEventModel[T_co] | list[NotificationEventModel[T_co]],
user_unsub_link: str | None = None,
):
"""Send an email to a user using a template pulled from the notification type"""
if not self.postmark:
@@ -56,20 +57,34 @@ class EmailSender:
return
template = self._get_template(notification)
base_url = (
settings.config.frontend_base_url or settings.config.platform_base_url
)
# Handle the case when data is a list
template_data = data
if isinstance(data, list):
# Create a dictionary with a 'notifications' key containing the list
template_data = {"notifications": data}
try:
subject, full_message = self.formatter.format_email(
base_template=template.base_template,
subject_template=template.subject_template,
content_template=template.body_template,
data=data,
unsubscribe_link="https://platform.agpt.co/profile/settings",
data=template_data,
unsubscribe_link=f"{base_url}/profile/settings",
)
except Exception as e:
logger.error(f"Error formatting full message: {e}")
raise e
self._send_email(user_email, subject, full_message)
self._send_email(
user_email=user_email,
user_unsubscribe_link=user_unsub_link,
subject=subject,
body=full_message,
)
def _get_template(self, notification: NotificationType):
# convert the notification type to a notification type override
@@ -90,7 +105,13 @@ class EmailSender:
base_template=base_template,
)
def _send_email(self, user_email: str, subject: str, body: str):
def _send_email(
self,
user_email: str,
subject: str,
body: str,
user_unsubscribe_link: str | None = None,
):
if not self.postmark:
logger.warning("Email tried to send without postmark configured")
return
@@ -100,4 +121,13 @@ class EmailSender:
To=user_email,
Subject=subject,
HtmlBody=body,
# Headers default to None internally so this is fine
Headers=(
{
"List-Unsubscribe-Post": "List-Unsubscribe=One-Click",
"List-Unsubscribe": f"<{user_unsubscribe_link}>",
}
if user_unsubscribe_link
else None
),
)

View File

@@ -1,9 +1,11 @@
import logging
import time
from datetime import datetime, timezone
from typing import Callable
import aio_pika
from aio_pika.exceptions import QueueEmpty
from autogpt_libs.utils.cache import thread_cached
from prisma.enums import NotificationType
from pydantic import BaseModel
@@ -12,16 +14,23 @@ from backend.data.notifications import (
NotificationEventModel,
NotificationResult,
QueueType,
create_or_add_to_user_notification_batch,
empty_user_notification_batch,
get_all_batches_by_type,
get_batch_delay,
get_data_type,
get_user_notification_batch,
get_user_notification_oldest_message_in_batch,
)
from backend.data.rabbitmq import Exchange, ExchangeType, Queue, RabbitMQConfig
from backend.data.user import (
generate_unsubscribe_link,
get_user_email_by_id,
get_user_email_verification,
get_user_notification_preference,
)
from backend.notifications.email import EmailSender
from backend.util.service import AppService, expose
from backend.util.service import AppService, expose, get_service_client
from backend.util.settings import Settings
logger = logging.getLogger(__name__)
@@ -59,6 +68,16 @@ def create_notification_config() -> RabbitMQConfig:
"x-dead-letter-routing-key": "failed.admin",
},
),
# Batch Queue
Queue(
name="batch_notifications",
exchange=notification_exchange,
routing_key="notification.batch.#",
arguments={
"x-dead-letter-exchange": dead_letter_exchange.name,
"x-dead-letter-routing-key": "failed.batch",
},
),
# Failed notifications queue
Queue(
name="failed_notifications",
@@ -76,6 +95,13 @@ def create_notification_config() -> RabbitMQConfig:
)
@thread_cached
def get_scheduler():
from backend.executor import Scheduler
return get_service_client(Scheduler)
class NotificationManager(AppService):
"""Service for handling notifications with batching support"""
@@ -98,12 +124,134 @@ class NotificationManager(AppService):
return f"notification.backoff.{event.type.value}"
elif event.strategy == QueueType.ADMIN:
return f"notification.admin.{event.type.value}"
elif event.strategy == QueueType.HOURLY:
return f"notification.hourly.{event.type.value}"
elif event.strategy == QueueType.DAILY:
return f"notification.daily.{event.type.value}"
elif event.strategy == QueueType.BATCH:
return f"notification.batch.{event.type.value}"
elif event.strategy == QueueType.SUMMARY:
return f"notification.summary.{event.type.value}"
return f"notification.{event.type.value}"
@expose
def process_existing_batches(self, notification_types: list[NotificationType]):
"""Process existing batches for specified notification types"""
try:
processed_count = 0
current_time = datetime.now(tz=timezone.utc)
for notification_type in notification_types:
# Get all batches for this notification type
batches = self.run_and_wait(get_all_batches_by_type(notification_type))
for batch in batches:
# Check if batch has aged out
oldest_message = self.run_and_wait(
get_user_notification_oldest_message_in_batch(
batch.userId, notification_type
)
)
if not oldest_message:
# this should never happen
logger.error(
f"Batch for user {batch.userId} and type {notification_type} has no oldest message whichshould never happen!!!!!!!!!!!!!!!!"
)
continue
max_delay = get_batch_delay(notification_type)
# If batch has aged out, process it
if oldest_message.createdAt + max_delay < current_time:
recipient_email = self.run_and_wait(
get_user_email_by_id(batch.userId)
)
if not recipient_email:
logger.error(
f"User email not found for user {batch.userId}"
)
continue
should_send = self._should_email_user_based_on_preference(
batch.userId, notification_type
)
if not should_send:
logger.debug(
f"User {batch.userId} does not want to receive {notification_type} notifications"
)
# Clear the batch
self.run_and_wait(
empty_user_notification_batch(
batch.userId, notification_type
)
)
continue
batch_data = self.run_and_wait(
get_user_notification_batch(batch.userId, notification_type)
)
if not batch_data or not batch_data.notifications:
logger.error(
f"Batch data not found for user {batch.userId}"
)
# Clear the batch
self.run_and_wait(
empty_user_notification_batch(
batch.userId, notification_type
)
)
continue
unsub_link = generate_unsubscribe_link(batch.userId)
events = [
NotificationEventModel[
get_data_type(db_event.type)
].model_validate(
{
"user_id": batch.userId,
"type": db_event.type,
"data": db_event.data,
"created_at": db_event.createdAt,
}
)
for db_event in batch_data.notifications
]
logger.info(f"{events=}")
self.email_sender.send_templated(
notification=notification_type,
user_email=recipient_email,
data=events,
user_unsub_link=unsub_link,
)
# Clear the batch
self.run_and_wait(
empty_user_notification_batch(
batch.userId, notification_type
)
)
processed_count += 1
logger.info(f"Processed {processed_count} aged batches")
return {
"success": True,
"processed_count": processed_count,
"notification_types": [nt.value for nt in notification_types],
"timestamp": current_time.isoformat(),
}
except Exception as e:
logger.exception(f"Error processing batches: {e}")
return {
"success": False,
"error": str(e),
"notification_types": [nt.value for nt in notification_types],
"timestamp": datetime.now(tz=timezone.utc).isoformat(),
}
@expose
def queue_notification(self, event: NotificationEventDTO) -> NotificationResult:
"""Queue a notification - exposed method for other services to call"""
@@ -151,6 +299,32 @@ class NotificationManager(AppService):
# only if both are true, should we email this person
return validated_email and preference
async def _should_batch(
self, user_id: str, event_type: NotificationType, event: NotificationEventModel
) -> bool:
await create_or_add_to_user_notification_batch(user_id, event_type, event)
oldest_message = await get_user_notification_oldest_message_in_batch(
user_id, event_type
)
if not oldest_message:
logger.error(
f"Batch for user {user_id} and type {event_type} has no oldest message whichshould never happen!!!!!!!!!!!!!!!!"
)
return False
oldest_age = oldest_message.createdAt
max_delay = get_batch_delay(event_type)
if oldest_age + max_delay < datetime.now(tz=timezone.utc):
logger.info(f"Batch for user {user_id} and type {event_type} is old enough")
return True
logger.info(
f"Batch for user {user_id} and type {event_type} is not old enough: {oldest_age + max_delay} < {datetime.now(tz=timezone.utc)} max_delay={max_delay}"
)
return False
def _parse_message(self, message: str) -> NotificationEvent | None:
try:
event = NotificationEventDTO.model_validate_json(message)
@@ -175,7 +349,7 @@ class NotificationManager(AppService):
self.email_sender.send_templated(event.type, recipient_email, model)
return True
except Exception as e:
logger.exception(f"Error processing notification: {e}")
logger.exception(f"Error processing notification for admin queue: {e}")
return False
def _process_immediate(self, message: str) -> bool:
@@ -202,10 +376,81 @@ class NotificationManager(AppService):
)
return True
self.email_sender.send_templated(event.type, recipient_email, model)
unsub_link = generate_unsubscribe_link(event.user_id)
self.email_sender.send_templated(
notification=event.type,
user_email=recipient_email,
data=model,
user_unsub_link=unsub_link,
)
return True
except Exception as e:
logger.exception(f"Error processing notification: {e}")
logger.exception(f"Error processing notification for immediate queue: {e}")
return False
def _process_batch(self, message: str) -> bool:
"""Process a single notification with a batching strategy, returning whether to put into the failed queue"""
try:
parsed = self._parse_message(message)
if not parsed:
return False
event = parsed.event
model = parsed.model
logger.info(f"Processing batch notification: {model}")
recipient_email = self.run_and_wait(get_user_email_by_id(event.user_id))
if not recipient_email:
logger.error(f"User email not found for user {event.user_id}")
return False
should_send = self._should_email_user_based_on_preference(
event.user_id, event.type
)
if not should_send:
logger.info(
f"User {event.user_id} does not want to receive {event.type} notifications"
)
return True
should_send = self.run_and_wait(
self._should_batch(event.user_id, event.type, model)
)
if not should_send:
logger.info("Batch not old enough to send")
return False
batch = self.run_and_wait(
get_user_notification_batch(event.user_id, event.type)
)
if not batch or not batch.notifications:
logger.error(f"Batch not found for user {event.user_id}")
return False
unsub_link = generate_unsubscribe_link(event.user_id)
batch_messages = [
NotificationEventModel[get_data_type(db_event.type)].model_validate(
{
"user_id": event.user_id,
"type": db_event.type,
"data": db_event.data,
"created_at": db_event.createdAt,
}
)
for db_event in batch.notifications
]
self.email_sender.send_templated(
notification=event.type,
user_email=recipient_email,
data=batch_messages,
user_unsub_link=unsub_link,
)
# only empty the batch if we sent the email successfully
self.run_and_wait(empty_user_notification_batch(event.user_id, event.type))
return True
except Exception as e:
logger.exception(f"Error processing notification for batch queue: {e}")
return False
def _run_queue(
@@ -240,12 +485,25 @@ class NotificationManager(AppService):
def run_service(self):
logger.info(f"[{self.service_name}] Started notification service")
# Set up scheduler for batch processing of all notification types
# this can be changed later to spawn differnt cleanups on different schedules
try:
get_scheduler().add_batched_notification_schedule(
notification_types=list(NotificationType),
data={},
cron="0 * * * *",
)
logger.info("Scheduled notification cleanup")
except Exception as e:
logger.error(f"Error scheduling notification cleanup: {e}")
# Set up queue consumers
channel = self.run_and_wait(self.rabbit.get_channel())
immediate_queue = self.run_and_wait(
channel.get_queue("immediate_notifications")
)
batch_queue = self.run_and_wait(channel.get_queue("batch_notifications"))
admin_queue = self.run_and_wait(channel.get_queue("admin_notifications"))
@@ -261,6 +519,11 @@ class NotificationManager(AppService):
process_func=self._process_admin_message,
error_queue_name="admin_notifications",
)
self._run_queue(
queue=batch_queue,
process_func=self._process_batch,
error_queue_name="batch_notifications",
)
time.sleep(0.1)

View File

@@ -1,5 +1,6 @@
{# Agent Run #}
{# Template variables:
notification.data: the stuff below but a list of them
data.agent_name: the name of the agent
data.credits_used: the number of credits used by the agent
data.node_count: the number of nodes the agent ran on
@@ -7,90 +8,135 @@ data.execution_time: the time it took to run the agent
data.graph_id: the id of the graph the agent ran on
data.outputs: the list of outputs of the agent
#}
<p style="
font-family: 'Poppins', sans-serif;
color: #070629;
font-size: 16px;
line-height: 165%;
margin-top: 0;
margin-bottom: 10px;
">
Your agent, <strong>{{ data.agent_name }}</strong>, has completed its run!
</p>
<p style="
font-family: 'Poppins', sans-serif;
color: #070629;
font-size: 16px;
line-height: 165%;
margin-top: 0;
margin-bottom: 20px;
padding-left: 20px;
">
<p style="margin-bottom: 10px;"><strong>Time Taken:</strong> {{ data.execution_time | int }} seconds</p>
<p style="margin-bottom: 10px;"><strong>Nodes Used:</strong> {{ data.node_count }}</p>
<p style="margin-bottom: 10px;"><strong>Cost:</strong> ${{ "{:.2f}".format((data.credits_used|float)/100) }}</p>
</p>
{% if data.outputs and data.outputs|length > 0 %}
<div style="
margin-left: 15px;
margin-bottom: 20px;
">
<p style="
font-family: 'Poppins', sans-serif;
color: #070629;
font-weight: 600;
font-size: 16px;
margin-bottom: 10px;
">
Results:
</p>
{% for output in data.outputs %}
<div style="
margin-left: 15px;
margin-bottom: 15px;
">
<p style="
font-family: 'Poppins', sans-serif;
color: #5D23BB;
font-weight: 500;
font-size: 16px;
margin-top: 0;
margin-bottom: 8px;
">
{{ output.name }}
{% if notifications is defined %}
{# BATCH MODE #}
<div style="font-family: 'Poppins', sans-serif; color: #070629;">
<h2 style="color: #5D23BB; margin-bottom: 15px;">Agent Run Summary</h2>
<p style="font-size: 16px; line-height: 165%; margin-top: 0; margin-bottom: 15px;">
<strong>{{ notifications|length }}</strong> agent runs have completed!
</p>
{% for key, value in output.items() %}
{% if key != 'name' %}
<div style="
margin-left: 15px;
background-color: #f5f5ff;
padding: 8px 12px;
border-radius: 4px;
font-family: 'Roboto Mono', monospace;
white-space: pre-wrap;
word-break: break-word;
overflow-wrap: break-word;
max-width: 100%;
overflow-x: auto;
margin-top: 5px;
margin-bottom: 10px;
line-height: 1.4;
">
{% if value is iterable and value is not string %}
{% if value|length == 1 %}
{{ value[0] }}
{% else %}
[{% for item in value %}{{ item }}{% if not loop.last %}, {% endif %}{% endfor %}]
{% endif %}
{% else %}
{{ value }}
{% endif %}
</div>
{# Calculate summary stats #}
{% set total_time = 0 %}
{% set total_nodes = 0 %}
{% set total_credits = 0 %}
{% set agent_names = [] %}
{% for notification in notifications %}
{% set total_time = total_time + notification.data.execution_time %}
{% set total_nodes = total_nodes + notification.data.node_count %}
{% set total_credits = total_credits + notification.data.credits_used %}
{% if notification.data.agent_name not in agent_names %}
{% set agent_names = agent_names + [notification.data.agent_name] %}
{% endif %}
{% endfor %}
<div style="background-color: #f8f7ff; border-radius: 8px; padding: 15px; margin-bottom: 25px;">
<h3 style="margin-top: 0; margin-bottom: 10px; color: #5D23BB;">Summary</h3>
<p style="margin: 5px 0;"><strong>Agents:</strong> {{ agent_names|join(", ") }}</p>
<p style="margin: 5px 0;"><strong>Total Time:</strong> {{ total_time | int }} seconds</p>
<p style="margin: 5px 0;"><strong>Total Nodes:</strong> {{ total_nodes }}</p>
<p style="margin: 5px 0;"><strong>Total Cost:</strong> ${{ "{:.2f}".format((total_credits|float)/100) }}</p>
</div>
<h3 style="margin-top: 25px; margin-bottom: 15px; color: #5D23BB;">Individual Runs</h3>
{% for notification in notifications %}
<div style="margin-bottom: 30px; border-left: 3px solid #5D23BB; padding-left: 15px;">
<p style="font-size: 16px; font-weight: 600; margin-top: 0; margin-bottom: 10px;">
Agent: <strong>{{ notification.data.agent_name }}</strong>
</p>
<div style="margin-left: 10px;">
<p style="margin: 5px 0;"><strong>Time:</strong> {{ notification.data.execution_time | int }} seconds</p>
<p style="margin: 5px 0;"><strong>Nodes:</strong> {{ notification.data.node_count }}</p>
<p style="margin: 5px 0;"><strong>Cost:</strong> ${{ "{:.2f}".format((notification.data.credits_used|float)/100) }}</p>
</div>
{% if notification.data.outputs and notification.data.outputs|length > 0 %}
<div style="margin-left: 10px; margin-top: 15px;">
<p style="font-weight: 600; margin-bottom: 10px;">Results:</p>
{% for output in notification.data.outputs %}
<div style="margin-left: 10px; margin-bottom: 12px;">
<p style="color: #5D23BB; font-weight: 500; margin-top: 0; margin-bottom: 5px;">
{{ output.name }}
</p>
{% for key, value in output.items() %}
{% if key != 'name' %}
<div style="margin-left: 10px; background-color: #f5f5ff; padding: 8px 12px; border-radius: 4px;
font-family: 'Roboto Mono', monospace; white-space: pre-wrap; word-break: break-word;
overflow-wrap: break-word; max-width: 100%; overflow-x: auto; margin-top: 3px;
margin-bottom: 8px; line-height: 1.4;">
{% if value is iterable and value is not string %}
{% if value|length == 1 %}
{{ value[0] }}
{% else %}
[{% for item in value %}{{ item }}{% if not loop.last %}, {% endif %}{% endfor %}]
{% endif %}
{% else %}
{{ value }}
{% endif %}
</div>
{% endif %}
{% endfor %}
</div>
{% endfor %}
</div>
{% endif %}
</div>
{% endfor %}
</div>
{% endfor %}
</div>
{% else %}
{# SINGLE NOTIFICATION MODE - Original template #}
<p style="font-family: 'Poppins', sans-serif; color: #070629; font-size: 16px; line-height: 165%;
margin-top: 0; margin-bottom: 10px;">
Your agent, <strong>{{ data.agent_name }}</strong>, has completed its run!
</p>
<p style="font-family: 'Poppins', sans-serif; color: #070629; font-size: 16px; line-height: 165%;
margin-top: 0; margin-bottom: 20px; padding-left: 20px;">
<p style="margin-bottom: 10px;"><strong>Time Taken:</strong> {{ data.execution_time | int }} seconds</p>
<p style="margin-bottom: 10px;"><strong>Nodes Used:</strong> {{ data.node_count }}</p>
<p style="margin-bottom: 10px;"><strong>Cost:</strong> ${{ "{:.2f}".format((data.credits_used|float)/100) }}</p>
</p>
{% if data.outputs and data.outputs|length > 0 %}
<div style="margin-left: 15px; margin-bottom: 20px;">
<p style="font-family: 'Poppins', sans-serif; color: #070629; font-weight: 600;
font-size: 16px; margin-bottom: 10px;">
Results:
</p>
{% for output in data.outputs %}
<div style="margin-left: 15px; margin-bottom: 15px;">
<p style="font-family: 'Poppins', sans-serif; color: #5D23BB; font-weight: 500;
font-size: 16px; margin-top: 0; margin-bottom: 8px;">
{{ output.name }}
</p>
{% for key, value in output.items() %}
{% if key != 'name' %}
<div style="margin-left: 15px; background-color: #f5f5ff; padding: 8px 12px; border-radius: 4px;
font-family: 'Roboto Mono', monospace; white-space: pre-wrap; word-break: break-word;
overflow-wrap: break-word; max-width: 100%; overflow-x: auto; margin-top: 5px;
margin-bottom: 10px; line-height: 1.4;">
{% if value is iterable and value is not string %}
{% if value|length == 1 %}
{{ value[0] }}
{% else %}
[{% for item in value %}{{ item }}{% if not loop.last %}, {% endif %}{% endfor %}]
{% endif %}
{% else %}
{{ value }}
{% endif %}
</div>
{% endif %}
{% endfor %}
</div>
{% endfor %}
</div>
{% endif %}
{% endif %}

View File

@@ -1,5 +1,5 @@
from backend.app import run_processes
from backend.executor import DatabaseManager, ExecutionScheduler
from backend.executor import DatabaseManager, Scheduler
from backend.notifications.notifications import NotificationManager
from backend.server.rest_api import AgentServer
@@ -11,7 +11,7 @@ def main():
run_processes(
NotificationManager(),
DatabaseManager(),
ExecutionScheduler(),
Scheduler(),
AgentServer(),
)

View File

@@ -55,7 +55,7 @@ from backend.data.user import (
update_user_email,
update_user_notification_preference,
)
from backend.executor import ExecutionManager, ExecutionScheduler, scheduler
from backend.executor import ExecutionManager, Scheduler, scheduler
from backend.integrations.creds_manager import IntegrationCredentialsManager
from backend.integrations.webhooks.graph_lifecycle_hooks import (
on_graph_activate,
@@ -84,8 +84,8 @@ def execution_manager_client() -> ExecutionManager:
@thread_cached
def execution_scheduler_client() -> ExecutionScheduler:
return get_service_client(ExecutionScheduler)
def execution_scheduler_client() -> Scheduler:
return get_service_client(Scheduler)
settings = Settings()
@@ -701,7 +701,7 @@ class ScheduleCreationRequest(pydantic.BaseModel):
async def create_schedule(
user_id: Annotated[str, Depends(get_user_id)],
schedule: ScheduleCreationRequest,
) -> scheduler.JobInfo:
) -> scheduler.ExecutionJobInfo:
graph = await graph_db.get_graph(
schedule.graph_id, schedule.graph_version, user_id=user_id
)
@@ -743,7 +743,7 @@ def delete_schedule(
def get_execution_schedules(
user_id: Annotated[str, Depends(get_user_id)],
graph_id: str | None = None,
) -> list[scheduler.JobInfo]:
) -> list[scheduler.ExecutionJobInfo]:
return execution_scheduler_client().get_execution_schedules(
user_id=user_id,
graph_id=graph_id,

View File

@@ -105,10 +105,24 @@ async def list_library_agents(
logger.debug(
f"Retrieved {len(library_agents)} library agents for user #{user_id}"
)
# Only pass valid agents to the response
valid_library_agents: list[library_model.LibraryAgent] = []
for agent in library_agents:
try:
library_agent = library_model.LibraryAgent.from_db(agent)
valid_library_agents.append(library_agent)
except Exception as e:
# Skip this agent if there was an error
logger.error(
f"Error parsing LibraryAgent when getting library agents from db: {e}"
)
continue
# Return the response with only valid agents
return library_model.LibraryAgentResponse(
agents=[
library_model.LibraryAgent.from_db(agent) for agent in library_agents
],
agents=valid_library_agents,
pagination=backend.server.model.Pagination(
total_items=agent_count,
total_pages=(agent_count + page_size - 1) // page_size,

View File

@@ -2,9 +2,14 @@ import logging
from typing import Annotated
from autogpt_libs.auth.middleware import APIKeyValidator
from fastapi import APIRouter, Body, Depends
from fastapi import APIRouter, Body, Depends, Query
from fastapi.responses import JSONResponse
from backend.data.user import get_user_by_email, set_user_email_verification
from backend.data.user import (
get_user_by_email,
set_user_email_verification,
unsubscribe_user_by_token,
)
from backend.server.v2.postmark.models import (
PostmarkBounceEnum,
PostmarkBounceWebhook,
@@ -23,13 +28,24 @@ postmark_validator = APIKeyValidator(
settings.secrets.postmark_webhook_token,
)
router = APIRouter(dependencies=[Depends(postmark_validator.get_dependency())])
router = APIRouter()
logger = logging.getLogger(__name__)
@router.post("/")
@router.post("/unsubscribe")
async def unsubscribe_via_one_click(token: Annotated[str, Query()]):
logger.info(f"Received unsubscribe request from One Click Unsubscribe: {token}")
try:
await unsubscribe_user_by_token(token)
except Exception as e:
logger.error(f"Failed to unsubscribe user by token {token}: {e}")
raise e
return JSONResponse(status_code=200, content={"status": "ok"})
@router.post("/", dependencies=[Depends(postmark_validator.get_dependency())])
async def postmark_webhook_handler(
webhook: Annotated[
PostmarkWebhook,

View File

@@ -84,20 +84,30 @@ async def get_store_agents(
)
total_pages = (total + page_size - 1) // page_size
store_agents = [
backend.server.v2.store.model.StoreAgent(
slug=agent.slug,
agent_name=agent.agent_name,
agent_image=agent.agent_image[0] if agent.agent_image else "",
creator=agent.creator_username or "Needs Profile",
creator_avatar=agent.creator_avatar or "",
sub_heading=agent.sub_heading,
description=agent.description,
runs=agent.runs,
rating=agent.rating,
)
for agent in agents
]
store_agents: list[backend.server.v2.store.model.StoreAgent] = []
for agent in agents:
try:
# Create the StoreAgent object safely
store_agent = backend.server.v2.store.model.StoreAgent(
slug=agent.slug,
agent_name=agent.agent_name,
agent_image=agent.agent_image[0] if agent.agent_image else "",
creator=agent.creator_username or "Needs Profile",
creator_avatar=agent.creator_avatar or "",
sub_heading=agent.sub_heading,
description=agent.description,
runs=agent.runs,
rating=agent.rating,
)
# Add to the list only if creation was successful
store_agents.append(store_agent)
except Exception as e:
# Skip this agent if there was an error
# You could log the error here if needed
logger.error(
f"Error parsing Store agent when getting store agents from db: {e}"
)
continue
logger.debug(f"Found {len(store_agents)} agents")
return backend.server.v2.store.model.StoreAgentsResponse(

View File

@@ -56,6 +56,7 @@ config = Config()
api_host = config.pyro_host
api_comm_retry = config.pyro_client_comm_retry
api_comm_timeout = config.pyro_client_comm_timeout
api_call_timeout = config.rpc_client_call_timeout
pyro_config.MAX_RETRIES = api_comm_retry # type: ignore
pyro_config.COMMTIMEOUT = api_comm_timeout # type: ignore
@@ -264,7 +265,11 @@ class FastApiAppService(BaseAppService, ABC):
def _handle_internal_http_error(status_code: int = 500, log_error: bool = True):
def handler(request: Request, exc: Exception):
if log_error:
logger.exception(f"{request.method} {request.url.path} failed: {exc}")
if status_code == 500:
log = logger.exception
else:
log = logger.error
log(f"{request.method} {request.url.path} failed: {exc}")
return responses.JSONResponse(
status_code=status_code,
content=RemoteCallError(
@@ -429,7 +434,10 @@ def fastapi_close_service_client(client: Any) -> None:
@conn_retry("FastAPI client", "Creating service client", max_retry=api_comm_retry)
def fastapi_get_service_client(service_type: Type[AS]) -> AS:
def fastapi_get_service_client(
service_type: Type[AS],
call_timeout: int | None = api_call_timeout,
) -> AS:
class DynamicClient:
def __init__(self):
host = service_type.get_host()
@@ -437,7 +445,7 @@ def fastapi_get_service_client(service_type: Type[AS]) -> AS:
self.base_url = f"http://{host}:{port}".rstrip("/")
self.client = httpx.Client(
base_url=self.base_url,
timeout=api_comm_timeout,
timeout=call_timeout,
)
def _call_method(self, method_name: str, **kwargs) -> Any:

View File

@@ -81,6 +81,10 @@ class Config(UpdateTrackingModel["Config"], BaseSettings):
default=3,
description="The default number of retries for Pyro client connections.",
)
rpc_client_call_timeout: int = Field(
default=300,
description="The default timeout in seconds, for RPC client calls.",
)
enable_auth: bool = Field(
default=True,
description="If authentication is enabled or not",
@@ -321,6 +325,11 @@ class Secrets(UpdateTrackingModel["Secrets"], BaseSettings):
description="The token to use for the Postmark webhook",
)
unsubscribe_secret_key: str = Field(
default="",
description="The secret key to use for the unsubscribe user by token",
)
# OAuth server credentials for integrations
# --8<-- [start:OAuthServerCredentialsExample]
github_client_id: str = Field(default="", description="GitHub OAuth client ID")

View File

@@ -8,7 +8,7 @@ from backend.data.block import Block, BlockSchema, initialize_blocks
from backend.data.execution import ExecutionResult, ExecutionStatus
from backend.data.model import _BaseCredentials
from backend.data.user import create_default_user
from backend.executor import DatabaseManager, ExecutionManager, ExecutionScheduler
from backend.executor import DatabaseManager, ExecutionManager, Scheduler
from backend.notifications.notifications import NotificationManager
from backend.server.rest_api import AgentServer
from backend.server.utils import get_user_id
@@ -21,7 +21,7 @@ class SpinTestServer:
self.db_api = DatabaseManager()
self.exec_manager = ExecutionManager()
self.agent_server = AgentServer()
self.scheduler = ExecutionScheduler()
self.scheduler = Scheduler()
self.notif_manager = NotificationManager()
@staticmethod

View File

@@ -0,0 +1,11 @@
-- DropIndex
DROP INDEX "APIKey_userId_idx";
-- DropIndex
DROP INDEX "StoreListing_agentId_owningUserId_idx";
-- DropIndex
DROP INDEX "StoreListing_isDeleted_idx";
-- DropIndex
DROP INDEX "StoreListingVersion_agentId_agentVersion_isDeleted_idx";

View File

@@ -1,4 +1,3 @@
// THIS FILE IS AUTO-GENERATED, RUN `poetry run schema` TO UPDATE
datasource db {
provider = "postgresql"
url = env("DATABASE_URL")
@@ -620,11 +619,9 @@ model StoreListing {
// Unique index on agentId to ensure only one listing per agent, regardless of number of versions the agent has.
@@unique([agentId])
@@index([agentId, owningUserId])
@@index([owningUserId])
// Used in the view query
@@index([isDeleted, isApproved])
@@index([isDeleted])
}
model StoreListingVersion {
@@ -665,7 +662,6 @@ model StoreListingVersion {
StoreListingReview StoreListingReview[]
@@unique([agentId, agentVersion])
@@index([agentId, agentVersion, isDeleted])
}
model StoreListingReview {
@@ -741,7 +737,6 @@ model APIKey {
@@index([key])
@@index([prefix])
@@index([userId])
@@index([status])
@@index([userId, status])
}

View File

@@ -1,7 +1,7 @@
import pytest
from backend.data import db
from backend.executor import ExecutionScheduler
from backend.executor import Scheduler
from backend.server.model import CreateGraph
from backend.usecases.sample import create_test_graph, create_test_user
from backend.util.service import get_service_client
@@ -17,7 +17,7 @@ async def test_agent_schedule(server: SpinTestServer):
user_id=test_user.id,
)
scheduler = get_service_client(ExecutionScheduler)
scheduler = get_service_client(Scheduler)
schedules = scheduler.get_execution_schedules(test_graph.id, test_user.id)
assert len(schedules) == 0

View File

@@ -92,6 +92,7 @@ services:
- FRONTEND_BASE_URL=http://localhost:3000
- BACKEND_CORS_ALLOW_ORIGINS=["http://localhost:3000"]
- ENCRYPTION_KEY=dvziYgz0KSK8FENhju0ZYi8-fRTfAdlz6YLhdB_jhNw= # DO NOT USE IN PRODUCTION!!
- UNSUBSCRIBE_SECRET_KEY=HlP8ivStJjmbf6NKi78m_3FnOogut0t5ckzjsIqeaio= # DO NOT USE IN PRODUCTION!!
ports:
- "8006:8006"
- "8007:8007"

View File

@@ -64,3 +64,79 @@ You can then run the migrations from the `backend` directory.
cd ../backend
prisma migrate dev --schema postgres/schema.prisma
```
## AutoGPT Agent Server Advanced set up
This guide walks you through a dockerized set up, with an external DB (postgres)
### Setup
We use the Poetry to manage the dependencies. To set up the project, follow these steps inside this directory:
0. Install Poetry
```sh
pip install poetry
```
1. Configure Poetry to use .venv in your project directory
```sh
poetry config virtualenvs.in-project true
```
2. Enter the poetry shell
```sh
poetry shell
```
3. Install dependencies
```sh
poetry install
```
4. Copy .env.example to .env
```sh
cp .env.example .env
```
5. Generate the Prisma client
```sh
poetry run prisma generate
```
> In case Prisma generates the client for the global Python installation instead of the virtual environment, the current mitigation is to just uninstall the global Prisma package:
>
> ```sh
> pip uninstall prisma
> ```
>
> Then run the generation again. The path *should* look something like this:
> `<some path>/pypoetry/virtualenvs/backend-TQIRSwR6-py3.12/bin/prisma`
6. Run the postgres database from the /rnd folder
```sh
cd autogpt_platform/
docker compose up -d
```
7. Run the migrations (from the backend folder)
```sh
cd ../backend
prisma migrate deploy
```
### Running The Server
#### Starting the server directly
Run the following command:
```sh
poetry run app
```

View File

@@ -23,7 +23,7 @@ To setup the server, you need to have the following installed:
- [Docker](https://docs.docker.com/get-docker/)
- [Git](https://git-scm.com/downloads)
#### Checking if you have Node.js & NPM installed
### Checking if you have Node.js & NPM installed
We use Node.js to run our frontend application.
@@ -42,7 +42,7 @@ npm -v
Once you have Node.js installed, you can proceed to the next step.
#### Checking if you have Docker & Docker Compose installed
### Checking if you have Docker & Docker Compose installed
Docker containerizes applications, while Docker Compose orchestrates multi-container Docker applications.
@@ -61,6 +61,8 @@ docker compose -v
Once you have Docker and Docker Compose installed, you can proceed to the next step.
## Setup
### Cloning the Repository
The first step is cloning the AutoGPT repository to your computer.
To do this, open a terminal window in a folder on your computer and run:
@@ -129,7 +131,7 @@ Frontend UI Server: 3000
Backend Websocket Server: 8001
Execution API Rest Server: 8006
#### Additional Notes
### Additional Notes
You may want to change your encryption key in the `.env` file in the `autogpt_platform/backend` directory.
@@ -146,3 +148,214 @@ poetry run cli gen-encrypt-key
```
Then, replace the existing key in the `autogpt_platform/backend/.env` file with the new one.
!!! Note
*The steps below are an alternative to [Running the backend services](#running-the-backend-services)*
<details>
<summary><strong>Alternate Steps</strong></summary>
#### AutoGPT Agent Server (OLD)
This is an initial project for creating the next generation of agent execution, which is an AutoGPT agent server.
The agent server will enable the creation of composite multi-agent systems that utilize AutoGPT agents and other non-agent components as its primitives.
##### Docs
You can access the docs for the [AutoGPT Agent Server here](https://docs.agpt.co/#1-autogpt-server).
##### Setup
We use the Poetry to manage the dependencies. To set up the project, follow these steps inside this directory:
0. Install Poetry
```sh
pip install poetry
```
1. Configure Poetry to use .venv in your project directory
```sh
poetry config virtualenvs.in-project true
```
2. Enter the poetry shell
```sh
poetry shell
```
3. Install dependencies
```sh
poetry install
```
4. Copy .env.example to .env
```sh
cp .env.example .env
```
5. Generate the Prisma client
```sh
poetry run prisma generate
```
> In case Prisma generates the client for the global Python installation instead of the virtual environment, the current mitigation is to just uninstall the global Prisma package:
>
> ```sh
> pip uninstall prisma
> ```
>
> Then run the generation again. The path *should* look something like this:
> `<some path>/pypoetry/virtualenvs/backend-TQIRSwR6-py3.12/bin/prisma`
6. Migrate the database. Be careful because this deletes current data in the database.
```sh
docker compose up db -d
poetry run prisma migrate deploy
```
</details>
### Starting the AutoGPT server without Docker
To run the server locally, start in the autogpt_platform folder:
```sh
cd ..
```
Run the following command to run database in docker but the application locally:
```sh
docker compose --profile local up deps --build --detach
cd backend
poetry run app
```
### Starting the AutoGPT server with Docker
Run the following command to build the dockerfiles:
```sh
docker compose build
```
Run the following command to run the app:
```sh
docker compose up
```
Run the following to automatically rebuild when code changes, in another terminal:
```sh
docker compose watch
```
Run the following command to shut down:
```sh
docker compose down
```
If you run into issues with dangling orphans, try:
```sh
docker compose down --volumes --remove-orphans && docker-compose up --force-recreate --renew-anon-volumes --remove-orphans
```
## Development
### Formatting & Linting
Auto formatter and linter are set up in the project. To run them:
Install:
```sh
poetry install --with dev
```
Format the code:
```sh
poetry run format
```
Lint the code:
```sh
poetry run lint
```
### Testing
To run the tests:
```sh
poetry run test
```
## Project Outline
The current project has the following main modules:
#### **blocks**
This module stores all the Agent Blocks, which are reusable components to build a graph that represents the agent's behavior.
#### **data**
This module stores the logical model that is persisted in the database.
It abstracts the database operations into functions that can be called by the service layer.
Any code that interacts with Prisma objects or the database should reside in this module.
The main models are:
* `block`: anything related to the block used in the graph
* `execution`: anything related to the execution graph execution
* `graph`: anything related to the graph, node, and its relations
#### **execution**
This module stores the business logic of executing the graph.
It currently has the following main modules:
* `manager`: A service that consumes the queue of the graph execution and executes the graph. It contains both pieces of logic.
* `scheduler`: A service that triggers scheduled graph execution based on a cron expression. It pushes an execution request to the manager.
#### **server**
This module stores the logic for the server API.
It contains all the logic used for the API that allows the client to create, execute, and monitor the graph and its execution.
This API service interacts with other services like those defined in `manager` and `scheduler`.
#### **utils**
This module stores utility functions that are used across the project.
Currently, it has two main modules:
* `process`: A module that contains the logic to spawn a new process.
* `service`: A module that serves as a parent class for all the services in the project.
## Service Communication
Currently, there are only 3 active services:
- AgentServer (the API, defined in `server.py`)
- ExecutionManager (the executor, defined in `manager.py`)
- ExecutionScheduler (the scheduler, defined in `scheduler.py`)
The services run in independent Python processes and communicate through an IPC.
A communication layer (`service.py`) is created to decouple the communication library from the implementation.
Currently, the IPC is done using Pyro5 and abstracted in a way that allows a function decorated with `@expose` to be called from a different process.
## Adding a New Agent Block
To add a new agent block, you need to create a new class that inherits from `Block` and provides the following information:
* All the block code should live in the `blocks` (`backend.blocks`) module.
* `input_schema`: the schema of the input data, represented by a Pydantic object.
* `output_schema`: the schema of the output data, represented by a Pydantic object.
* `run` method: the main logic of the block.
* `test_input` & `test_output`: the sample input and output data for the block, which will be used to auto-test the block.
* You can mock the functions declared in the block using the `test_mock` field for your unit tests.
* Once you finish creating the block, you can test it by running `poetry run pytest -s test/block/test_block.py`.