Merge branch 'ntindle/secrt-1077-add-email-service' into andrewhooker2/secrt-1077-add-email-service-settings-page

This commit is contained in:
Nicholas Tindle
2025-02-13 07:37:36 -06:00
committed by GitHub
18 changed files with 281 additions and 220 deletions

View File

@@ -42,6 +42,14 @@ jobs:
REDIS_PASSWORD: testpassword
ports:
- 6379:6379
rabbitmq:
image: rabbitmq:3.12-management
ports:
- 5672:5672
- 15672:15672
env:
RABBITMQ_DEFAULT_USER: ${{ env.RABBITMQ_DEFAULT_USER }}
RABBITMQ_DEFAULT_PASS: ${{ env.RABBITMQ_DEFAULT_PASS }}
steps:
- name: Checkout repository
@@ -129,9 +137,9 @@ jobs:
SUPABASE_URL: ${{ steps.supabase.outputs.API_URL }}
SUPABASE_SERVICE_ROLE_KEY: ${{ steps.supabase.outputs.SERVICE_ROLE_KEY }}
SUPABASE_JWT_SECRET: ${{ steps.supabase.outputs.JWT_SECRET }}
REDIS_HOST: 'localhost'
REDIS_PORT: '6379'
REDIS_PASSWORD: 'testpassword'
REDIS_HOST: "localhost"
REDIS_PORT: "6379"
REDIS_PASSWORD: "testpassword"
env:
CI: true
@@ -139,6 +147,13 @@ jobs:
RUN_ENV: local
PORT: 8080
OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }}
# We know these are here, don't report this as a security vulnerability
# This is used as the default credential for the entire system's RabbitMQ instance
# If you want to replace this, you can do so by making our entire system generate
# new credentials for each local user and updating the environment variables in
# the backend service, Docker Compose files, and examples
RABBITMQ_DEFAULT_USER: rabbitmq_user_default
RABBITMQ_DEFAULT_PASS: k0VMxyIJF9S35f3x2uaw5IWAl6Y536O7
# - name: Upload coverage reports to Codecov
# uses: codecov/codecov-action@v4

View File

@@ -22,7 +22,7 @@ To run the AutoGPT Platform, follow these steps:
2. Run the following command:
```
git submodule update --init --recursive
git submodule update --init --recursive --progress
```
This command will initialize and update the submodules in the repository. The `supabase` folder will be cloned to the root directory.

View File

@@ -76,6 +76,8 @@ class ExtractTextInformationBlock(Block):
class Output(BlockSchema):
positive: str = SchemaField(description="Extracted text")
negative: str = SchemaField(description="Original text")
matched_results: list[str] = SchemaField(description="List of matched results")
matched_count: int = SchemaField(description="Number of matched results")
def __init__(self):
super().__init__(
@@ -103,13 +105,31 @@ class ExtractTextInformationBlock(Block):
},
],
test_output=[
# Test case 1
("positive", "World!"),
("matched_results", ["World!"]),
("matched_count", 1),
# Test case 2
("positive", "Hello, World!"),
("matched_results", ["Hello, World!"]),
("matched_count", 1),
# Test case 3
("negative", "Hello, World!"),
("matched_results", []),
("matched_count", 0),
# Test case 4
("positive", "Hello,"),
("matched_results", ["Hello,"]),
("matched_count", 1),
# Test case 5
("positive", "World!!"),
("matched_results", ["World!!"]),
("matched_count", 1),
# Test case 6
("positive", "World!!"),
("positive", "Earth!!"),
("matched_results", ["World!!", "Earth!!"]),
("matched_count", 2),
],
)
@@ -130,13 +150,16 @@ class ExtractTextInformationBlock(Block):
for match in re.finditer(input_data.pattern, txt, flags)
if input_data.group <= len(match.groups())
]
if not input_data.find_all:
matches = matches[:1]
for match in matches:
yield "positive", match
if not input_data.find_all:
return
if not matches:
yield "negative", input_data.text
yield "matched_results", matches
yield "matched_count", len(matches)
class FillTextTemplateBlock(Block):
class Input(BlockSchema):

View File

