Merge branch 'ntindle/secrt-1079-add-ability-to-send-emails-from-notification-service' into ntindle/secrt-1077-add-email-service

This commit is contained in:
Nicholas Tindle
2025-02-13 07:37:17 -06:00
committed by GitHub
8 changed files with 147 additions and 47 deletions

View File

@@ -42,6 +42,14 @@ jobs:
REDIS_PASSWORD: testpassword
ports:
- 6379:6379
rabbitmq:
image: rabbitmq:3.12-management
ports:
- 5672:5672
- 15672:15672
env:
RABBITMQ_DEFAULT_USER: ${{ env.RABBITMQ_DEFAULT_USER }}
RABBITMQ_DEFAULT_PASS: ${{ env.RABBITMQ_DEFAULT_PASS }}
steps:
- name: Checkout repository
@@ -129,9 +137,9 @@ jobs:
SUPABASE_URL: ${{ steps.supabase.outputs.API_URL }}
SUPABASE_SERVICE_ROLE_KEY: ${{ steps.supabase.outputs.SERVICE_ROLE_KEY }}
SUPABASE_JWT_SECRET: ${{ steps.supabase.outputs.JWT_SECRET }}
REDIS_HOST: 'localhost'
REDIS_PORT: '6379'
REDIS_PASSWORD: 'testpassword'
REDIS_HOST: "localhost"
REDIS_PORT: "6379"
REDIS_PASSWORD: "testpassword"
env:
CI: true
@@ -139,6 +147,13 @@ jobs:
RUN_ENV: local
PORT: 8080
OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }}
# We know these are here; don't report this as a security vulnerability.
# This is the default credential for the entire system's RabbitMQ instance.
# If you want to replace it, make the system generate new credentials for
# each local user and update the environment variables in the backend
# service, the Docker Compose files, and the examples.
RABBITMQ_DEFAULT_USER: rabbitmq_user_default
RABBITMQ_DEFAULT_PASS: k0VMxyIJF9S35f3x2uaw5IWAl6Y536O7
# - name: Upload coverage reports to Codecov
# uses: codecov/codecov-action@v4

View File

