diff --git a/apps/sim/app/api/workflows/[id]/execute-from-block/route.ts b/apps/sim/app/api/workflows/[id]/execute-from-block/route.ts index 14cc81248..5a2bb9f34 100644 --- a/apps/sim/app/api/workflows/[id]/execute-from-block/route.ts +++ b/apps/sim/app/api/workflows/[id]/execute-from-block/route.ts @@ -8,7 +8,9 @@ import { checkHybridAuth } from '@/lib/auth/hybrid' import { generateRequestId } from '@/lib/core/utils/request' import { SSE_HEADERS } from '@/lib/core/utils/sse' import { getPersonalAndWorkspaceEnv } from '@/lib/environment/utils' -import { markExecutionCancelled } from '@/lib/execution/cancellation' +import { clearExecutionCancellation, markExecutionCancelled } from '@/lib/execution/cancellation' +import { LoggingSession } from '@/lib/logs/execution/logging-session' +import { buildTraceSpans } from '@/lib/logs/execution/trace-spans/trace-spans' import { loadWorkflowFromNormalizedTables } from '@/lib/workflows/persistence/utils' import { type ExecutionEvent, encodeSSEEvent } from '@/lib/workflows/executor/execution-events' import { DAGExecutor } from '@/executor/execution/executor' @@ -93,7 +95,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id: // Load workflow record to get workspaceId const [workflowRecord] = await db - .select({ workspaceId: workflowTable.workspaceId }) + .select({ workspaceId: workflowTable.workspaceId, userId: workflowTable.userId }) .from(workflowTable) .where(eq(workflowTable.id, workflowId)) .limit(1) @@ -103,6 +105,10 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id: } const workspaceId = workflowRecord.workspaceId + const workflowUserId = workflowRecord.userId + + // Initialize logging session for cost tracking + const loggingSession = new LoggingSession(workflowId, executionId, 'manual', requestId) // Load workflow state const workflowData = await loadWorkflowFromNormalizedTables(workflowId) @@ -131,6 +137,13 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id: true ) + // Start logging session + await loggingSession.safeStart({ + userId, + workspaceId, + variables: {}, + }) + const encoder = new TextEncoder() const abortController = new AbortController() let isStreamClosed = false @@ -191,6 +204,9 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id: callbackData: { input?: unknown; output: NormalizedBlockOutput; executionTime: number }, iterationContext?: IterationContext ) => { + // Log to session for cost tracking + await loggingSession.onBlockComplete(blockId, blockName, blockType, callbackData) + const hasError = (callbackData.output as any)?.error if (hasError) { @@ -299,7 +315,18 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id: sourceSnapshot as SerializableExecutionState ) + // Build trace spans from fresh execution logs only + // Trace spans show what actually executed in this run + const { traceSpans, totalDuration } = buildTraceSpans(result) + if (result.status === 'cancelled') { + await loggingSession.safeCompleteWithCancellation({ + endedAt: new Date().toISOString(), + totalDurationMs: totalDuration || 0, + traceSpans: traceSpans || [], + }) + await clearExecutionCancellation(executionId) + sendEvent({ type: 'execution:cancelled', timestamp: new Date().toISOString(), @@ -312,6 +339,16 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id: return } + // Complete logging session + await loggingSession.safeComplete({ + endedAt: new Date().toISOString(), + totalDurationMs: totalDuration || 0, + finalOutput: result.output || {}, + traceSpans: traceSpans || [], + workflowInput: {}, + }) + await clearExecutionCancellation(executionId) + sendEvent({ type: 'execution:completed', timestamp: new Date().toISOString(), @@ -330,6 +367,19 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id: logger.error(`[${requestId}] Run-from-block execution failed: ${errorMessage}`) const executionResult = hasExecutionResult(error) ? error.executionResult : undefined + const { traceSpans } = executionResult ? buildTraceSpans(executionResult) : { traceSpans: [] } + + // Complete logging session with error + await loggingSession.safeCompleteWithError({ + endedAt: new Date().toISOString(), + totalDurationMs: executionResult?.metadata?.duration || 0, + error: { + message: errorMessage, + stackTrace: error instanceof Error ? error.stack : undefined, + }, + traceSpans, + }) + await clearExecutionCancellation(executionId) sendEvent({ type: 'execution:error', diff --git a/apps/sim/app/workspace/[workspaceId]/logs/components/log-details/components/trace-spans/trace-spans.tsx b/apps/sim/app/workspace/[workspaceId]/logs/components/log-details/components/trace-spans/trace-spans.tsx index dab65614c..73998bb2f 100644 --- a/apps/sim/app/workspace/[workspaceId]/logs/components/log-details/components/trace-spans/trace-spans.tsx +++ b/apps/sim/app/workspace/[workspaceId]/logs/components/log-details/components/trace-spans/trace-spans.tsx @@ -528,6 +528,7 @@ const TraceSpanNode = memo(function TraceSpanNode({ const isDirectError = span.status === 'error' const hasNestedError = hasErrorInTree(span) const showErrorStyle = isDirectError || hasNestedError + const isCached = span.cached === true const { icon: BlockIcon, bgColor } = getBlockIconAndColor(span.type, span.name) @@ -586,7 +587,7 @@ const TraceSpanNode = memo(function TraceSpanNode({ isIterationType(lowerType) || lowerType === 'workflow' || lowerType === 'workflow_input' return ( -