Files
sim/apps/sim/background/webhook-execution.ts
Waleed cf023e4d22 feat(tools): added download file tool for onedrive, google drive, and slack; added move email tool for gmail and outlook (#1785)
* feat(tools): added download file tool for onedrive, google drive, and slack

* added gmail & outlook move tools, added missing credentials descriptions to modal

* added slack delete/update message, add reaction; added gmail read/unread/label/unarchive; added outlook copy/delete/read/unread

* added threads to slack operations

* added timestamp for slack webhook trigger since api uses timestamp for updating/reacting/deleting

* cleanup

* added file info to slack read messages

* updated slack desc

* fixed downloading for onedrive, slack, and drive

* fix type check

* fix build failure

* cleanup files, fix triggers with attachments, fix integration blocks with include attachment to parse to user files, remove unused code

* fix move files tools

* fix tests

* fix build errors

* fix type error

* fix tests

* remove redundant code and filter out unecessary user file fields

* fix lint error

* remove fields from tag dropdown

* fix file upload via API

* fix pdf parse issue

---------

Co-authored-by: waleed <waleed>
Co-authored-by: Adam Gough <adamgough@Adams-MacBook-Pro.local>
Co-authored-by: Vikhyath Mondreti <vikhyath@simstudio.ai>
2025-11-05 13:00:34 -08:00

512 lines
16 KiB
TypeScript

import { db } from '@sim/db'
import { webhook, workflow as workflowTable } from '@sim/db/schema'
import { task } from '@trigger.dev/sdk'
import { eq } from 'drizzle-orm'
import { v4 as uuidv4 } from 'uuid'
import { getPersonalAndWorkspaceEnv } from '@/lib/environment/utils'
import { processExecutionFiles } from '@/lib/execution/files'
import { IdempotencyService, webhookIdempotency } from '@/lib/idempotency'
import { createLogger } from '@/lib/logs/console/logger'
import { LoggingSession } from '@/lib/logs/execution/logging-session'
import { buildTraceSpans } from '@/lib/logs/execution/trace-spans/trace-spans'
import { decryptSecret } from '@/lib/utils'
import { WebhookAttachmentProcessor } from '@/lib/webhooks/attachment-processor'
import { fetchAndProcessAirtablePayloads, formatWebhookInput } from '@/lib/webhooks/utils.server'
import {
loadDeployedWorkflowState,
loadWorkflowFromNormalizedTables,
} from '@/lib/workflows/db-helpers'
import { executeWorkflowCore } from '@/lib/workflows/executor/execution-core'
import { getWorkflowById } from '@/lib/workflows/utils'
import { type ExecutionMetadata, ExecutionSnapshot } from '@/executor/execution/snapshot'
import type { ExecutionResult } from '@/executor/types'
import { Serializer } from '@/serializer'
import { mergeSubblockState } from '@/stores/workflows/server-utils'
import { getTrigger, isTriggerValid } from '@/triggers'
const logger = createLogger('TriggerWebhookExecution')
/**
* Process trigger outputs based on their schema definitions
* Finds outputs marked as 'file' or 'file[]' and uploads them to execution storage
*/
async function processTriggerFileOutputs(
input: any,
triggerOutputs: Record<string, any>,
context: {
workspaceId: string
workflowId: string
executionId: string
requestId: string
userId?: string
},
path = ''
): Promise<any> {
if (!input || typeof input !== 'object') {
return input
}
const processed: any = Array.isArray(input) ? [] : {}
for (const [key, value] of Object.entries(input)) {
const currentPath = path ? `${path}.${key}` : key
const outputDef = triggerOutputs[key]
const val: any = value
// If this field is marked as file or file[], process it
if (outputDef?.type === 'file[]' && Array.isArray(val)) {
try {
processed[key] = await WebhookAttachmentProcessor.processAttachments(val as any, context)
} catch (error) {
processed[key] = []
}
} else if (outputDef?.type === 'file' && val) {
try {
const [processedFile] = await WebhookAttachmentProcessor.processAttachments(
[val as any],
context
)
processed[key] = processedFile
} catch (error) {
logger.error(`[${context.requestId}] Error processing ${currentPath}:`, error)
processed[key] = val
}
} else if (outputDef && typeof outputDef === 'object' && !outputDef.type) {
// Nested object in schema - recurse with the nested schema
processed[key] = await processTriggerFileOutputs(val, outputDef, context, currentPath)
} else {
// Not a file output - keep as is
processed[key] = val
}
}
return processed
}
export type WebhookExecutionPayload = {
webhookId: string
workflowId: string
userId: string
provider: string
body: any
headers: Record<string, string>
path: string
blockId?: string
testMode?: boolean
executionTarget?: 'deployed' | 'live'
credentialId?: string
}
export async function executeWebhookJob(payload: WebhookExecutionPayload) {
const executionId = uuidv4()
const requestId = executionId.slice(0, 8)
logger.info(`[${requestId}] Starting webhook execution`, {
webhookId: payload.webhookId,
workflowId: payload.workflowId,
provider: payload.provider,
userId: payload.userId,
executionId,
})
const idempotencyKey = IdempotencyService.createWebhookIdempotencyKey(
payload.webhookId,
payload.headers
)
const runOperation = async () => {
return await executeWebhookJobInternal(payload, executionId, requestId)
}
return await webhookIdempotency.executeWithIdempotency(
payload.provider,
idempotencyKey,
runOperation
)
}
async function executeWebhookJobInternal(
payload: WebhookExecutionPayload,
executionId: string,
requestId: string
) {
const loggingSession = new LoggingSession(payload.workflowId, executionId, 'webhook', requestId)
try {
await loggingSession.safeStart({
userId: payload.userId,
workspaceId: '', // Will be resolved below
variables: {},
triggerData: {
isTest: payload.testMode === true,
executionTarget: payload.executionTarget || 'deployed',
},
})
const workflowData =
payload.executionTarget === 'live'
? await loadWorkflowFromNormalizedTables(payload.workflowId)
: await loadDeployedWorkflowState(payload.workflowId)
if (!workflowData) {
throw new Error(
`Workflow state not found. The workflow may not be ${payload.executionTarget === 'live' ? 'saved' : 'deployed'} or the deployment data may be corrupted.`
)
}
const { blocks, edges, loops, parallels } = workflowData
const wfRows = await db
.select({ workspaceId: workflowTable.workspaceId, variables: workflowTable.variables })
.from(workflowTable)
.where(eq(workflowTable.id, payload.workflowId))
.limit(1)
const workspaceId = wfRows[0]?.workspaceId || undefined
const workflowVariables = (wfRows[0]?.variables as Record<string, any>) || {}
const { personalEncrypted, workspaceEncrypted } = await getPersonalAndWorkspaceEnv(
payload.userId,
workspaceId
)
const mergedEncrypted = { ...personalEncrypted, ...workspaceEncrypted }
const decryptedPairs = await Promise.all(
Object.entries(mergedEncrypted).map(async ([key, encrypted]) => {
const { decrypted } = await decryptSecret(encrypted)
return [key, decrypted] as const
})
)
const decryptedEnvVars: Record<string, string> = Object.fromEntries(decryptedPairs)
// Merge subblock states (matching workflow-execution pattern)
const mergedStates = mergeSubblockState(blocks, {})
// Create serialized workflow
const serializer = new Serializer()
const serializedWorkflow = serializer.serializeWorkflow(
mergedStates,
edges,
loops || {},
parallels || {},
true // Enable validation during execution
)
// Handle special Airtable case
if (payload.provider === 'airtable') {
logger.info(`[${requestId}] Processing Airtable webhook via fetchAndProcessAirtablePayloads`)
// Load the actual webhook record from database to get providerConfig
const [webhookRecord] = await db
.select()
.from(webhook)
.where(eq(webhook.id, payload.webhookId))
.limit(1)
if (!webhookRecord) {
throw new Error(`Webhook record not found: ${payload.webhookId}`)
}
const webhookData = {
id: payload.webhookId,
provider: payload.provider,
providerConfig: webhookRecord.providerConfig,
}
// Create a mock workflow object for Airtable processing
const mockWorkflow = {
id: payload.workflowId,
userId: payload.userId,
}
// Get the processed Airtable input
const airtableInput = await fetchAndProcessAirtablePayloads(
webhookData,
mockWorkflow,
requestId
)
// If we got input (changes), execute the workflow like other providers
if (airtableInput) {
logger.info(`[${requestId}] Executing workflow with Airtable changes`)
// Get workflow for core execution
const workflow = await getWorkflowById(payload.workflowId)
if (!workflow) {
throw new Error(`Workflow ${payload.workflowId} not found`)
}
const metadata: ExecutionMetadata = {
requestId,
executionId,
workflowId: payload.workflowId,
workspaceId,
userId: payload.userId,
triggerType: 'webhook',
triggerBlockId: payload.blockId,
useDraftState: false,
startTime: new Date().toISOString(),
}
const snapshot = new ExecutionSnapshot(
metadata,
workflow,
airtableInput,
{},
workflowVariables,
[]
)
const executionResult = await executeWorkflowCore({
snapshot,
callbacks: {},
loggingSession,
})
logger.info(`[${requestId}] Airtable webhook execution completed`, {
success: executionResult.success,
workflowId: payload.workflowId,
})
return {
success: executionResult.success,
workflowId: payload.workflowId,
executionId,
output: executionResult.output,
executedAt: new Date().toISOString(),
provider: payload.provider,
}
}
// No changes to process
logger.info(`[${requestId}] No Airtable changes to process`)
await loggingSession.safeComplete({
endedAt: new Date().toISOString(),
totalDurationMs: 0,
finalOutput: { message: 'No Airtable changes to process' },
traceSpans: [],
})
return {
success: true,
workflowId: payload.workflowId,
executionId,
output: { message: 'No Airtable changes to process' },
executedAt: new Date().toISOString(),
}
}
// Format input for standard webhooks
// Load the actual webhook to get providerConfig (needed for Teams credentialId)
const webhookRows = await db
.select()
.from(webhook)
.where(eq(webhook.id, payload.webhookId))
.limit(1)
const actualWebhook =
webhookRows.length > 0
? webhookRows[0]
: {
provider: payload.provider,
blockId: payload.blockId,
providerConfig: {},
}
const mockWorkflow = {
id: payload.workflowId,
userId: payload.userId,
}
const mockRequest = {
headers: new Map(Object.entries(payload.headers)),
} as any
const input = await formatWebhookInput(actualWebhook, mockWorkflow, payload.body, mockRequest)
if (!input && payload.provider === 'whatsapp') {
logger.info(`[${requestId}] No messages in WhatsApp payload, skipping execution`)
await loggingSession.safeComplete({
endedAt: new Date().toISOString(),
totalDurationMs: 0,
finalOutput: { message: 'No messages in WhatsApp payload' },
traceSpans: [],
})
return {
success: true,
workflowId: payload.workflowId,
executionId,
output: { message: 'No messages in WhatsApp payload' },
executedAt: new Date().toISOString(),
}
}
// Process trigger file outputs based on schema
if (input && payload.blockId && blocks[payload.blockId]) {
try {
const triggerBlock = blocks[payload.blockId]
const rawSelectedTriggerId = triggerBlock?.subBlocks?.selectedTriggerId?.value
const rawTriggerId = triggerBlock?.subBlocks?.triggerId?.value
const resolvedTriggerId = [rawSelectedTriggerId, rawTriggerId].find(
(candidate): candidate is string =>
typeof candidate === 'string' && isTriggerValid(candidate)
)
if (resolvedTriggerId) {
const triggerConfig = getTrigger(resolvedTriggerId)
if (triggerConfig.outputs) {
logger.debug(`[${requestId}] Processing trigger ${resolvedTriggerId} file outputs`)
const processedInput = await processTriggerFileOutputs(input, triggerConfig.outputs, {
workspaceId: workspaceId || '',
workflowId: payload.workflowId,
executionId,
requestId,
userId: payload.userId,
})
Object.assign(input, processedInput)
}
} else {
logger.debug(`[${requestId}] No valid triggerId found for block ${payload.blockId}`)
}
} catch (error) {
logger.error(`[${requestId}] Error processing trigger file outputs:`, error)
// Continue without processing attachments rather than failing execution
}
}
// Process generic webhook files based on inputFormat
if (input && payload.provider === 'generic' && payload.blockId && blocks[payload.blockId]) {
try {
const triggerBlock = blocks[payload.blockId]
if (triggerBlock?.subBlocks?.inputFormat?.value) {
const inputFormat = triggerBlock.subBlocks.inputFormat.value as unknown as Array<{
name: string
type: 'string' | 'number' | 'boolean' | 'object' | 'array' | 'files'
}>
logger.debug(`[${requestId}] Processing generic webhook files from inputFormat`)
const fileFields = inputFormat.filter((field) => field.type === 'files')
if (fileFields.length > 0 && typeof input === 'object' && input !== null) {
const executionContext = {
workspaceId: workspaceId || '',
workflowId: payload.workflowId,
executionId,
}
for (const fileField of fileFields) {
const fieldValue = input[fileField.name]
if (fieldValue && typeof fieldValue === 'object') {
const uploadedFiles = await processExecutionFiles(
fieldValue,
executionContext,
requestId,
payload.userId
)
if (uploadedFiles.length > 0) {
input[fileField.name] = uploadedFiles
logger.info(
`[${requestId}] Successfully processed ${uploadedFiles.length} file(s) for field: ${fileField.name}`
)
}
}
}
}
}
} catch (error) {
logger.error(`[${requestId}] Error processing generic webhook files:`, error)
// Continue without processing files rather than failing execution
}
}
logger.info(`[${requestId}] Executing workflow for ${payload.provider} webhook`)
// Get workflow for core execution
const workflow = await getWorkflowById(payload.workflowId)
if (!workflow) {
throw new Error(`Workflow ${payload.workflowId} not found`)
}
const metadata: ExecutionMetadata = {
requestId,
executionId,
workflowId: payload.workflowId,
workspaceId,
userId: payload.userId,
triggerType: 'webhook',
triggerBlockId: payload.blockId,
useDraftState: false,
startTime: new Date().toISOString(),
}
const snapshot = new ExecutionSnapshot(
metadata,
workflow,
input || {},
{},
workflowVariables,
[]
)
const executionResult = await executeWorkflowCore({
snapshot,
callbacks: {},
loggingSession,
})
logger.info(`[${requestId}] Webhook execution completed`, {
success: executionResult.success,
workflowId: payload.workflowId,
provider: payload.provider,
})
return {
success: executionResult.success,
workflowId: payload.workflowId,
executionId,
output: executionResult.output,
executedAt: new Date().toISOString(),
provider: payload.provider,
}
} catch (error: any) {
logger.error(`[${requestId}] Webhook execution failed`, {
error: error.message,
stack: error.stack,
workflowId: payload.workflowId,
provider: payload.provider,
})
try {
const executionResult = (error?.executionResult as ExecutionResult | undefined) || {
success: false,
output: {},
logs: [],
}
const { traceSpans } = buildTraceSpans(executionResult)
await loggingSession.safeCompleteWithError({
endedAt: new Date().toISOString(),
totalDurationMs: 0,
error: {
message: error.message || 'Webhook execution failed',
stackTrace: error.stack,
},
traceSpans,
})
} catch (loggingError) {
logger.error(`[${requestId}] Failed to complete logging session`, loggingError)
}
throw error
}
}
export const webhookExecution = task({
id: 'webhook-execution',
retry: {
maxAttempts: 1,
},
run: async (payload: WebhookExecutionPayload) => executeWebhookJob(payload),
})