@@ -1,8 +1,9 @@
from collections import defaultdict
from datetime import datetime, timezone
from multiprocessing import Manager
from typing import Any, AsyncGenerator, Generator, Generic, Optional, TypeVar
from typing import Any, AsyncGenerator, Generator, Generic, Optional, Type, TypeVar
from prisma import Json
from prisma.enums import AgentExecutionStatus
from prisma.errors import PrismaError
from prisma.models import (
@@ -16,7 +17,7 @@ from backend.data.block import BlockData, BlockInput, CompletedBlockOutput
from backend.data.includes import EXECUTION_RESULT_INCLUDE, GRAPH_EXECUTION_INCLUDE
from backend.data.queue import AsyncRedisEventBus, RedisEventBus
from backend.server.v2.store.exceptions import DatabaseError
from backend.util import json, mock
from backend.util import mock, type
from backend.util.settings import Config
@@ -102,16 +103,16 @@ class ExecutionResult(BaseModel):
def from_db(execution: AgentNodeExecution):
if execution.executionData:
# Execution that has been queued for execution will persist its data.
input_data = json.loads(execution.executionData, target_type=dict[str, Any])
input_data = type.convert(execution.executionData, dict[str, Any])
else:
# For incomplete execution, executionData will not be yet available.
input_data: BlockInput = defaultdict()
for data in execution.Input or []:
input_data[data.name] = json.loads(data.data)
input_data[data.name] = type.convert(data.data, Type[Any])
output_data: CompletedBlockOutput = defaultdict(list)
for data in execution.Output or []:
output_data[data.name].append(json.loads(data.data))
output_data[data.name].append(type.convert(data.data, Type[Any]))
graph_execution: AgentGraphExecution | None = execution.AgentGraphExecution
@@ -158,7 +159,7 @@ async def create_graph_execution(
"executionStatus": ExecutionStatus.INCOMPLETE,
"Input": {
"create": [
{"name": name, "data": json.dumps(data)}
{"name": name, "data": Json(data)}
for name, data in node_input.items()
]
},
@@ -210,7 +211,7 @@ async def upsert_execution_input(
order={"addedTime": "asc"},
include={"Input": True},
)
json_input_data = json.dumps(input_data)
json_input_data = Json(input_data)
if existing_execution:
await AgentNodeExecutionInputOutput.prisma().create(
@@ -222,7 +223,7 @@ async def upsert_execution_input(
)
return existing_execution.id, {
**{
input_data.name: json.loads(input_data.data)
input_data.name: type.convert(input_data.data, Type[Any])
for input_data in existing_execution.Input or []
},
input_name: input_data,
@@ -256,7 +257,7 @@ async def upsert_execution_output(
await AgentNodeExecutionInputOutput.prisma().create(
data={
"name": output_name,
"data": json.dumps(output_data),
"data": Json(output_data),
"referencedByOutputExecId": node_exec_id,
}
)
@@ -281,7 +282,7 @@ async def update_graph_execution_stats(
where={"id": graph_exec_id},
data={
"executionStatus": status,
"stats": json.dumps(stats),
"stats": Json(stats),
},
)
if not res:
@@ -293,7 +294,7 @@ async def update_graph_execution_stats(
async def update_node_execution_stats(node_exec_id: str, stats: dict[str, Any]):
await AgentNodeExecution.prisma().update(
where={"id": node_exec_id},
data={"stats": json.dumps(stats)},
data={"stats": Json(stats)},
)
@@ -313,8 +314,8 @@ async def update_execution_status(
**({"startedTime": now} if status == ExecutionStatus.RUNNING else {}),
**({"endedTime": now} if status == ExecutionStatus.FAILED else {}),
**({"endedTime": now} if status == ExecutionStatus.COMPLETED else {}),
**({"executionData": json.dumps(execution_data)} if execution_data else {}),
**({"stats": json.dumps(stats)} if stats else {}),
**({"executionData": Json(execution_data)} if execution_data else {}),
**({"stats": Json(stats)} if stats else {}),
}
res = await AgentNodeExecution.prisma().update(
@@ -473,8 +474,7 @@ async def get_latest_execution(node_id: str, graph_eid: str) -> ExecutionResult
where={
"agentNodeId": node_id,
"agentGraphExecutionId": graph_eid,
"executionStatus": {"not": ExecutionStatus.INCOMPLETE},
"executionData": {"not": None}, # type: ignore
"executionStatus": {"not": ExecutionStatus.INCOMPLETE}, # type: ignore
},
order={"queuedTime": "desc"},
include=EXECUTION_RESULT_INCLUDE,

View File

@@ -6,6 +6,7 @@ from datetime import datetime, timezone
from typing import Any, Literal, Optional, Type
import prisma
from prisma import Json
from prisma.models import (
AgentGraph,
AgentGraphExecution,
@@ -18,7 +19,7 @@ from pydantic.fields import computed_field
from backend.blocks.agent import AgentExecutorBlock
from backend.blocks.basic import AgentInputBlock, AgentOutputBlock
from backend.util import json
from backend.util import type
from .block import BlockInput, BlockType, get_block, get_blocks
from .db import BaseDbModel, transaction
@@ -74,8 +75,8 @@ class NodeModel(Node):
obj = NodeModel(
id=node.id,
block_id=node.AgentBlock.id,
input_default=json.loads(node.constantInput, target_type=dict[str, Any]),
metadata=json.loads(node.metadata, target_type=dict[str, Any]),
input_default=type.convert(node.constantInput, dict[str, Any]),
metadata=type.convert(node.metadata, dict[str, Any]),
graph_id=node.agentGraphId,
graph_version=node.agentGraphVersion,
webhook_id=node.webhookId,
@@ -125,7 +126,7 @@ class GraphExecution(BaseDbModel):
total_run_time = duration
try:
stats = json.loads(execution.stats or "{}", target_type=dict[str, Any])
stats = type.convert(execution.stats or {}, dict[str, Any])
except ValueError:
stats = {}
@@ -402,11 +403,9 @@ class GraphModel(Graph):
if for_export:
# Remove credentials from node input
if node.constantInput:
constant_input = json.loads(
node.constantInput, target_type=dict[str, Any]
)
constant_input = type.convert(node.constantInput, dict[str, Any])
constant_input = GraphModel._hide_node_input_credentials(constant_input)
node.constantInput = json.dumps(constant_input)
node.constantInput = Json(constant_input)
# Remove webhook info
node.webhookId = None
@@ -654,8 +653,8 @@ async def __create_graph(tx, graph: Graph, user_id: str):
{
"id": node.id,
"agentBlockId": node.block_id,
"constantInput": json.dumps(node.input_default),
"metadata": json.dumps(node.metadata),
"constantInput": Json(node.input_default),
"metadata": Json(node.metadata),
}
for node in graph.nodes
]
@@ -717,9 +716,11 @@ async def fix_llm_provider_credentials():
store = IntegrationCredentialsStore()
broken_nodes = await prisma.get_client().query_raw(
"""
SELECT graph."userId" user_id,
broken_nodes = []
try:
broken_nodes = await prisma.get_client().query_raw(
"""
SELECT graph."userId" user_id,
node.id node_id,
node."constantInput" node_preset_input
FROM platform."AgentNode" node
@@ -728,8 +729,10 @@ async def fix_llm_provider_credentials():
WHERE node."constantInput"::jsonb->'credentials'->>'provider' = 'llm'
ORDER BY graph."userId";
"""
)
logger.info(f"Fixing LLM credential inputs on {len(broken_nodes)} nodes")
)
logger.info(f"Fixing LLM credential inputs on {len(broken_nodes)} nodes")
except Exception as e:
logger.error(f"Error fixing LLM credential inputs: {e}")
user_id: str = ""
user_integrations = None
@@ -742,7 +745,7 @@ async def fix_llm_provider_credentials():
raise RuntimeError(f"Impossible state while processing node {node}")
node_id: str = node["node_id"]
node_preset_input: dict = json.loads(node["node_preset_input"])
node_preset_input: dict = node["node_preset_input"]
credentials_meta: dict = node_preset_input["credentials"]
credentials = next(
@@ -778,5 +781,5 @@ async def fix_llm_provider_credentials():
store.update_creds(user_id, credentials)
await AgentNode.prisma().update(
where={"id": node_id},
data={"constantInput": json.dumps(node_preset_input)},
data={"constantInput": Json(node_preset_input)},
)

View File

@@ -9,6 +9,7 @@ from backend.data.execution import ExecutionResult, ExecutionStatus
from backend.data.model import _BaseCredentials
from backend.data.user import create_default_user
from backend.executor import DatabaseManager, ExecutionManager, ExecutionScheduler
from backend.notifications.notifications import NotificationManager
from backend.server.rest_api import AgentServer
from backend.server.utils import get_user_id
@@ -21,6 +22,7 @@ class SpinTestServer:
self.exec_manager = ExecutionManager()
self.agent_server = AgentServer()
self.scheduler = ExecutionScheduler()
self.notifications = NotificationManager()
@staticmethod
def test_get_user_id():
@@ -32,6 +34,7 @@ class SpinTestServer:
self.agent_server.__enter__()
self.exec_manager.__enter__()
self.scheduler.__enter__()
self.notifications.__enter__()
await db.connect()
await initialize_blocks()
@@ -46,6 +49,7 @@ class SpinTestServer:
self.exec_manager.__exit__(exc_type, exc_val, exc_tb)
self.agent_server.__exit__(exc_type, exc_val, exc_tb)
self.db_api.__exit__(exc_type, exc_val, exc_tb)
self.notifications.__exit__(exc_type, exc_val, exc_tb)
def setup_dependency_overrides(self):
# Override get_user_id for testing

View File

@@ -1,6 +1,8 @@
import json
from typing import Any, Type, TypeVar, cast, get_args, get_origin
from prisma import Json as PrismaJson
class ConversionError(ValueError):
    """Raised when a value cannot be converted to the requested target type."""

    pass
@@ -188,6 +190,8 @@ def type_match(value: Any, target_type: Type[T]) -> T:
def convert(value: Any, target_type: Type[T]) -> T:
    """Coerce ``value`` into ``target_type``.

    A Prisma ``Json`` wrapper is unwrapped to its underlying ``.data`` before
    conversion. Any failure is re-raised as :class:`ConversionError` with the
    original exception chained as the cause.
    """
    try:
        # Unwrap Prisma's Json container so conversion sees the plain payload.
        if isinstance(value, PrismaJson):
            value = value.data
        converted = _try_convert(value, target_type, raise_on_mismatch=False)
        return cast(T, converted)
    except Exception as exc:
        raise ConversionError(f"Failed to convert {value} to {target_type}") from exc

View File

@@ -0,0 +1,77 @@
-- Convert a text column holding serialized JSON into a native jsonb column,
-- in place, keeping the original column name. Safe to re-run: exits early
-- when the column is already jsonb.
CREATE OR REPLACE FUNCTION migrate_text_column_to_json(
    p_table text,          -- Table name, e.g. 'AgentNodeExecution'
    p_col text,            -- Column name to convert, e.g. 'executionData'
    p_default json DEFAULT '{}'::json,  -- Fallback value when original value is NULL.
                                        -- Pass NULL here if you prefer to leave NULLs.
    p_set_nullable boolean DEFAULT true -- If false, the new column will be NOT NULL.
) RETURNS void AS $$
DECLARE
    full_table text;
    tmp_col text;
BEGIN
    -- Build a fully qualified, identifier-quoted table name in the current schema.
    full_table := format('%I.%I', current_schema(), p_table);
    tmp_col := p_col || '_tmp';

    -- 0. Skip the migration if the column is already of type jsonb (idempotent re-runs).
    IF EXISTS (
        SELECT 1
        FROM information_schema.columns
        WHERE table_schema = current_schema()
          AND table_name = p_table
          AND column_name = p_col
          AND data_type = 'jsonb'
    ) THEN
        -- FIX: RAISE format strings only support bare '%' placeholders
        -- ('%I'/'%L' belong to format()), so the previous '%I.%I' printed a
        -- stray 'I' after each substituted value.
        RAISE NOTICE 'Column %.% is already of type jsonb, skipping migration.', full_table, p_col;
        RETURN;
    END IF;

    -- 1. Strip literal '\u0000' escape sequences from the stored text, since
    --    jsonb rejects the \u0000 code point.
    EXECUTE format('UPDATE %s SET %I = replace(%I, E''\\u0000'', '''') WHERE %I LIKE ''%%\\u0000%%'';', full_table, p_col, p_col, p_col);

    -- 2. Add the temporary column of type jsonb.
    EXECUTE format('ALTER TABLE %s ADD COLUMN %I jsonb;', full_table, tmp_col);

    -- 3. Convert the data:
    --    - If p_default IS NOT NULL, use it as the fallback value for NULL rows.
    --    - Otherwise, keep NULL.
    IF p_default IS NULL THEN
        EXECUTE format(
            'UPDATE %s SET %I = CASE WHEN %I IS NULL THEN NULL ELSE %I::json END;',
            full_table, tmp_col, p_col, p_col
        );
    ELSE
        EXECUTE format(
            'UPDATE %s SET %I = CASE WHEN %I IS NULL THEN %L::json ELSE %I::json END;',
            full_table, tmp_col, p_col, p_default::text, p_col
        );
    END IF;

    -- 4. Drop the original text column.
    EXECUTE format('ALTER TABLE %s DROP COLUMN %I;', full_table, p_col);

    -- 5. Rename the temporary column to the original column name.
    EXECUTE format('ALTER TABLE %s RENAME COLUMN %I TO %I;', full_table, tmp_col, p_col);

    -- 6. Optionally set a DEFAULT for future inserts if a fallback is provided.
    IF p_default IS NOT NULL THEN
        EXECUTE format('ALTER TABLE %s ALTER COLUMN %I SET DEFAULT %L::json;',
                       full_table, p_col, p_default::text);
    END IF;

    -- 7. Optionally mark the column as NOT NULL.
    IF NOT p_set_nullable THEN
        EXECUTE format('ALTER TABLE %s ALTER COLUMN %I SET NOT NULL;', full_table, p_col);
    END IF;
END;
$$ LANGUAGE plpgsql;
-- Run all column migrations atomically so a mid-way failure leaves the schema untouched.
BEGIN;
-- Nullable stats / execution-data columns: keep NULL rows as NULL (no default).
SELECT migrate_text_column_to_json('AgentGraphExecution', 'stats', NULL, true);
SELECT migrate_text_column_to_json('AgentNodeExecution', 'stats', NULL, true);
SELECT migrate_text_column_to_json('AgentNodeExecution', 'executionData', NULL, true);
-- Node input/metadata: default NULL rows to '{}' and enforce NOT NULL.
SELECT migrate_text_column_to_json('AgentNode', 'constantInput', '{}'::json, false);
SELECT migrate_text_column_to_json('AgentNode', 'metadata', '{}'::json, false);
-- NOTE(review): p_default is NULL here but p_set_nullable is false — any NULL
-- rows in "data" would make the SET NOT NULL step fail; presumably none exist.
SELECT migrate_text_column_to_json('AgentNodeExecutionInputOutput', 'data', NULL, false);
COMMIT;

View File

@@ -209,15 +209,13 @@ model AgentNode {
// List of produced output, that the child node should be executed.
Output AgentNodeLink[] @relation("AgentNodeSource")
// JSON serialized dict[str, str] containing predefined input values.
constantInput String @default("{}")
constantInput Json @default("{}")
// For webhook-triggered blocks: reference to the webhook that triggers the node
webhookId String?
Webhook IntegrationWebhook? @relation(fields: [webhookId], references: [id])
// JSON serialized dict[str, str] containing the node metadata.
metadata String @default("{}")
metadata Json @default("{}")
ExecutionHistory AgentNodeExecution[]
@@ -290,7 +288,7 @@ model AgentGraphExecution {
userId String
user User @relation(fields: [userId], references: [id], onDelete: Cascade)
stats String? // JSON serialized object
stats Json?
AgentPreset AgentPreset? @relation(fields: [agentPresetId], references: [id])
agentPresetId String?
@@ -312,14 +310,13 @@ model AgentNodeExecution {
Output AgentNodeExecutionInputOutput[] @relation("AgentNodeExecutionOutput")
executionStatus AgentExecutionStatus @default(COMPLETED)
// Final JSON serialized input data for the node execution.
executionData String?
executionData Json?
addedTime DateTime @default(now())
queuedTime DateTime?
startedTime DateTime?
endedTime DateTime?
stats String? // JSON serialized object
stats Json?
@@index([agentGraphExecutionId])
@@index([agentNodeId])
@@ -330,7 +327,7 @@ model AgentNodeExecutionInputOutput {
id String @id @default(uuid())
name String
data String
data Json
time DateTime @default(now())
// Prisma requires explicit back-references.

View File

@@ -4,7 +4,7 @@ from datetime import datetime
import prisma.enums
from faker import Faker
from prisma import Prisma
from prisma import Json, Prisma
faker = Faker()
@@ -110,8 +110,8 @@ async def main():
"agentBlockId": block.id,
"agentGraphId": graph.id,
"agentGraphVersion": graph.version,
"constantInput": "{}",
"metadata": "{}",
"constantInput": Json({}),
"metadata": Json({}),
}
)
agent_nodes.append(node)