diff --git a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/workflow.tsx b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/workflow.tsx index 800f37f8a..ff40b6408 100644 --- a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/workflow.tsx +++ b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/workflow.tsx @@ -352,26 +352,13 @@ const WorkflowContent = React.memo(() => { // Remove parent-child relationship while preserving absolute position updateNodeParent(blockId, null) - // Clean up any edges that now cross container boundaries for this block - const rfNodes = getNodes() - const sourceOrTargetEdges = edgesForDisplay.filter( + // Remove all edges connected to this block + const connectedEdges = edgesForDisplay.filter( (e) => e.source === blockId || e.target === blockId ) - sourceOrTargetEdges.forEach((edge) => { - const sourceNode = rfNodes.find((n) => n.id === edge.source) - const targetNode = rfNodes.find((n) => n.id === edge.target) - const sourceParent = sourceNode?.parentId - const targetParent = targetNode?.parentId - - const crossesBoundary = - (sourceParent && !targetParent) || - (!sourceParent && targetParent) || - (sourceParent && targetParent && sourceParent !== targetParent) - - if (crossesBoundary) { - removeEdge(edge.id) - } + connectedEdges.forEach((edge) => { + removeEdge(edge.id) }) } catch (err) { logger.error('Failed to remove from subflow', { err }) diff --git a/apps/sim/db/migrations/0076_damp_vector.sql b/apps/sim/db/migrations/0076_damp_vector.sql index 9b64ede27..3ca2471e0 100644 --- a/apps/sim/db/migrations/0076_damp_vector.sql +++ b/apps/sim/db/migrations/0076_damp_vector.sql @@ -19,103 +19,123 @@ ALTER TABLE "workflow_execution_logs" ADD COLUMN IF NOT EXISTS "execution_data" jsonb NOT NULL DEFAULT '{}'::jsonb, ADD COLUMN IF NOT EXISTS "cost" jsonb;--> statement-breakpoint --- 2) Backfill top-level cost from legacy numeric columns, tokenBreakdown/models, and traceSpans aggregates -WITH RECURSIVE spans AS ( - SELECT l.id, s.span - FROM workflow_execution_logs l - LEFT JOIN LATERAL jsonb_array_elements( - COALESCE( - CASE - WHEN jsonb_typeof(l.execution_data->'traceSpans') = 'array' THEN l.execution_data->'traceSpans' - ELSE '[]'::jsonb - END - ) - ) s(span) ON true - UNION ALL - SELECT spans.id, c.span - FROM spans - JOIN LATERAL jsonb_array_elements(COALESCE(spans.span->'children','[]'::jsonb)) c(span) ON true -), -agg AS ( - SELECT id, - SUM(COALESCE((span->'cost'->>'input')::numeric,0)) AS agg_input, - SUM(COALESCE((span->'cost'->>'output')::numeric,0)) AS agg_output, - SUM(COALESCE((span->'cost'->>'total')::numeric,0)) AS agg_total, - SUM(COALESCE((span->'cost'->'tokens'->>'prompt')::numeric, COALESCE((span->'tokens'->>'prompt')::numeric,0))) AS agg_tokens_prompt, - SUM(COALESCE((span->'cost'->'tokens'->>'completion')::numeric, COALESCE((span->'tokens'->>'completion')::numeric,0))) AS agg_tokens_completion, - SUM(COALESCE((span->'cost'->'tokens'->>'total')::numeric, COALESCE((span->'tokens'->>'total')::numeric,0))) AS agg_tokens_total - FROM spans - GROUP BY id -), -model_rows AS ( - SELECT id, - (span->'cost'->>'model') AS model, - COALESCE((span->'cost'->>'input')::numeric,0) AS input, - COALESCE((span->'cost'->>'output')::numeric,0) AS output, - COALESCE((span->'cost'->>'total')::numeric,0) AS total, - COALESCE((span->'cost'->'tokens'->>'prompt')::numeric,0) AS tokens_prompt, - COALESCE((span->'cost'->'tokens'->>'completion')::numeric,0) AS tokens_completion, - COALESCE((span->'cost'->'tokens'->>'total')::numeric,0) AS tokens_total - FROM spans - WHERE span ? 'cost' AND (span->'cost'->>'model') IS NOT NULL -), -model_sums AS ( - SELECT id, - model, - SUM(input) AS input, - SUM(output) AS output, - SUM(total) AS total, - SUM(tokens_prompt) AS tokens_prompt, - SUM(tokens_completion) AS tokens_completion, - SUM(tokens_total) AS tokens_total - FROM model_rows - GROUP BY id, model -), -models AS ( - SELECT id, - jsonb_object_agg(model, jsonb_build_object( - 'input', input, - 'output', output, - 'total', total, - 'tokens', jsonb_build_object( - 'prompt', tokens_prompt, - 'completion', tokens_completion, - 'total', tokens_total - ) - )) AS models - FROM model_sums - GROUP BY id -), -tb AS ( - SELECT l.id, - NULLIF((l.execution_data->'tokenBreakdown'->>'prompt')::numeric, 0) AS prompt, - NULLIF((l.execution_data->'tokenBreakdown'->>'completion')::numeric, 0) AS completion - FROM workflow_execution_logs l -) -UPDATE workflow_execution_logs AS l -SET cost = jsonb_strip_nulls( - jsonb_build_object( - 'total', COALESCE(l.total_cost, NULLIF(agg.agg_total,0)), - 'input', COALESCE(l.total_input_cost, NULLIF(agg.agg_input,0)), - 'output', COALESCE(l.total_output_cost, NULLIF(agg.agg_output,0)), - 'tokens', CASE - WHEN l.total_tokens IS NOT NULL OR tb.prompt IS NOT NULL OR tb.completion IS NOT NULL OR NULLIF(agg.agg_tokens_total,0) IS NOT NULL THEN - jsonb_strip_nulls( - jsonb_build_object( - 'total', COALESCE(l.total_tokens, NULLIF(agg.agg_tokens_total,0)), - 'prompt', COALESCE(tb.prompt, NULLIF(agg.agg_tokens_prompt,0)), - 'completion', COALESCE(tb.completion, NULLIF(agg.agg_tokens_completion,0)) - ) +-- Process the backfill in batches to avoid large temporary files on big datasets +DO $$ +DECLARE + v_batch_size integer := 5000; -- tune if needed based on dataset size + v_rows_updated integer := 0; +BEGIN + LOOP + WITH candidate AS ( + SELECT id + FROM workflow_execution_logs + WHERE cost IS NULL + ORDER BY id + LIMIT v_batch_size + ), + spans AS ( + SELECT l.id, s.span + FROM workflow_execution_logs l + JOIN candidate c ON c.id = l.id + LEFT JOIN LATERAL jsonb_array_elements( + COALESCE( + CASE + WHEN jsonb_typeof(l.execution_data->'traceSpans') = 'array' THEN l.execution_data->'traceSpans' + ELSE '[]'::jsonb + END ) - ELSE NULL - END, - 'models', models.models - ) -) -FROM agg -LEFT JOIN models ON models.id = agg.id -LEFT JOIN tb ON tb.id = agg.id -WHERE l.id = agg.id;--> statement-breakpoint + ) s(span) ON true + UNION ALL + SELECT spans.id, c.span + FROM spans + JOIN LATERAL jsonb_array_elements(COALESCE(spans.span->'children','[]'::jsonb)) c(span) ON true + ), + agg AS ( + SELECT id, + SUM(COALESCE((span->'cost'->>'input')::numeric,0)) AS agg_input, + SUM(COALESCE((span->'cost'->>'output')::numeric,0)) AS agg_output, + SUM(COALESCE((span->'cost'->>'total')::numeric,0)) AS agg_total, + SUM(COALESCE((span->'cost'->'tokens'->>'prompt')::numeric, COALESCE((span->'tokens'->>'prompt')::numeric,0))) AS agg_tokens_prompt, + SUM(COALESCE((span->'cost'->'tokens'->>'completion')::numeric, COALESCE((span->'tokens'->>'completion')::numeric,0))) AS agg_tokens_completion, + SUM(COALESCE((span->'cost'->'tokens'->>'total')::numeric, COALESCE((span->'tokens'->>'total')::numeric,0))) AS agg_tokens_total + FROM spans + GROUP BY id + ), + model_rows AS ( + SELECT id, + (span->'cost'->>'model') AS model, + COALESCE((span->'cost'->>'input')::numeric,0) AS input, + COALESCE((span->'cost'->>'output')::numeric,0) AS output, + COALESCE((span->'cost'->>'total')::numeric,0) AS total, + COALESCE((span->'cost'->'tokens'->>'prompt')::numeric,0) AS tokens_prompt, + COALESCE((span->'cost'->'tokens'->>'completion')::numeric,0) AS tokens_completion, + COALESCE((span->'cost'->'tokens'->>'total')::numeric,0) AS tokens_total + FROM spans + WHERE span ? 'cost' AND (span->'cost'->>'model') IS NOT NULL + ), + model_sums AS ( + SELECT id, + model, + SUM(input) AS input, + SUM(output) AS output, + SUM(total) AS total, + SUM(tokens_prompt) AS tokens_prompt, + SUM(tokens_completion) AS tokens_completion, + SUM(tokens_total) AS tokens_total + FROM model_rows + GROUP BY id, model + ), + models AS ( + SELECT id, + jsonb_object_agg(model, jsonb_build_object( + 'input', input, + 'output', output, + 'total', total, + 'tokens', jsonb_build_object( + 'prompt', tokens_prompt, + 'completion', tokens_completion, + 'total', tokens_total + ) + )) AS models + FROM model_sums + GROUP BY id + ), + tb AS ( + SELECT l.id, + NULLIF((l.execution_data->'tokenBreakdown'->>'prompt')::numeric, 0) AS prompt, + NULLIF((l.execution_data->'tokenBreakdown'->>'completion')::numeric, 0) AS completion + FROM workflow_execution_logs l + JOIN candidate c ON c.id = l.id + ) + UPDATE workflow_execution_logs AS l + SET cost = jsonb_strip_nulls( + jsonb_build_object( + 'total', COALESCE(l.total_cost, NULLIF(agg.agg_total,0)), + 'input', COALESCE(l.total_input_cost, NULLIF(agg.agg_input,0)), + 'output', COALESCE(l.total_output_cost, NULLIF(agg.agg_output,0)), + 'tokens', CASE + WHEN l.total_tokens IS NOT NULL OR tb.prompt IS NOT NULL OR tb.completion IS NOT NULL OR NULLIF(agg.agg_tokens_total,0) IS NOT NULL THEN + jsonb_strip_nulls( + jsonb_build_object( + 'total', COALESCE(l.total_tokens, NULLIF(agg.agg_tokens_total,0)), + 'prompt', COALESCE(tb.prompt, NULLIF(agg.agg_tokens_prompt,0)), + 'completion', COALESCE(tb.completion, NULLIF(agg.agg_tokens_completion,0)) + ) + ) + ELSE NULL + END, + 'models', models.models + ) + ) + FROM agg + LEFT JOIN models ON models.id = agg.id + LEFT JOIN tb ON tb.id = agg.id + WHERE l.id = agg.id; + + GET DIAGNOSTICS v_rows_updated = ROW_COUNT; + EXIT WHEN v_rows_updated = 0; -- no more rows to backfill + END LOOP; +END $$;--> statement-breakpoint -- 3) Drop legacy columns now that backfill is complete ALTER TABLE "workflow_execution_logs"