Files
sim/apps/sim/background/webhook-execution.ts
Waleed 41c068c023 improvement(lib): refactored lib/ to be more aligned with queries and api directory (#2160)
* fix(lib): consolidate into core dir in lib/

* refactored lib/
2025-12-02 14:17:41 -08:00

542 lines
17 KiB
TypeScript

import { db } from '@sim/db'
import { webhook, workflow as workflowTable } from '@sim/db/schema'
import { task } from '@trigger.dev/sdk'
import { eq } from 'drizzle-orm'
import { v4 as uuidv4 } from 'uuid'
import { IdempotencyService, webhookIdempotency } from '@/lib/core/idempotency'
import { processExecutionFiles } from '@/lib/execution/files'
import { createLogger } from '@/lib/logs/console/logger'
import { LoggingSession } from '@/lib/logs/execution/logging-session'
import { buildTraceSpans } from '@/lib/logs/execution/trace-spans/trace-spans'
import { WebhookAttachmentProcessor } from '@/lib/webhooks/attachment-processor'
import { fetchAndProcessAirtablePayloads, formatWebhookInput } from '@/lib/webhooks/utils.server'
import { executeWorkflowCore } from '@/lib/workflows/executor/execution-core'
import { PauseResumeManager } from '@/lib/workflows/executor/human-in-the-loop-manager'
import {
loadDeployedWorkflowState,
loadWorkflowFromNormalizedTables,
} from '@/lib/workflows/persistence/utils'
import { getWorkflowById } from '@/lib/workflows/utils'
import { type ExecutionMetadata, ExecutionSnapshot } from '@/executor/execution/snapshot'
import type { ExecutionResult } from '@/executor/types'
import { Serializer } from '@/serializer'
import { mergeSubblockState } from '@/stores/workflows/server-utils'
import { getTrigger, isTriggerValid } from '@/triggers'
const logger = createLogger('TriggerWebhookExecution')
/**
* Process trigger outputs based on their schema definitions
* Finds outputs marked as 'file' or 'file[]' and uploads them to execution storage
*/
async function processTriggerFileOutputs(
input: any,
triggerOutputs: Record<string, any>,
context: {
workspaceId: string
workflowId: string
executionId: string
requestId: string
userId?: string
},
path = ''
): Promise<any> {
if (!input || typeof input !== 'object') {
return input
}
const processed: any = Array.isArray(input) ? [] : {}
for (const [key, value] of Object.entries(input)) {
const currentPath = path ? `${path}.${key}` : key
const outputDef = triggerOutputs[key]
const val: any = value
// If this field is marked as file or file[], process it
if (outputDef?.type === 'file[]' && Array.isArray(val)) {
try {
processed[key] = await WebhookAttachmentProcessor.processAttachments(val as any, context)
} catch (error) {
processed[key] = []
}
} else if (outputDef?.type === 'file' && val) {
try {
const [processedFile] = await WebhookAttachmentProcessor.processAttachments(
[val as any],
context
)
processed[key] = processedFile
} catch (error) {
logger.error(`[${context.requestId}] Error processing ${currentPath}:`, error)
processed[key] = val
}
} else if (outputDef && typeof outputDef === 'object' && !outputDef.type) {
// Nested object in schema - recurse with the nested schema
processed[key] = await processTriggerFileOutputs(val, outputDef, context, currentPath)
} else {
// Not a file output - keep as is
processed[key] = val
}
}
return processed
}
export type WebhookExecutionPayload = {
webhookId: string
workflowId: string
userId: string
provider: string
body: any
headers: Record<string, string>
path: string
blockId?: string
testMode?: boolean
executionTarget?: 'deployed' | 'live'
credentialId?: string
}
export async function executeWebhookJob(payload: WebhookExecutionPayload) {
const executionId = uuidv4()
const requestId = executionId.slice(0, 8)
logger.info(`[${requestId}] Starting webhook execution`, {
webhookId: payload.webhookId,
workflowId: payload.workflowId,
provider: payload.provider,
userId: payload.userId,
executionId,
})
const idempotencyKey = IdempotencyService.createWebhookIdempotencyKey(
payload.webhookId,
payload.headers,
payload.body,
payload.provider
)
const runOperation = async () => {
return await executeWebhookJobInternal(payload, executionId, requestId)
}
return await webhookIdempotency.executeWithIdempotency(
payload.provider,
idempotencyKey,
runOperation
)
}
async function executeWebhookJobInternal(
payload: WebhookExecutionPayload,
executionId: string,
requestId: string
) {
const loggingSession = new LoggingSession(
payload.workflowId,
executionId,
payload.provider || 'webhook',
requestId
)
try {
const workflowData =
payload.executionTarget === 'live'
? await loadWorkflowFromNormalizedTables(payload.workflowId)
: await loadDeployedWorkflowState(payload.workflowId)
if (!workflowData) {
throw new Error(
`Workflow state not found. The workflow may not be ${payload.executionTarget === 'live' ? 'saved' : 'deployed'} or the deployment data may be corrupted.`
)
}
const { blocks, edges, loops, parallels } = workflowData
const wfRows = await db
.select({ workspaceId: workflowTable.workspaceId, variables: workflowTable.variables })
.from(workflowTable)
.where(eq(workflowTable.id, payload.workflowId))
.limit(1)
const workspaceId = wfRows[0]?.workspaceId || undefined
const workflowVariables = (wfRows[0]?.variables as Record<string, any>) || {}
// Merge subblock states (matching workflow-execution pattern)
const mergedStates = mergeSubblockState(blocks, {})
// Create serialized workflow
const serializer = new Serializer()
const serializedWorkflow = serializer.serializeWorkflow(
mergedStates,
edges,
loops || {},
parallels || {},
true // Enable validation during execution
)
// Handle special Airtable case
if (payload.provider === 'airtable') {
logger.info(`[${requestId}] Processing Airtable webhook via fetchAndProcessAirtablePayloads`)
// Load the actual webhook record from database to get providerConfig
const [webhookRecord] = await db
.select()
.from(webhook)
.where(eq(webhook.id, payload.webhookId))
.limit(1)
if (!webhookRecord) {
throw new Error(`Webhook record not found: ${payload.webhookId}`)
}
const webhookData = {
id: payload.webhookId,
provider: payload.provider,
providerConfig: webhookRecord.providerConfig,
}
// Create a mock workflow object for Airtable processing
const mockWorkflow = {
id: payload.workflowId,
userId: payload.userId,
}
// Get the processed Airtable input
const airtableInput = await fetchAndProcessAirtablePayloads(
webhookData,
mockWorkflow,
requestId
)
// If we got input (changes), execute the workflow like other providers
if (airtableInput) {
logger.info(`[${requestId}] Executing workflow with Airtable changes`)
// Get workflow for core execution
const workflow = await getWorkflowById(payload.workflowId)
if (!workflow) {
throw new Error(`Workflow ${payload.workflowId} not found`)
}
const metadata: ExecutionMetadata = {
requestId,
executionId,
workflowId: payload.workflowId,
workspaceId,
userId: payload.userId,
triggerType: payload.provider || 'webhook',
triggerBlockId: payload.blockId,
useDraftState: false,
startTime: new Date().toISOString(),
}
const snapshot = new ExecutionSnapshot(
metadata,
workflow,
airtableInput,
{},
workflowVariables,
[]
)
const executionResult = await executeWorkflowCore({
snapshot,
callbacks: {},
loggingSession,
})
if (executionResult.status === 'paused') {
if (!executionResult.snapshotSeed) {
logger.error(`[${requestId}] Missing snapshot seed for paused execution`, {
executionId,
})
} else {
await PauseResumeManager.persistPauseResult({
workflowId: payload.workflowId,
executionId,
pausePoints: executionResult.pausePoints || [],
snapshotSeed: executionResult.snapshotSeed,
executorUserId: executionResult.metadata?.userId,
})
}
} else {
await PauseResumeManager.processQueuedResumes(executionId)
}
logger.info(`[${requestId}] Airtable webhook execution completed`, {
success: executionResult.success,
workflowId: payload.workflowId,
})
return {
success: executionResult.success,
workflowId: payload.workflowId,
executionId,
output: executionResult.output,
executedAt: new Date().toISOString(),
provider: payload.provider,
}
}
// No changes to process
logger.info(`[${requestId}] No Airtable changes to process`)
await loggingSession.safeComplete({
endedAt: new Date().toISOString(),
totalDurationMs: 0,
finalOutput: { message: 'No Airtable changes to process' },
traceSpans: [],
})
return {
success: true,
workflowId: payload.workflowId,
executionId,
output: { message: 'No Airtable changes to process' },
executedAt: new Date().toISOString(),
}
}
// Format input for standard webhooks
// Load the actual webhook to get providerConfig (needed for Teams credentialId)
const webhookRows = await db
.select()
.from(webhook)
.where(eq(webhook.id, payload.webhookId))
.limit(1)
const actualWebhook =
webhookRows.length > 0
? webhookRows[0]
: {
provider: payload.provider,
blockId: payload.blockId,
providerConfig: {},
}
const mockWorkflow = {
id: payload.workflowId,
userId: payload.userId,
}
const mockRequest = {
headers: new Map(Object.entries(payload.headers)),
} as any
const input = await formatWebhookInput(actualWebhook, mockWorkflow, payload.body, mockRequest)
if (!input && payload.provider === 'whatsapp') {
logger.info(`[${requestId}] No messages in WhatsApp payload, skipping execution`)
await loggingSession.safeComplete({
endedAt: new Date().toISOString(),
totalDurationMs: 0,
finalOutput: { message: 'No messages in WhatsApp payload' },
traceSpans: [],
})
return {
success: true,
workflowId: payload.workflowId,
executionId,
output: { message: 'No messages in WhatsApp payload' },
executedAt: new Date().toISOString(),
}
}
// Process trigger file outputs based on schema
if (input && payload.blockId && blocks[payload.blockId]) {
try {
const triggerBlock = blocks[payload.blockId]
const rawSelectedTriggerId = triggerBlock?.subBlocks?.selectedTriggerId?.value
const rawTriggerId = triggerBlock?.subBlocks?.triggerId?.value
const resolvedTriggerId = [rawSelectedTriggerId, rawTriggerId].find(
(candidate): candidate is string =>
typeof candidate === 'string' && isTriggerValid(candidate)
)
if (resolvedTriggerId) {
const triggerConfig = getTrigger(resolvedTriggerId)
if (triggerConfig.outputs) {
logger.debug(`[${requestId}] Processing trigger ${resolvedTriggerId} file outputs`)
const processedInput = await processTriggerFileOutputs(input, triggerConfig.outputs, {
workspaceId: workspaceId || '',
workflowId: payload.workflowId,
executionId,
requestId,
userId: payload.userId,
})
Object.assign(input, processedInput)
}
} else {
logger.debug(`[${requestId}] No valid triggerId found for block ${payload.blockId}`)
}
} catch (error) {
logger.error(`[${requestId}] Error processing trigger file outputs:`, error)
// Continue without processing attachments rather than failing execution
}
}
// Process generic webhook files based on inputFormat
if (input && payload.provider === 'generic' && payload.blockId && blocks[payload.blockId]) {
try {
const triggerBlock = blocks[payload.blockId]
if (triggerBlock?.subBlocks?.inputFormat?.value) {
const inputFormat = triggerBlock.subBlocks.inputFormat.value as unknown as Array<{
name: string
type: 'string' | 'number' | 'boolean' | 'object' | 'array' | 'files'
}>
logger.debug(`[${requestId}] Processing generic webhook files from inputFormat`)
const fileFields = inputFormat.filter((field) => field.type === 'files')
if (fileFields.length > 0 && typeof input === 'object' && input !== null) {
const executionContext = {
workspaceId: workspaceId || '',
workflowId: payload.workflowId,
executionId,
}
for (const fileField of fileFields) {
const fieldValue = input[fileField.name]
if (fieldValue && typeof fieldValue === 'object') {
const uploadedFiles = await processExecutionFiles(
fieldValue,
executionContext,
requestId,
payload.userId
)
if (uploadedFiles.length > 0) {
input[fileField.name] = uploadedFiles
logger.info(
`[${requestId}] Successfully processed ${uploadedFiles.length} file(s) for field: ${fileField.name}`
)
}
}
}
}
}
} catch (error) {
logger.error(`[${requestId}] Error processing generic webhook files:`, error)
// Continue without processing files rather than failing execution
}
}
logger.info(`[${requestId}] Executing workflow for ${payload.provider} webhook`)
// Get workflow for core execution
const workflow = await getWorkflowById(payload.workflowId)
if (!workflow) {
throw new Error(`Workflow ${payload.workflowId} not found`)
}
const metadata: ExecutionMetadata = {
requestId,
executionId,
workflowId: payload.workflowId,
workspaceId,
userId: payload.userId,
triggerType: payload.provider || 'webhook',
triggerBlockId: payload.blockId,
useDraftState: false,
startTime: new Date().toISOString(),
}
const snapshot = new ExecutionSnapshot(
metadata,
workflow,
input || {},
{},
workflowVariables,
[]
)
const executionResult = await executeWorkflowCore({
snapshot,
callbacks: {},
loggingSession,
})
if (executionResult.status === 'paused') {
if (!executionResult.snapshotSeed) {
logger.error(`[${requestId}] Missing snapshot seed for paused execution`, {
executionId,
})
} else {
await PauseResumeManager.persistPauseResult({
workflowId: payload.workflowId,
executionId,
pausePoints: executionResult.pausePoints || [],
snapshotSeed: executionResult.snapshotSeed,
executorUserId: executionResult.metadata?.userId,
})
}
} else {
await PauseResumeManager.processQueuedResumes(executionId)
}
logger.info(`[${requestId}] Webhook execution completed`, {
success: executionResult.success,
workflowId: payload.workflowId,
provider: payload.provider,
})
return {
success: executionResult.success,
workflowId: payload.workflowId,
executionId,
output: executionResult.output,
executedAt: new Date().toISOString(),
provider: payload.provider,
}
} catch (error: any) {
logger.error(`[${requestId}] Webhook execution failed`, {
error: error.message,
stack: error.stack,
workflowId: payload.workflowId,
provider: payload.provider,
})
try {
// Ensure logging session is started (safe to call multiple times)
await loggingSession.safeStart({
userId: payload.userId,
workspaceId: '', // May not be available for early errors
variables: {},
triggerData: {
isTest: payload.testMode === true,
executionTarget: payload.executionTarget || 'deployed',
},
})
const executionResult = (error?.executionResult as ExecutionResult | undefined) || {
success: false,
output: {},
logs: [],
}
const { traceSpans } = buildTraceSpans(executionResult)
await loggingSession.safeCompleteWithError({
endedAt: new Date().toISOString(),
totalDurationMs: 0,
error: {
message: error.message || 'Webhook execution failed',
stackTrace: error.stack,
},
traceSpans,
})
} catch (loggingError) {
logger.error(`[${requestId}] Failed to complete logging session`, loggingError)
}
throw error
}
}
export const webhookExecution = task({
id: 'webhook-execution',
retry: {
maxAttempts: 1,
},
run: async (payload: WebhookExecutionPayload) => executeWebhookJob(payload),
})