fix(server): Always JSON-encode AgentNodeExecutionInputOutput data (#8010)

- Handle JSON-encoding inside `.data.execution.upsert_execution_output(..)` to ensure it is always encoded the same
- Amend `.executor.manager.execute_node(..)` to pass unencoded data into `upsert_execution_output(..)`
This commit is contained in:
Reinier van der Leer
2024-09-06 16:58:04 +02:00
committed by GitHub
parent 3bd8040d6a
commit 1b9adf5434
2 changed files with 5 additions and 6 deletions

View File

@@ -245,7 +245,7 @@ async def upsert_execution_input(
async def upsert_execution_output(
node_exec_id: str,
output_name: str,
output_data: str, # JSON serialized data.
output_data: Any,
) -> None:
"""
Insert AgentNodeExecutionInputOutput record for as one of AgentNodeExecution.Output.
@@ -253,7 +253,7 @@ async def upsert_execution_output(
await AgentNodeExecutionInputOutput.prisma().create(
data={
"name": output_name,
"data": output_data,
"data": json.dumps(output_data),
"referencedByOutputExecId": node_exec_id,
}
)

View File

@@ -134,13 +134,12 @@ def execute_node(
output_size = 0
try:
for output_name, output_data in node_block.execute(input_data):
output_data_str = json.dumps(output_data)
output_size += len(output_data_str)
output_size += len(json.dumps(output_data))
logger.info(
"Node produced output",
extra={"json_fields": {**log_metadata, output_name: output_data_str}},
extra={"json_fields": {**log_metadata, output_name: output_data}},
)
wait(upsert_execution_output(node_exec_id, output_name, output_data_str))
wait(upsert_execution_output(node_exec_id, output_name, output_data))
for execution in _enqueue_next_nodes(
api_client=api_client,