mirror of
https://github.com/Significant-Gravitas/AutoGPT.git
synced 2026-04-08 03:00:28 -04:00
feat(backend): Migrate json encoded string columns into a native json column (#9475)
### Changes 🏗️ Due to the legacy of SQLite usage, some of the JSON columns are actually a string column string a stringified JSON column. The scope of this PR is migrating those columns into an actual JSON column. ### Checklist 📋 #### For code changes: - [ ] I have clearly listed my changes in the PR description - [ ] I have made a test plan - [ ] I have tested my changes according to the test plan: <!-- Put your test plan here: --> - [ ] ... <details> <summary>Example test plan</summary> - [ ] Create from scratch and execute an agent with at least 3 blocks - [ ] Import an agent from file upload, and confirm it executes correctly - [ ] Upload agent to marketplace - [ ] Import an agent from marketplace and confirm it executes correctly - [ ] Edit an agent from monitor, and confirm it executes correctly </details> #### For configuration changes: - [ ] `.env.example` is updated or already compatible with my changes - [ ] `docker-compose.yml` is updated or already compatible with my changes - [ ] I have included a list of my configuration changes in the PR description (under **Changes**) <details> <summary>Examples of configuration changes</summary> - Changing ports - Adding new services that need to communicate with each other - Secrets or environment variable changes - New or infrastructure changes such as databases </details>
This commit is contained in:
@@ -1,8 +1,9 @@
|
||||
from collections import defaultdict
|
||||
from datetime import datetime, timezone
|
||||
from multiprocessing import Manager
|
||||
from typing import Any, AsyncGenerator, Generator, Generic, Optional, TypeVar
|
||||
from typing import Any, AsyncGenerator, Generator, Generic, Optional, Type, TypeVar
|
||||
|
||||
from prisma import Json
|
||||
from prisma.enums import AgentExecutionStatus
|
||||
from prisma.errors import PrismaError
|
||||
from prisma.models import (
|
||||
@@ -16,7 +17,7 @@ from backend.data.block import BlockData, BlockInput, CompletedBlockOutput
|
||||
from backend.data.includes import EXECUTION_RESULT_INCLUDE, GRAPH_EXECUTION_INCLUDE
|
||||
from backend.data.queue import AsyncRedisEventBus, RedisEventBus
|
||||
from backend.server.v2.store.exceptions import DatabaseError
|
||||
from backend.util import json, mock
|
||||
from backend.util import mock, type
|
||||
from backend.util.settings import Config
|
||||
|
||||
|
||||
@@ -102,16 +103,16 @@ class ExecutionResult(BaseModel):
|
||||
def from_db(execution: AgentNodeExecution):
|
||||
if execution.executionData:
|
||||
# Execution that has been queued for execution will persist its data.
|
||||
input_data = json.loads(execution.executionData, target_type=dict[str, Any])
|
||||
input_data = type.convert(execution.executionData, dict[str, Any])
|
||||
else:
|
||||
# For incomplete execution, executionData will not be yet available.
|
||||
input_data: BlockInput = defaultdict()
|
||||
for data in execution.Input or []:
|
||||
input_data[data.name] = json.loads(data.data)
|
||||
input_data[data.name] = type.convert(data.data, Type[Any])
|
||||
|
||||
output_data: CompletedBlockOutput = defaultdict(list)
|
||||
for data in execution.Output or []:
|
||||
output_data[data.name].append(json.loads(data.data))
|
||||
output_data[data.name].append(type.convert(data.data, Type[Any]))
|
||||
|
||||
graph_execution: AgentGraphExecution | None = execution.AgentGraphExecution
|
||||
|
||||
@@ -158,7 +159,7 @@ async def create_graph_execution(
|
||||
"executionStatus": ExecutionStatus.INCOMPLETE,
|
||||
"Input": {
|
||||
"create": [
|
||||
{"name": name, "data": json.dumps(data)}
|
||||
{"name": name, "data": Json(data)}
|
||||
for name, data in node_input.items()
|
||||
]
|
||||
},
|
||||
@@ -210,7 +211,7 @@ async def upsert_execution_input(
|
||||
order={"addedTime": "asc"},
|
||||
include={"Input": True},
|
||||
)
|
||||
json_input_data = json.dumps(input_data)
|
||||
json_input_data = Json(input_data)
|
||||
|
||||
if existing_execution:
|
||||
await AgentNodeExecutionInputOutput.prisma().create(
|
||||
@@ -222,7 +223,7 @@ async def upsert_execution_input(
|
||||
)
|
||||
return existing_execution.id, {
|
||||
**{
|
||||
input_data.name: json.loads(input_data.data)
|
||||
input_data.name: type.convert(input_data.data, Type[Any])
|
||||
for input_data in existing_execution.Input or []
|
||||
},
|
||||
input_name: input_data,
|
||||
@@ -256,7 +257,7 @@ async def upsert_execution_output(
|
||||
await AgentNodeExecutionInputOutput.prisma().create(
|
||||
data={
|
||||
"name": output_name,
|
||||
"data": json.dumps(output_data),
|
||||
"data": Json(output_data),
|
||||
"referencedByOutputExecId": node_exec_id,
|
||||
}
|
||||
)
|
||||
@@ -281,7 +282,7 @@ async def update_graph_execution_stats(
|
||||
where={"id": graph_exec_id},
|
||||
data={
|
||||
"executionStatus": status,
|
||||
"stats": json.dumps(stats),
|
||||
"stats": Json(stats),
|
||||
},
|
||||
)
|
||||
if not res:
|
||||
@@ -293,7 +294,7 @@ async def update_graph_execution_stats(
|
||||
async def update_node_execution_stats(node_exec_id: str, stats: dict[str, Any]):
|
||||
await AgentNodeExecution.prisma().update(
|
||||
where={"id": node_exec_id},
|
||||
data={"stats": json.dumps(stats)},
|
||||
data={"stats": Json(stats)},
|
||||
)
|
||||
|
||||
|
||||
@@ -313,8 +314,8 @@ async def update_execution_status(
|
||||
**({"startedTime": now} if status == ExecutionStatus.RUNNING else {}),
|
||||
**({"endedTime": now} if status == ExecutionStatus.FAILED else {}),
|
||||
**({"endedTime": now} if status == ExecutionStatus.COMPLETED else {}),
|
||||
**({"executionData": json.dumps(execution_data)} if execution_data else {}),
|
||||
**({"stats": json.dumps(stats)} if stats else {}),
|
||||
**({"executionData": Json(execution_data)} if execution_data else {}),
|
||||
**({"stats": Json(stats)} if stats else {}),
|
||||
}
|
||||
|
||||
res = await AgentNodeExecution.prisma().update(
|
||||
@@ -473,8 +474,7 @@ async def get_latest_execution(node_id: str, graph_eid: str) -> ExecutionResult
|
||||
where={
|
||||
"agentNodeId": node_id,
|
||||
"agentGraphExecutionId": graph_eid,
|
||||
"executionStatus": {"not": ExecutionStatus.INCOMPLETE},
|
||||
"executionData": {"not": None}, # type: ignore
|
||||
"executionStatus": {"not": ExecutionStatus.INCOMPLETE}, # type: ignore
|
||||
},
|
||||
order={"queuedTime": "desc"},
|
||||
include=EXECUTION_RESULT_INCLUDE,
|
||||
|
||||
@@ -6,6 +6,7 @@ from datetime import datetime, timezone
|
||||
from typing import Any, Literal, Optional, Type
|
||||
|
||||
import prisma
|
||||
from prisma import Json
|
||||
from prisma.models import (
|
||||
AgentGraph,
|
||||
AgentGraphExecution,
|
||||
@@ -18,7 +19,7 @@ from pydantic.fields import computed_field
|
||||
|
||||
from backend.blocks.agent import AgentExecutorBlock
|
||||
from backend.blocks.basic import AgentInputBlock, AgentOutputBlock
|
||||
from backend.util import json
|
||||
from backend.util import type
|
||||
|
||||
from .block import BlockInput, BlockType, get_block, get_blocks
|
||||
from .db import BaseDbModel, transaction
|
||||
@@ -74,8 +75,8 @@ class NodeModel(Node):
|
||||
obj = NodeModel(
|
||||
id=node.id,
|
||||
block_id=node.AgentBlock.id,
|
||||
input_default=json.loads(node.constantInput, target_type=dict[str, Any]),
|
||||
metadata=json.loads(node.metadata, target_type=dict[str, Any]),
|
||||
input_default=type.convert(node.constantInput, dict[str, Any]),
|
||||
metadata=type.convert(node.metadata, dict[str, Any]),
|
||||
graph_id=node.agentGraphId,
|
||||
graph_version=node.agentGraphVersion,
|
||||
webhook_id=node.webhookId,
|
||||
@@ -125,7 +126,7 @@ class GraphExecution(BaseDbModel):
|
||||
total_run_time = duration
|
||||
|
||||
try:
|
||||
stats = json.loads(execution.stats or "{}", target_type=dict[str, Any])
|
||||
stats = type.convert(execution.stats or {}, dict[str, Any])
|
||||
except ValueError:
|
||||
stats = {}
|
||||
|
||||
@@ -402,11 +403,9 @@ class GraphModel(Graph):
|
||||
if for_export:
|
||||
# Remove credentials from node input
|
||||
if node.constantInput:
|
||||
constant_input = json.loads(
|
||||
node.constantInput, target_type=dict[str, Any]
|
||||
)
|
||||
constant_input = type.convert(node.constantInput, dict[str, Any])
|
||||
constant_input = GraphModel._hide_node_input_credentials(constant_input)
|
||||
node.constantInput = json.dumps(constant_input)
|
||||
node.constantInput = Json(constant_input)
|
||||
|
||||
# Remove webhook info
|
||||
node.webhookId = None
|
||||
@@ -654,8 +653,8 @@ async def __create_graph(tx, graph: Graph, user_id: str):
|
||||
{
|
||||
"id": node.id,
|
||||
"agentBlockId": node.block_id,
|
||||
"constantInput": json.dumps(node.input_default),
|
||||
"metadata": json.dumps(node.metadata),
|
||||
"constantInput": Json(node.input_default),
|
||||
"metadata": Json(node.metadata),
|
||||
}
|
||||
for node in graph.nodes
|
||||
]
|
||||
@@ -742,7 +741,7 @@ async def fix_llm_provider_credentials():
|
||||
raise RuntimeError(f"Impossible state while processing node {node}")
|
||||
|
||||
node_id: str = node["node_id"]
|
||||
node_preset_input: dict = json.loads(node["node_preset_input"])
|
||||
node_preset_input: dict = node["node_preset_input"]
|
||||
credentials_meta: dict = node_preset_input["credentials"]
|
||||
|
||||
credentials = next(
|
||||
@@ -778,5 +777,5 @@ async def fix_llm_provider_credentials():
|
||||
store.update_creds(user_id, credentials)
|
||||
await AgentNode.prisma().update(
|
||||
where={"id": node_id},
|
||||
data={"constantInput": json.dumps(node_preset_input)},
|
||||
data={"constantInput": Json(node_preset_input)},
|
||||
)
|
||||
|
||||
@@ -1,6 +1,8 @@
|
||||
import json
|
||||
from typing import Any, Type, TypeVar, cast, get_args, get_origin
|
||||
|
||||
from prisma import Json as PrismaJson
|
||||
|
||||
|
||||
class ConversionError(ValueError):
|
||||
pass
|
||||
@@ -188,6 +190,8 @@ def type_match(value: Any, target_type: Type[T]) -> T:
|
||||
|
||||
def convert(value: Any, target_type: Type[T]) -> T:
|
||||
try:
|
||||
if isinstance(value, PrismaJson):
|
||||
value = value.data
|
||||
return cast(T, _try_convert(value, target_type, raise_on_mismatch=False))
|
||||
except Exception as e:
|
||||
raise ConversionError(f"Failed to convert {value} to {target_type}") from e
|
||||
|
||||
@@ -0,0 +1,77 @@
|
||||
CREATE OR REPLACE FUNCTION migrate_text_column_to_json(
|
||||
p_table text, -- Table name, e.g. 'AgentNodeExecution'
|
||||
p_col text, -- Column name to convert, e.g. 'executionData'
|
||||
p_default json DEFAULT '{}'::json, -- Fallback value when original value is NULL.
|
||||
-- Pass NULL here if you prefer to leave NULLs.
|
||||
p_set_nullable boolean DEFAULT true -- If false, the new column will be NOT NULL.
|
||||
) RETURNS void AS $$
|
||||
DECLARE
|
||||
full_table text;
|
||||
tmp_col text;
|
||||
BEGIN
|
||||
-- Build a fully qualified table name using the current schema.
|
||||
full_table := format('%I.%I', current_schema(), p_table);
|
||||
tmp_col := p_col || '_tmp';
|
||||
|
||||
-- 0. Skip the migration if the column is already of type jsonb.
|
||||
IF EXISTS (
|
||||
SELECT 1
|
||||
FROM information_schema.columns
|
||||
WHERE table_schema = current_schema()
|
||||
AND table_name = p_table
|
||||
AND column_name = p_col
|
||||
AND data_type = 'jsonb'
|
||||
) THEN
|
||||
RAISE NOTICE 'Column %I.%I is already of type jsonb, skipping migration.', full_table, p_col;
|
||||
RETURN;
|
||||
END IF;
|
||||
|
||||
-- 1. Cleanup the original column from invalid JSON characters.
|
||||
EXECUTE format('UPDATE %s SET %I = replace(%I, E''\\u0000'', '''') WHERE %I LIKE ''%%\\u0000%%'';', full_table, p_col, p_col, p_col);
|
||||
|
||||
-- 2. Add the temporary column of type JSON.
|
||||
EXECUTE format('ALTER TABLE %s ADD COLUMN %I jsonb;', full_table, tmp_col);
|
||||
|
||||
-- 3. Convert the data:
|
||||
-- - If p_default IS NOT NULL, use it as the fallback value.
|
||||
-- - Otherwise, keep NULL.
|
||||
IF p_default IS NULL THEN
|
||||
EXECUTE format(
|
||||
'UPDATE %s SET %I = CASE WHEN %I IS NULL THEN NULL ELSE %I::json END;',
|
||||
full_table, tmp_col, p_col, p_col
|
||||
);
|
||||
ELSE
|
||||
EXECUTE format(
|
||||
'UPDATE %s SET %I = CASE WHEN %I IS NULL THEN %L::json ELSE %I::json END;',
|
||||
full_table, tmp_col, p_col, p_default::text, p_col
|
||||
);
|
||||
END IF;
|
||||
|
||||
-- 4. Drop the original text column.
|
||||
EXECUTE format('ALTER TABLE %s DROP COLUMN %I;', full_table, p_col);
|
||||
|
||||
-- 5. Rename the temporary column to the original column name.
|
||||
EXECUTE format('ALTER TABLE %s RENAME COLUMN %I TO %I;', full_table, tmp_col, p_col);
|
||||
|
||||
-- 6. Optionally set a DEFAULT for future inserts if a fallback is provided.
|
||||
IF p_default IS NOT NULL THEN
|
||||
EXECUTE format('ALTER TABLE %s ALTER COLUMN %I SET DEFAULT %L::json;',
|
||||
full_table, p_col, p_default::text);
|
||||
END IF;
|
||||
|
||||
-- 7. Optionally mark the column as NOT NULL.
|
||||
IF NOT p_set_nullable THEN
|
||||
EXECUTE format('ALTER TABLE %s ALTER COLUMN %I SET NOT NULL;', full_table, p_col);
|
||||
END IF;
|
||||
END;
|
||||
$$ LANGUAGE plpgsql;
|
||||
|
||||
|
||||
BEGIN;
|
||||
SELECT migrate_text_column_to_json('AgentGraphExecution', 'stats', NULL, true);
|
||||
SELECT migrate_text_column_to_json('AgentNodeExecution', 'stats', NULL, true);
|
||||
SELECT migrate_text_column_to_json('AgentNodeExecution', 'executionData', NULL, true);
|
||||
SELECT migrate_text_column_to_json('AgentNode', 'constantInput', '{}'::json, false);
|
||||
SELECT migrate_text_column_to_json('AgentNode', 'metadata', '{}'::json, false);
|
||||
SELECT migrate_text_column_to_json('AgentNodeExecutionInputOutput', 'data', NULL, false);
|
||||
COMMIT;
|
||||
@@ -209,15 +209,13 @@ model AgentNode {
|
||||
// List of produced output, that the child node should be executed.
|
||||
Output AgentNodeLink[] @relation("AgentNodeSource")
|
||||
|
||||
// JSON serialized dict[str, str] containing predefined input values.
|
||||
constantInput String @default("{}")
|
||||
constantInput Json @default("{}")
|
||||
|
||||
// For webhook-triggered blocks: reference to the webhook that triggers the node
|
||||
webhookId String?
|
||||
Webhook IntegrationWebhook? @relation(fields: [webhookId], references: [id])
|
||||
|
||||
// JSON serialized dict[str, str] containing the node metadata.
|
||||
metadata String @default("{}")
|
||||
metadata Json @default("{}")
|
||||
|
||||
ExecutionHistory AgentNodeExecution[]
|
||||
|
||||
@@ -290,7 +288,7 @@ model AgentGraphExecution {
|
||||
userId String
|
||||
user User @relation(fields: [userId], references: [id], onDelete: Cascade)
|
||||
|
||||
stats String? // JSON serialized object
|
||||
stats Json?
|
||||
AgentPreset AgentPreset? @relation(fields: [agentPresetId], references: [id])
|
||||
agentPresetId String?
|
||||
|
||||
@@ -312,14 +310,13 @@ model AgentNodeExecution {
|
||||
Output AgentNodeExecutionInputOutput[] @relation("AgentNodeExecutionOutput")
|
||||
|
||||
executionStatus AgentExecutionStatus @default(COMPLETED)
|
||||
// Final JSON serialized input data for the node execution.
|
||||
executionData String?
|
||||
executionData Json?
|
||||
addedTime DateTime @default(now())
|
||||
queuedTime DateTime?
|
||||
startedTime DateTime?
|
||||
endedTime DateTime?
|
||||
|
||||
stats String? // JSON serialized object
|
||||
stats Json?
|
||||
|
||||
@@index([agentGraphExecutionId])
|
||||
@@index([agentNodeId])
|
||||
@@ -330,7 +327,7 @@ model AgentNodeExecutionInputOutput {
|
||||
id String @id @default(uuid())
|
||||
|
||||
name String
|
||||
data String
|
||||
data Json
|
||||
time DateTime @default(now())
|
||||
|
||||
// Prisma requires explicit back-references.
|
||||
|
||||
@@ -4,7 +4,7 @@ from datetime import datetime
|
||||
|
||||
import prisma.enums
|
||||
from faker import Faker
|
||||
from prisma import Prisma
|
||||
from prisma import Json, Prisma
|
||||
|
||||
faker = Faker()
|
||||
|
||||
@@ -110,8 +110,8 @@ async def main():
|
||||
"agentBlockId": block.id,
|
||||
"agentGraphId": graph.id,
|
||||
"agentGraphVersion": graph.version,
|
||||
"constantInput": "{}",
|
||||
"metadata": "{}",
|
||||
"constantInput": Json({}),
|
||||
"metadata": Json({}),
|
||||
}
|
||||
)
|
||||
agent_nodes.append(node)
|
||||
|
||||
Reference in New Issue
Block a user