@@ -1,8 +1,9 @@
from collections import defaultdict
from datetime import datetime, timezone
from multiprocessing import Manager
from typing import Any, AsyncGenerator, Generator, Generic, Optional, TypeVar
from typing import Any, AsyncGenerator, Generator, Generic, Optional, Type, TypeVar
from prisma import Json
from prisma.enums import AgentExecutionStatus
from prisma.errors import PrismaError
from prisma.models import (
@@ -16,7 +17,7 @@ from backend.data.block import BlockData, BlockInput, CompletedBlockOutput
from backend.data.includes import EXECUTION_RESULT_INCLUDE, GRAPH_EXECUTION_INCLUDE
from backend.data.queue import AsyncRedisEventBus, RedisEventBus
from backend.server.v2.store.exceptions import DatabaseError
from backend.util import json, mock
from backend.util import mock, type
from backend.util.settings import Config
@@ -102,16 +103,16 @@ class ExecutionResult(BaseModel):
def from_db(execution: AgentNodeExecution):
if execution.executionData:
# Execution that has been queued for execution will persist its data.
input_data = json.loads(execution.executionData, target_type=dict[str, Any])
input_data = type.convert(execution.executionData, dict[str, Any])
else:
# For incomplete execution, executionData will not be yet available.
input_data: BlockInput = defaultdict()
for data in execution.Input or []:
input_data[data.name] = json.loads(data.data)
input_data[data.name] = type.convert(data.data, Type[Any])
output_data: CompletedBlockOutput = defaultdict(list)
for data in execution.Output or []:
output_data[data.name].append(json.loads(data.data))
output_data[data.name].append(type.convert(data.data, Type[Any]))
graph_execution: AgentGraphExecution | None = execution.AgentGraphExecution
@@ -158,7 +159,7 @@ async def create_graph_execution(
"executionStatus": ExecutionStatus.INCOMPLETE,
"Input": {
"create": [
{"name": name, "data": json.dumps(data)}
{"name": name, "data": Json(data)}
for name, data in node_input.items()
]
},
@@ -210,7 +211,7 @@ async def upsert_execution_input(
order={"addedTime": "asc"},
include={"Input": True},
)
json_input_data = json.dumps(input_data)
json_input_data = Json(input_data)
if existing_execution:
await AgentNodeExecutionInputOutput.prisma().create(
@@ -222,7 +223,7 @@ async def upsert_execution_input(
)
return existing_execution.id, {
**{
input_data.name: json.loads(input_data.data)
input_data.name: type.convert(input_data.data, Type[Any])
for input_data in existing_execution.Input or []
},
input_name: input_data,
@@ -256,7 +257,7 @@ async def upsert_execution_output(
await AgentNodeExecutionInputOutput.prisma().create(
data={
"name": output_name,
"data": json.dumps(output_data),
"data": Json(output_data),
"referencedByOutputExecId": node_exec_id,
}
)
@@ -281,7 +282,7 @@ async def update_graph_execution_stats(
where={"id": graph_exec_id},
data={
"executionStatus": status,
"stats": json.dumps(stats),
"stats": Json(stats),
},
)
if not res:
@@ -293,7 +294,7 @@ async def update_graph_execution_stats(
async def update_node_execution_stats(node_exec_id: str, stats: dict[str, Any]):
await AgentNodeExecution.prisma().update(
where={"id": node_exec_id},
data={"stats": json.dumps(stats)},
data={"stats": Json(stats)},
)
@@ -313,8 +314,8 @@ async def update_execution_status(
**({"startedTime": now} if status == ExecutionStatus.RUNNING else {}),
**({"endedTime": now} if status == ExecutionStatus.FAILED else {}),
**({"endedTime": now} if status == ExecutionStatus.COMPLETED else {}),
**({"executionData": json.dumps(execution_data)} if execution_data else {}),
**({"stats": json.dumps(stats)} if stats else {}),
**({"executionData": Json(execution_data)} if execution_data else {}),
**({"stats": Json(stats)} if stats else {}),
}
res = await AgentNodeExecution.prisma().update(
@@ -473,8 +474,7 @@ async def get_latest_execution(node_id: str, graph_eid: str) -> ExecutionResult
where={
"agentNodeId": node_id,
"agentGraphExecutionId": graph_eid,
"executionStatus": {"not": ExecutionStatus.INCOMPLETE},
"executionData": {"not": None}, # type: ignore
"executionStatus": {"not": ExecutionStatus.INCOMPLETE}, # type: ignore
},
order={"queuedTime": "desc"},
include=EXECUTION_RESULT_INCLUDE,

View File

@@ -6,6 +6,7 @@ from datetime import datetime, timezone
from typing import Any, Literal, Optional, Type
import prisma
from prisma import Json
from prisma.models import (
AgentGraph,
AgentGraphExecution,
@@ -18,7 +19,7 @@ from pydantic.fields import computed_field
from backend.blocks.agent import AgentExecutorBlock
from backend.blocks.basic import AgentInputBlock, AgentOutputBlock
from backend.util import json
from backend.util import type
from .block import BlockInput, BlockType, get_block, get_blocks
from .db import BaseDbModel, transaction
@@ -74,8 +75,8 @@ class NodeModel(Node):
obj = NodeModel(
id=node.id,
block_id=node.AgentBlock.id,
input_default=json.loads(node.constantInput, target_type=dict[str, Any]),
metadata=json.loads(node.metadata, target_type=dict[str, Any]),
input_default=type.convert(node.constantInput, dict[str, Any]),
metadata=type.convert(node.metadata, dict[str, Any]),
graph_id=node.agentGraphId,
graph_version=node.agentGraphVersion,
webhook_id=node.webhookId,
@@ -125,7 +126,7 @@ class GraphExecution(BaseDbModel):
total_run_time = duration
try:
stats = json.loads(execution.stats or "{}", target_type=dict[str, Any])
stats = type.convert(execution.stats or {}, dict[str, Any])
except ValueError:
stats = {}
@@ -402,11 +403,9 @@ class GraphModel(Graph):
if for_export:
# Remove credentials from node input
if node.constantInput:
constant_input = json.loads(
node.constantInput, target_type=dict[str, Any]
)
constant_input = type.convert(node.constantInput, dict[str, Any])
constant_input = GraphModel._hide_node_input_credentials(constant_input)
node.constantInput = json.dumps(constant_input)
node.constantInput = Json(constant_input)
# Remove webhook info
node.webhookId = None
@@ -654,8 +653,8 @@ async def __create_graph(tx, graph: Graph, user_id: str):
{
"id": node.id,
"agentBlockId": node.block_id,
"constantInput": json.dumps(node.input_default),
"metadata": json.dumps(node.metadata),
"constantInput": Json(node.input_default),
"metadata": Json(node.metadata),
}
for node in graph.nodes
]
@@ -717,9 +716,11 @@ async def fix_llm_provider_credentials():
store = IntegrationCredentialsStore()
broken_nodes = await prisma.get_client().query_raw(
"""
SELECT graph."userId" user_id,
broken_nodes = []
try:
broken_nodes = await prisma.get_client().query_raw(
"""
SELECT graph."userId" user_id,
node.id node_id,
node."constantInput" node_preset_input
FROM platform."AgentNode" node
@@ -728,8 +729,10 @@ async def fix_llm_provider_credentials():
WHERE node."constantInput"::jsonb->'credentials'->>'provider' = 'llm'
ORDER BY graph."userId";
"""
)
logger.info(f"Fixing LLM credential inputs on {len(broken_nodes)} nodes")
)
logger.info(f"Fixing LLM credential inputs on {len(broken_nodes)} nodes")
except Exception as e:
logger.error(f"Error fixing LLM credential inputs: {e}")
user_id: str = ""
user_integrations = None
@@ -742,7 +745,7 @@ async def fix_llm_provider_credentials():
raise RuntimeError(f"Impossible state while processing node {node}")
node_id: str = node["node_id"]
node_preset_input: dict = json.loads(node["node_preset_input"])
node_preset_input: dict = node["node_preset_input"]
credentials_meta: dict = node_preset_input["credentials"]
credentials = next(
@@ -778,5 +781,5 @@ async def fix_llm_provider_credentials():
store.update_creds(user_id, credentials)
await AgentNode.prisma().update(
where={"id": node_id},
data={"constantInput": json.dumps(node_preset_input)},
data={"constantInput": Json(node_preset_input)},
)

View File

@@ -34,13 +34,9 @@ async def get_or_create_user(user_data: dict) -> User:
"id": user_id,
"email": user_email,
"name": user_data.get("user_metadata", {}).get("name"),
"UserNotificationPreference": {"create": {"userId": user_id}},
}
)
if not user.userNotificationPreferenceId:
user.UserNotificationPreference = (
await prisma.usernotificationpreference.create(data={"userId": user_id})
)
return User.model_validate(user)
except Exception as e:
raise DatabaseError(f"Failed to get or create user {user_data}: {e}") from e
@@ -53,6 +49,14 @@ async def get_user_by_id(user_id: str) -> User:
return User.model_validate(user)
async def get_user_email_by_id(user_id: str) -> str:
    """Return the email address of the user identified by ``user_id``.

    Raises:
        DatabaseError: if the user does not exist or the lookup fails.
    """
    try:
        record = await prisma.user.find_unique_or_raise(where={"id": user_id})
        return record.email
    except Exception as e:
        raise DatabaseError(f"Failed to get user email for user {user_id}: {e}") from e
async def create_default_user() -> Optional[User]:
user = await prisma.user.find_unique(where={"id": DEFAULT_USER_ID})
if not user:
@@ -178,59 +182,22 @@ async def get_user_notification_preference(user_id: str) -> NotificationPreferen
try:
user = await User.prisma().find_unique_or_raise(
where={"id": user_id},
include={
"UserNotificationPreference": True,
},
)
# enable notifications by default if user has no notification preference (shouldn't ever happen though)
preferences: dict[NotificationType, bool] = {
NotificationType.AGENT_RUN: (
user.UserNotificationPreference.notifyOnAgentRun
if user.UserNotificationPreference
else True
),
NotificationType.ZERO_BALANCE: (
user.UserNotificationPreference.notifyOnZeroBalance
if user.UserNotificationPreference
else True
),
NotificationType.LOW_BALANCE: (
user.UserNotificationPreference.notifyOnLowBalance
if user.UserNotificationPreference
else True
),
NotificationType.BLOCK_EXECUTION_FAILED: (
user.UserNotificationPreference.notifyOnBlockExecutionFailed
if user.UserNotificationPreference
else True
),
NotificationType.CONTINUOUS_AGENT_ERROR: (
user.UserNotificationPreference.notifyOnContinuousAgentError
if user.UserNotificationPreference
else True
),
NotificationType.DAILY_SUMMARY: (
user.UserNotificationPreference.notifyOnDailySummary
if user.UserNotificationPreference
else True
),
NotificationType.WEEKLY_SUMMARY: (
user.UserNotificationPreference.notifyOnWeeklySummary
if user.UserNotificationPreference
else True
),
NotificationType.MONTHLY_SUMMARY: (
user.UserNotificationPreference.notifyOnMonthlySummary
if user.UserNotificationPreference
else True
),
NotificationType.AGENT_RUN: user.notifyOnAgentRun or True,
NotificationType.ZERO_BALANCE: user.notifyOnZeroBalance or True,
NotificationType.LOW_BALANCE: user.notifyOnLowBalance or True,
NotificationType.BLOCK_EXECUTION_FAILED: user.notifyOnBlockExecutionFailed
or True,
NotificationType.CONTINUOUS_AGENT_ERROR: user.notifyOnContinuousAgentError
or True,
NotificationType.DAILY_SUMMARY: user.notifyOnDailySummary or True,
NotificationType.WEEKLY_SUMMARY: user.notifyOnWeeklySummary or True,
NotificationType.MONTHLY_SUMMARY: user.notifyOnMonthlySummary or True,
}
daily_limit = (
user.UserNotificationPreference.maxEmailsPerDay
if user.UserNotificationPreference
else 3
)
daily_limit = user.maxEmailsPerDay or 3
notification_preference = NotificationPreference(
user_id=user.id,
email=user.email,

View File

@@ -28,6 +28,7 @@ from backend.data.user import (
get_active_user_ids_in_timerange,
get_active_users_ids,
get_user_by_id,
get_user_email_by_id,
get_user_integrations,
get_user_metadata,
get_user_notification_preference,
@@ -105,6 +106,7 @@ class DatabaseManager(AppService):
get_active_user_ids_in_timerange
)
get_user_by_id = exposed_run_and_wait(get_user_by_id)
get_user_email_by_id = exposed_run_and_wait(get_user_email_by_id)
get_user_notification_preference = exposed_run_and_wait(
get_user_notification_preference
)

View File

@@ -1,7 +1,6 @@
import logging
import pathlib
from backend.util.text import TextFormatter
from postmarker.core import PostmarkClient
from postmarker.models.emails import EmailManager
from prisma.enums import NotificationType
@@ -12,6 +11,7 @@ from backend.data.notifications import (
T_co,
)
from backend.util.settings import Settings
from backend.util.text import TextFormatter
logger = logging.getLogger(__name__)
settings = Settings()
@@ -26,9 +26,14 @@ class TypedPostmarkClient(PostmarkClient):
class EmailSender:
def __init__(self):
    """Set up the Postmark client (when configured) and the template formatter.

    If no Postmark server API token is configured, ``self.postmark`` is left as
    ``None`` and email sending is disabled (``send_templated`` checks for this).
    """
    # Always define the attribute first: without this, a missing token left
    # `self.postmark` unassigned and the later `if not self.postmark` check
    # raised AttributeError instead of skipping the send.
    self.postmark = None
    if settings.secrets.postmark_server_api_token:
        self.postmark = TypedPostmarkClient(
            server_token=settings.secrets.postmark_server_api_token
        )
    else:
        logger.warning(
            "Postmark server API token not found, email sending disabled"
        )
    self.formatter = TextFormatter()
def send_templated(
@@ -37,6 +42,9 @@ class EmailSender:
user_email: str,
data: NotificationEventModel[T_co] | list[NotificationEventModel[T_co]],
):
if not self.postmark:
logger.warning("Postmark client not initialized, email not sent")
return
body = self._get_template(notification)
# use the jinja2 library to render the template
body = self.formatter.format_string(body, data)

View File

@@ -302,8 +302,7 @@ class NotificationManager(AppService):
parsed_event = NotificationEventModel[
get_data_type(event.type)
].model_validate_json(message)
# Implementation of actual notification sending would go here
user_email = get_db_client().get_user_by_id(event.user_id).email
user_email = get_db_client().get_user_email_by_id(event.user_id)
should_send = (
get_db_client()
.get_user_notification_preference(event.user_id)

View File

@@ -9,6 +9,7 @@ from backend.data.execution import ExecutionResult, ExecutionStatus
from backend.data.model import _BaseCredentials
from backend.data.user import create_default_user
from backend.executor import DatabaseManager, ExecutionManager, ExecutionScheduler
from backend.notifications.notifications import NotificationManager
from backend.server.rest_api import AgentServer
from backend.server.utils import get_user_id
@@ -21,6 +22,7 @@ class SpinTestServer:
self.exec_manager = ExecutionManager()
self.agent_server = AgentServer()
self.scheduler = ExecutionScheduler()
self.notifications = NotificationManager()
@staticmethod
def test_get_user_id():
@@ -32,6 +34,7 @@ class SpinTestServer:
self.agent_server.__enter__()
self.exec_manager.__enter__()
self.scheduler.__enter__()
self.notifications.__enter__()
await db.connect()
await initialize_blocks()
@@ -46,6 +49,7 @@ class SpinTestServer:
self.exec_manager.__exit__(exc_type, exc_val, exc_tb)
self.agent_server.__exit__(exc_type, exc_val, exc_tb)
self.db_api.__exit__(exc_type, exc_val, exc_tb)
self.notifications.__exit__(exc_type, exc_val, exc_tb)
def setup_dependency_overrides(self):
# Override get_user_id for testing

View File

@@ -1,6 +1,8 @@
import json
from typing import Any, Type, TypeVar, cast, get_args, get_origin
from prisma import Json as PrismaJson
class ConversionError(ValueError):
    """Raised when a value cannot be converted to the requested target type."""

    pass
@@ -188,6 +190,8 @@ def type_match(value: Any, target_type: Type[T]) -> T:
def convert(value: Any, target_type: Type[T]) -> T:
    """Convert ``value`` to ``target_type`` via ``_try_convert``.

    Prisma ``Json`` wrappers are unwrapped to their underlying ``.data``
    payload before conversion, so callers can pass raw DB column values.

    Raises:
        ConversionError: if the conversion fails for any reason (the original
            exception is chained as the cause).
    """
    try:
        # Unwrap Prisma's Json wrapper so conversion sees the plain value.
        if isinstance(value, PrismaJson):
            value = value.data
        return cast(T, _try_convert(value, target_type, raise_on_mismatch=False))
    except Exception as e:
        raise ConversionError(f"Failed to convert {value} to {target_type}") from e

View File

@@ -1,71 +0,0 @@
/*
Warnings:
- A unique constraint covering the columns `[userNotificationPreferenceId]` on the table `User` will be added. If there are existing duplicate values, this will fail.
*/
-- CreateEnum
CREATE TYPE "NotificationType" AS ENUM ('AGENT_RUN', 'ZERO_BALANCE', 'LOW_BALANCE', 'BLOCK_EXECUTION_FAILED', 'CONTINUOUS_AGENT_ERROR', 'DAILY_SUMMARY', 'WEEKLY_SUMMARY', 'MONTHLY_SUMMARY');
-- AlterTable
ALTER TABLE "User" ADD COLUMN "userNotificationPreferenceId" TEXT;
-- CreateTable
CREATE TABLE "NotificationEvent" (
"id" TEXT NOT NULL,
"createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
"updatedAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
"userNotificationBatchId" TEXT,
"type" "NotificationType" NOT NULL,
"data" JSONB NOT NULL,
CONSTRAINT "NotificationEvent_pkey" PRIMARY KEY ("id")
);
-- CreateTable
CREATE TABLE "UserNotificationBatch" (
"id" TEXT NOT NULL,
"createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
"updatedAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
"userId" TEXT NOT NULL,
"type" "NotificationType" NOT NULL,
CONSTRAINT "UserNotificationBatch_pkey" PRIMARY KEY ("id")
);
-- CreateTable
CREATE TABLE "UserNotificationPreference" (
"id" TEXT NOT NULL,
"createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
"updatedAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
"userId" TEXT NOT NULL,
"maxEmailsPerDay" INTEGER NOT NULL DEFAULT 3,
"notifyOnAgentRun" BOOLEAN NOT NULL DEFAULT true,
"notifyOnZeroBalance" BOOLEAN NOT NULL DEFAULT true,
"notifyOnLowBalance" BOOLEAN NOT NULL DEFAULT true,
"notifyOnBlockExecutionFailed" BOOLEAN NOT NULL DEFAULT true,
"notifyOnContinuousAgentError" BOOLEAN NOT NULL DEFAULT true,
"notifyOnDailySummary" BOOLEAN NOT NULL DEFAULT true,
"notifyOnWeeklySummary" BOOLEAN NOT NULL DEFAULT true,
"notifyOnMonthlySummary" BOOLEAN NOT NULL DEFAULT true,
CONSTRAINT "UserNotificationPreference_pkey" PRIMARY KEY ("id")
);
-- CreateIndex
CREATE UNIQUE INDEX "UserNotificationBatch_userId_type_key" ON "UserNotificationBatch"("userId", "type");
-- CreateIndex
CREATE UNIQUE INDEX "UserNotificationPreference_userId_key" ON "UserNotificationPreference"("userId");
-- CreateIndex
CREATE UNIQUE INDEX "User_userNotificationPreferenceId_key" ON "User"("userNotificationPreferenceId");
-- AddForeignKey
ALTER TABLE "User" ADD CONSTRAINT "User_userNotificationPreferenceId_fkey" FOREIGN KEY ("userNotificationPreferenceId") REFERENCES "UserNotificationPreference"("id") ON DELETE CASCADE ON UPDATE CASCADE;
-- AddForeignKey
ALTER TABLE "NotificationEvent" ADD CONSTRAINT "NotificationEvent_userNotificationBatchId_fkey" FOREIGN KEY ("userNotificationBatchId") REFERENCES "UserNotificationBatch"("id") ON DELETE SET NULL ON UPDATE CASCADE;
-- AddForeignKey
ALTER TABLE "UserNotificationBatch" ADD CONSTRAINT "UserNotificationBatch_userId_fkey" FOREIGN KEY ("userId") REFERENCES "User"("id") ON DELETE CASCADE ON UPDATE CASCADE;

View File

@@ -0,0 +1,45 @@
-- CreateEnum
CREATE TYPE "NotificationType" AS ENUM ('AGENT_RUN', 'ZERO_BALANCE', 'LOW_BALANCE', 'BLOCK_EXECUTION_FAILED', 'CONTINUOUS_AGENT_ERROR', 'DAILY_SUMMARY', 'WEEKLY_SUMMARY', 'MONTHLY_SUMMARY');
-- AlterTable
ALTER TABLE "User" ADD COLUMN "maxEmailsPerDay" INTEGER NOT NULL DEFAULT 3,
ADD COLUMN "notifyOnAgentRun" BOOLEAN NOT NULL DEFAULT true,
ADD COLUMN "notifyOnBlockExecutionFailed" BOOLEAN NOT NULL DEFAULT true,
ADD COLUMN "notifyOnContinuousAgentError" BOOLEAN NOT NULL DEFAULT true,
ADD COLUMN "notifyOnDailySummary" BOOLEAN NOT NULL DEFAULT true,
ADD COLUMN "notifyOnLowBalance" BOOLEAN NOT NULL DEFAULT true,
ADD COLUMN "notifyOnMonthlySummary" BOOLEAN NOT NULL DEFAULT true,
ADD COLUMN "notifyOnWeeklySummary" BOOLEAN NOT NULL DEFAULT true,
ADD COLUMN "notifyOnZeroBalance" BOOLEAN NOT NULL DEFAULT true;
-- CreateTable
CREATE TABLE "NotificationEvent" (
"id" TEXT NOT NULL,
"createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
"updatedAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
"userNotificationBatchId" TEXT,
"type" "NotificationType" NOT NULL,
"data" JSONB NOT NULL,
CONSTRAINT "NotificationEvent_pkey" PRIMARY KEY ("id")
);
-- CreateTable
CREATE TABLE "UserNotificationBatch" (
"id" TEXT NOT NULL,
"createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
"updatedAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
"userId" TEXT NOT NULL,
"type" "NotificationType" NOT NULL,
CONSTRAINT "UserNotificationBatch_pkey" PRIMARY KEY ("id")
);
-- CreateIndex
CREATE UNIQUE INDEX "UserNotificationBatch_userId_type_key" ON "UserNotificationBatch"("userId", "type");
-- AddForeignKey
ALTER TABLE "NotificationEvent" ADD CONSTRAINT "NotificationEvent_userNotificationBatchId_fkey" FOREIGN KEY ("userNotificationBatchId") REFERENCES "UserNotificationBatch"("id") ON DELETE SET NULL ON UPDATE CASCADE;
-- AddForeignKey
ALTER TABLE "UserNotificationBatch" ADD CONSTRAINT "UserNotificationBatch_userId_fkey" FOREIGN KEY ("userId") REFERENCES "User"("id") ON DELETE CASCADE ON UPDATE CASCADE;

View File

@@ -0,0 +1,77 @@
-- Helper: migrate a text column that stores JSON-serialized values into a
-- native jsonb column in place, preserving existing data. Safe to re-run:
-- columns that are already jsonb are skipped.
CREATE OR REPLACE FUNCTION migrate_text_column_to_json(
    p_table text, -- Table name, e.g. 'AgentNodeExecution'
    p_col text, -- Column name to convert, e.g. 'executionData'
    p_default json DEFAULT '{}'::json, -- Fallback value when original value is NULL.
                                       -- Pass NULL here if you prefer to leave NULLs.
    p_set_nullable boolean DEFAULT true -- If false, the new column will be NOT NULL.
) RETURNS void AS $$
DECLARE
    full_table text;
    tmp_col text;
BEGIN
    -- Build a fully qualified table name using the current schema.
    full_table := format('%I.%I', current_schema(), p_table);
    tmp_col := p_col || '_tmp';

    -- 0. Skip the migration if the column is already of type jsonb
    --    (makes the migration idempotent).
    IF EXISTS (
        SELECT 1
        FROM information_schema.columns
        WHERE table_schema = current_schema()
          AND table_name = p_table
          AND column_name = p_col
          AND data_type = 'jsonb'
    ) THEN
        RAISE NOTICE 'Column %I.%I is already of type jsonb, skipping migration.', full_table, p_col;
        RETURN;
    END IF;

    -- 1. Cleanup the original column from invalid JSON characters.
    -- NOTE(review): the replace/LIKE pair targets the literal escape sequence
    -- "\u0000" (JSON-escaped NUL) inside the stored text, not raw NUL bytes —
    -- confirm that is the intended cleanup.
    EXECUTE format('UPDATE %s SET %I = replace(%I, E''\\u0000'', '''') WHERE %I LIKE ''%%\\u0000%%'';', full_table, p_col, p_col, p_col);

    -- 2. Add the temporary column of type jsonb.
    EXECUTE format('ALTER TABLE %s ADD COLUMN %I jsonb;', full_table, tmp_col);

    -- 3. Convert the data:
    -- - If p_default IS NOT NULL, use it as the fallback value.
    -- - Otherwise, keep NULL.
    IF p_default IS NULL THEN
        EXECUTE format(
            'UPDATE %s SET %I = CASE WHEN %I IS NULL THEN NULL ELSE %I::json END;',
            full_table, tmp_col, p_col, p_col
        );
    ELSE
        EXECUTE format(
            'UPDATE %s SET %I = CASE WHEN %I IS NULL THEN %L::json ELSE %I::json END;',
            full_table, tmp_col, p_col, p_default::text, p_col
        );
    END IF;

    -- 4. Drop the original text column.
    EXECUTE format('ALTER TABLE %s DROP COLUMN %I;', full_table, p_col);

    -- 5. Rename the temporary column to the original column name.
    EXECUTE format('ALTER TABLE %s RENAME COLUMN %I TO %I;', full_table, tmp_col, p_col);

    -- 6. Optionally set a DEFAULT for future inserts if a fallback is provided.
    IF p_default IS NOT NULL THEN
        EXECUTE format('ALTER TABLE %s ALTER COLUMN %I SET DEFAULT %L::json;',
            full_table, p_col, p_default::text);
    END IF;

    -- 7. Optionally mark the column as NOT NULL.
    IF NOT p_set_nullable THEN
        EXECUTE format('ALTER TABLE %s ALTER COLUMN %I SET NOT NULL;', full_table, p_col);
    END IF;
END;
$$ LANGUAGE plpgsql;

-- Run every column conversion in one transaction so a failure leaves the
-- schema untouched.
BEGIN;
SELECT migrate_text_column_to_json('AgentGraphExecution', 'stats', NULL, true);
SELECT migrate_text_column_to_json('AgentNodeExecution', 'stats', NULL, true);
SELECT migrate_text_column_to_json('AgentNodeExecution', 'executionData', NULL, true);
SELECT migrate_text_column_to_json('AgentNode', 'constantInput', '{}'::json, false);
SELECT migrate_text_column_to_json('AgentNode', 'metadata', '{}'::json, false);
SELECT migrate_text_column_to_json('AgentNodeExecutionInputOutput', 'data', NULL, false);
COMMIT;

View File

@@ -13,16 +13,25 @@ generator client {
// User model to mirror Auth provider users
model User {
id String @id // This should match the Supabase user ID
email String @unique
name String?
createdAt DateTime @default(now())
updatedAt DateTime @updatedAt
metadata Json @default("{}")
integrations String @default("")
stripeCustomerId String?
topUpConfig Json?
userNotificationPreferenceId String? @unique
id String @id // This should match the Supabase user ID
email String @unique
name String?
createdAt DateTime @default(now())
updatedAt DateTime @updatedAt
metadata Json @default("{}")
integrations String @default("")
stripeCustomerId String?
topUpConfig Json?
maxEmailsPerDay Int @default(3)
notifyOnAgentRun Boolean @default(true)
notifyOnZeroBalance Boolean @default(true)
notifyOnLowBalance Boolean @default(true)
notifyOnBlockExecutionFailed Boolean @default(true)
notifyOnContinuousAgentError Boolean @default(true)
notifyOnDailySummary Boolean @default(true)
notifyOnWeeklySummary Boolean @default(true)
notifyOnMonthlySummary Boolean @default(true)
// Relations
@@ -35,14 +44,13 @@ model User {
AgentPreset AgentPreset[]
UserAgent UserAgent[]
Profile Profile[]
StoreListing StoreListing[]
StoreListingReview StoreListingReview[]
StoreListingSubmission StoreListingSubmission[]
APIKeys APIKey[]
IntegrationWebhooks IntegrationWebhook[]
UserNotificationBatch UserNotificationBatch[]
UserNotificationPreference UserNotificationPreference? @relation(fields: [userNotificationPreferenceId], references: [id], onDelete: Cascade)
Profile Profile[]
StoreListing StoreListing[]
StoreListingReview StoreListingReview[]
StoreListingSubmission StoreListingSubmission[]
APIKeys APIKey[]
IntegrationWebhooks IntegrationWebhook[]
UserNotificationBatch UserNotificationBatch[]
@@index([id])
@@index([email])
@@ -153,26 +161,6 @@ model UserNotificationBatch {
@@unique([userId, type])
}
model UserNotificationPreference {
id String @id @default(uuid())
createdAt DateTime @default(now())
updatedAt DateTime @default(now()) @updatedAt
userId String @unique // Add @unique here
User User?
maxEmailsPerDay Int @default(3)
notifyOnAgentRun Boolean @default(true)
notifyOnZeroBalance Boolean @default(true)
notifyOnLowBalance Boolean @default(true)
notifyOnBlockExecutionFailed Boolean @default(true)
notifyOnContinuousAgentError Boolean @default(true)
notifyOnDailySummary Boolean @default(true)
notifyOnWeeklySummary Boolean @default(true)
notifyOnMonthlySummary Boolean @default(true)
}
// For the library page
// It is a user-controlled list of agents that they will see in their library
model UserAgent {
@@ -221,15 +209,13 @@ model AgentNode {
// List of produced output, that the child node should be executed.
Output AgentNodeLink[] @relation("AgentNodeSource")
// JSON serialized dict[str, str] containing predefined input values.
constantInput String @default("{}")
constantInput Json @default("{}")
// For webhook-triggered blocks: reference to the webhook that triggers the node
webhookId String?
Webhook IntegrationWebhook? @relation(fields: [webhookId], references: [id])
// JSON serialized dict[str, str] containing the node metadata.
metadata String @default("{}")
metadata Json @default("{}")
ExecutionHistory AgentNodeExecution[]
@@ -302,7 +288,7 @@ model AgentGraphExecution {
userId String
user User @relation(fields: [userId], references: [id], onDelete: Cascade)
stats String? // JSON serialized object
stats Json?
AgentPreset AgentPreset? @relation(fields: [agentPresetId], references: [id])
agentPresetId String?
@@ -324,14 +310,13 @@ model AgentNodeExecution {
Output AgentNodeExecutionInputOutput[] @relation("AgentNodeExecutionOutput")
executionStatus AgentExecutionStatus @default(COMPLETED)
// Final JSON serialized input data for the node execution.
executionData String?
executionData Json?
addedTime DateTime @default(now())
queuedTime DateTime?
startedTime DateTime?
endedTime DateTime?
stats String? // JSON serialized object
stats Json?
@@index([agentGraphExecutionId])
@@index([agentNodeId])
@@ -342,7 +327,7 @@ model AgentNodeExecutionInputOutput {
id String @id @default(uuid())
name String
data String
data Json
time DateTime @default(now())
// Prisma requires explicit back-references.

View File

@@ -4,7 +4,7 @@ from datetime import datetime
import prisma.enums
from faker import Faker
from prisma import Prisma
from prisma import Json, Prisma
faker = Faker()
@@ -110,8 +110,8 @@ async def main():
"agentBlockId": block.id,
"agentGraphId": graph.id,
"agentGraphVersion": graph.version,
"constantInput": "{}",
"metadata": "{}",
"constantInput": Json({}),
"metadata": Json({}),
}
)
agent_nodes.append(node)

View File

@@ -263,7 +263,7 @@ export const PublishAgentPopout: React.FC<PublishAgentPopoutProps> = ({
onClose={handleClose}
onDone={handleClose}
onViewProgress={() => {
router.push("/marketplace/dashboard");
router.push("/profile/dashboard");
handleClose();
}}
/>

View File

@@ -77,7 +77,7 @@ To run the backend services, follow these steps:
* Within the repository, clone the submodules and navigate to the `autogpt_platform` directory:
```bash
git submodule update --init --recursive
git submodule update --init --recursive --progress
cd autogpt_platform
```
This command will initialize and update the submodules in the repository. The `supabase` folder will be cloned to the root directory.