diff --git a/apps/sim/app/api/workflows/[id]/execute/route.ts b/apps/sim/app/api/workflows/[id]/execute/route.ts index ada7b7c5b..1b9fa2f7f 100644 --- a/apps/sim/app/api/workflows/[id]/execute/route.ts +++ b/apps/sim/app/api/workflows/[id]/execute/route.ts @@ -1,9 +1,12 @@ +import { tasks } from '@trigger.dev/sdk' import { type NextRequest, NextResponse } from 'next/server' import { validate as uuidValidate, v4 as uuidv4 } from 'uuid' import { z } from 'zod' import { checkHybridAuth } from '@/lib/auth/hybrid' +import { env, isTruthy } from '@/lib/core/config/env' import { generateRequestId } from '@/lib/core/utils/request' import { SSE_HEADERS } from '@/lib/core/utils/sse' +import { getBaseUrl } from '@/lib/core/utils/urls' import { processInputFileFields } from '@/lib/execution/files' import { preprocessExecution } from '@/lib/execution/preprocessing' import { createLogger } from '@/lib/logs/console/logger' @@ -17,6 +20,7 @@ import { } from '@/lib/workflows/persistence/utils' import { createStreamingResponse } from '@/lib/workflows/streaming/streaming' import { createHttpResponseFromBlock, workflowHasResponseBlock } from '@/lib/workflows/utils' +import type { WorkflowExecutionPayload } from '@/background/workflow-execution' import { type ExecutionMetadata, ExecutionSnapshot } from '@/executor/execution/snapshot' import type { StreamingExecution } from '@/executor/types' import { Serializer } from '@/serializer' @@ -217,6 +221,64 @@ function resolveOutputIds( }) } +type AsyncExecutionParams = { + requestId: string + workflowId: string + userId: string + input: any + triggerType: 'api' | 'webhook' | 'schedule' | 'manual' | 'chat' +} + +/** + * Handles async workflow execution by queueing a background job. + * Returns immediately with a 202 Accepted response containing the job ID. + */ +async function handleAsyncExecution(params: AsyncExecutionParams): Promise { + const { requestId, workflowId, userId, input, triggerType } = params + const useTrigger = isTruthy(env.TRIGGER_DEV_ENABLED) + + if (!useTrigger) { + logger.warn(`[${requestId}] Async mode requested but TRIGGER_DEV_ENABLED is false`) + return NextResponse.json( + { error: 'Async execution is not enabled. Set TRIGGER_DEV_ENABLED=true to use async mode.' }, + { status: 400 } + ) + } + + const payload: WorkflowExecutionPayload = { + workflowId, + userId, + input, + triggerType, + } + + try { + const handle = await tasks.trigger('workflow-execution', payload) + + logger.info(`[${requestId}] Queued async workflow execution`, { + workflowId, + jobId: handle.id, + }) + + return NextResponse.json( + { + success: true, + async: true, + jobId: handle.id, + message: 'Workflow execution queued', + statusUrl: `${getBaseUrl()}/api/jobs/${handle.id}`, + }, + { status: 202 } + ) + } catch (error: any) { + logger.error(`[${requestId}] Failed to queue async execution`, error) + return NextResponse.json( + { error: `Failed to queue async execution: ${error.message}` }, + { status: 500 } + ) + } +} + /** * POST /api/workflows/[id]/execute * @@ -291,6 +353,8 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id: const streamHeader = req.headers.get('X-Stream-Response') === 'true' const enableSSE = streamHeader || streamParam === true + const executionModeHeader = req.headers.get('X-Execution-Mode') + const isAsyncMode = executionModeHeader === 'async' logger.info(`[${requestId}] Starting server-side execution`, { workflowId, @@ -301,6 +365,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id: streamParam, streamHeader, enableSSE, + isAsyncMode, }) const executionId = uuidv4() @@ -349,6 +414,16 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id: workspaceId: workflow.workspaceId, }) + if (isAsyncMode) { + return handleAsyncExecution({ + requestId, + workflowId, + userId: actorUserId, + input, + triggerType: loggingTriggerType, + }) + } + let cachedWorkflowData: { blocks: Record edges: any[] diff --git a/apps/sim/background/schedule-execution.ts b/apps/sim/background/schedule-execution.ts index 34a352e8e..d51e2f290 100644 --- a/apps/sim/background/schedule-execution.ts +++ b/apps/sim/background/schedule-execution.ts @@ -9,6 +9,7 @@ import { getPersonalAndWorkspaceEnv } from '@/lib/environment/utils' import { preprocessExecution } from '@/lib/execution/preprocessing' import { createLogger } from '@/lib/logs/console/logger' import { LoggingSession } from '@/lib/logs/execution/logging-session' +import { buildTraceSpans } from '@/lib/logs/execution/trace-spans/trace-spans' import { executeWorkflowCore } from '@/lib/workflows/executor/execution-core' import { PauseResumeManager } from '@/lib/workflows/executor/human-in-the-loop-manager' import { @@ -22,6 +23,7 @@ import { getSubBlockValue, } from '@/lib/workflows/schedules/utils' import { type ExecutionMetadata, ExecutionSnapshot } from '@/executor/execution/snapshot' +import type { ExecutionResult } from '@/executor/types' import { mergeSubblockState } from '@/stores/workflows/server-utils' const logger = createLogger('TriggerScheduleExecution') @@ -293,6 +295,9 @@ async function runWorkflowExecution({ ) try { + const executionResult = (earlyError as any)?.executionResult as ExecutionResult | undefined + const { traceSpans } = executionResult ? buildTraceSpans(executionResult) : { traceSpans: [] } + await loggingSession.safeCompleteWithError({ error: { message: `Schedule execution failed: ${ @@ -300,7 +305,7 @@ async function runWorkflowExecution({ }`, stackTrace: earlyError instanceof Error ? earlyError.stack : undefined, }, - traceSpans: [], + traceSpans, }) } catch (loggingError) { logger.error(`[${requestId}] Failed to complete log entry for schedule failure`, loggingError) diff --git a/apps/sim/lib/logs/execution/logging-factory.ts b/apps/sim/lib/logs/execution/logging-factory.ts index d70b750ff..c09f4b2ec 100644 --- a/apps/sim/lib/logs/execution/logging-factory.ts +++ b/apps/sim/lib/logs/execution/logging-factory.ts @@ -118,11 +118,9 @@ export function calculateCostSummary(traceSpans: any[]): { totalCost += span.cost.total || 0 totalInputCost += span.cost.input || 0 totalOutputCost += span.cost.output || 0 - const promptTokens = span.tokens?.prompt ?? span.tokens?.input ?? 0 - const completionTokens = span.tokens?.completion ?? span.tokens?.output ?? 0 totalTokens += span.tokens?.total || 0 - totalPromptTokens += promptTokens - totalCompletionTokens += completionTokens + totalPromptTokens += span.tokens?.input ?? span.tokens?.prompt ?? 0 + totalCompletionTokens += span.tokens?.output ?? span.tokens?.completion ?? 0 if (span.model) { const model = span.model @@ -137,8 +135,8 @@ export function calculateCostSummary(traceSpans: any[]): { models[model].input += span.cost.input || 0 models[model].output += span.cost.output || 0 models[model].total += span.cost.total || 0 - models[model].tokens.prompt += promptTokens - models[model].tokens.completion += completionTokens + models[model].tokens.prompt += span.tokens?.input ?? span.tokens?.prompt ?? 0 + models[model].tokens.completion += span.tokens?.output ?? span.tokens?.completion ?? 0 models[model].tokens.total += span.tokens?.total || 0 } } diff --git a/apps/sim/lib/logs/execution/logging-session.ts b/apps/sim/lib/logs/execution/logging-session.ts index d19904e5d..a14dedfca 100644 --- a/apps/sim/lib/logs/execution/logging-session.ts +++ b/apps/sim/lib/logs/execution/logging-session.ts @@ -186,22 +186,24 @@ export class LoggingSession { const durationMs = typeof totalDurationMs === 'number' ? totalDurationMs : 0 const startTime = new Date(endTime.getTime() - Math.max(1, durationMs)) - const costSummary = { - totalCost: BASE_EXECUTION_CHARGE, - totalInputCost: 0, - totalOutputCost: 0, - totalTokens: 0, - totalPromptTokens: 0, - totalCompletionTokens: 0, - baseExecutionCharge: BASE_EXECUTION_CHARGE, - modelCost: 0, - models: {}, - } + const hasProvidedSpans = Array.isArray(traceSpans) && traceSpans.length > 0 + + const costSummary = hasProvidedSpans + ? calculateCostSummary(traceSpans) + : { + totalCost: BASE_EXECUTION_CHARGE, + totalInputCost: 0, + totalOutputCost: 0, + totalTokens: 0, + totalPromptTokens: 0, + totalCompletionTokens: 0, + baseExecutionCharge: BASE_EXECUTION_CHARGE, + modelCost: 0, + models: {}, + } const message = error?.message || 'Execution failed before starting blocks' - const hasProvidedSpans = Array.isArray(traceSpans) && traceSpans.length > 0 - const errorSpan: TraceSpan = { id: 'workflow-error-root', name: 'Workflow Error',