improvement(webhooks): move webhook exeucution to trigger.dev (#810)

* improvement(webhooks): move webhook exeucution to trigger dev

* remove old tests
This commit is contained in:
Vikhyath Mondreti
2025-07-28 13:04:25 -07:00
committed by GitHub
parent b12e415fea
commit 95efae9035
6 changed files with 432 additions and 1170 deletions

View File

@@ -1,6 +1,7 @@
import { eq, sql } from 'drizzle-orm'
import { type NextRequest, NextResponse } from 'next/server'
import { v4 as uuidv4 } from 'uuid'
import { checkServerSideUsageLimits } from '@/lib/billing'
import { isDev } from '@/lib/environment'
import { createLogger } from '@/lib/logs/console/logger'
import { LoggingSession } from '@/lib/logs/execution/logging-session'
@@ -330,6 +331,22 @@ export async function executeWorkflowForChat(
const workflowId = deployment.workflowId
const executionId = uuidv4()
const usageCheck = await checkServerSideUsageLimits(deployment.userId)
if (usageCheck.isExceeded) {
logger.warn(
`[${requestId}] User ${deployment.userId} has exceeded usage limits. Skipping chat execution.`,
{
currentUsage: usageCheck.currentUsage,
limit: usageCheck.limit,
workflowId: deployment.workflowId,
chatId,
}
)
throw new Error(
usageCheck.message || 'Usage limit exceeded. Please upgrade your plan to continue using chat.'
)
}
// Set up logging for chat execution
const loggingSession = new LoggingSession(workflowId, executionId, 'chat', requestId)

View File

@@ -284,219 +284,10 @@ describe('Webhook Trigger API Route', () => {
expect(text).toMatch(/not found/i) // Response should contain "not found" message
})
/**
* Test duplicate webhook request handling
* Verifies that duplicate requests are detected and not processed multiple times
*/
it('should handle duplicate webhook requests', async () => {
// Set up duplicate detection
hasProcessedMessageMock.mockResolvedValue(true) // Simulate duplicate
processGenericDeduplicationMock.mockResolvedValue(
new Response('Duplicate request', { status: 200 })
)
// Configure DB mock to return a webhook and workflow
const { db } = await import('@/db')
const limitMock = vi.fn().mockReturnValue([
{
webhook: {
id: 'webhook-id',
path: 'test-path',
isActive: true,
provider: 'generic', // Not Airtable to test standard path
workflowId: 'workflow-id',
providerConfig: {},
},
workflow: {
id: 'workflow-id',
userId: 'user-id',
},
},
])
const whereMock = vi.fn().mockReturnValue({ limit: limitMock })
const innerJoinMock = vi.fn().mockReturnValue({ where: whereMock })
const fromMock = vi.fn().mockReturnValue({ innerJoin: innerJoinMock })
// @ts-ignore - mocking the query chain
db.select.mockReturnValue({ from: fromMock })
// Create a mock request
const req = createMockRequest('POST', { event: 'test' })
// Mock the path param
const params = Promise.resolve({ path: 'test-path' })
// Import the handler after mocks are set up
const { POST } = await import('@/app/api/webhooks/trigger/[path]/route')
// Call the handler
const response = await POST(req, { params })
// Expect 200 response for duplicate
expect(response.status).toBe(200)
// Verify response text indicates duplication
const text = await response.text()
expect(text).toMatch(/duplicate|received/i) // Response might be "Duplicate message" or "Request received"
})
/**
* Test Slack-specific webhook handling
* Verifies that Slack signature verification is performed
*/
// TODO: Fix failing test - returns 500 instead of 200
// it('should handle Slack webhooks with signature verification', async () => { ... })
/**
* Test error handling during webhook execution
*/
it('should handle errors during workflow execution', async () => {
// Mock the setTimeout to be faster for testing
// @ts-ignore - Replace global setTimeout for this test
global.setTimeout = vi.fn((callback) => {
callback() // Execute immediately
return 123 // Return a timer ID
})
// Set up error handling mocks
processWebhookMock.mockImplementation(() => {
throw new Error('Webhook execution failed')
})
executeMock.mockRejectedValue(new Error('Webhook execution failed'))
// Configure DB mock to return a webhook and workflow
const { db } = await import('@/db')
const limitMock = vi.fn().mockReturnValue([
{
webhook: {
id: 'webhook-id',
path: 'test-path',
isActive: true,
provider: 'generic', // Not Airtable to ensure we use the timeout path
workflowId: 'workflow-id',
providerConfig: {},
},
workflow: {
id: 'workflow-id',
userId: 'user-id',
},
},
])
const whereMock = vi.fn().mockReturnValue({ limit: limitMock })
const innerJoinMock = vi.fn().mockReturnValue({ where: whereMock })
const fromMock = vi.fn().mockReturnValue({ innerJoin: innerJoinMock })
// @ts-ignore - mocking the query chain
db.select.mockReturnValue({ from: fromMock })
// Create a mock request
const req = createMockRequest('POST', { event: 'test' })
// Mock the path param
const params = Promise.resolve({ path: 'test-path' })
// Import the handler after mocks are set up
const { POST } = await import('@/app/api/webhooks/trigger/[path]/route')
// Call the handler
const response = await POST(req, { params })
// Verify response exists and check status code
// For non-Airtable webhooks, we expect 200 from the timeout response
expect(response).toBeDefined()
expect(response.status).toBe(200)
// Verify response text
const text = await response.text()
expect(text).toMatch(/received|processing/i)
})
/**
* Test Airtable webhook specific handling
* Verifies that Airtable webhooks use the synchronous processing path
*/
it('should handle Airtable webhooks synchronously', async () => {
// Create webhook payload for Airtable
const airtablePayload = {
base: {
id: 'appn9RltLQQMsquyL',
},
webhook: {
id: 'achpbXeBqNLsRFAnD',
},
timestamp: new Date().toISOString(),
}
// Reset fetch and process mock
fetchAndProcessAirtablePayloadsMock.mockResolvedValue(undefined)
// Configure DB mock to return an Airtable webhook
const { db } = await import('@/db')
const limitMock = vi.fn().mockReturnValue([
{
webhook: {
id: 'airtable-webhook-id',
path: 'airtable-path',
isActive: true,
provider: 'airtable', // Set provider to airtable to test that path
workflowId: 'workflow-id',
providerConfig: {
baseId: 'appn9RltLQQMsquyL',
externalId: 'achpbXeBqNLsRFAnD',
},
},
workflow: {
id: 'workflow-id',
userId: 'user-id',
},
},
])
const whereMock = vi.fn().mockReturnValue({ limit: limitMock })
const innerJoinMock = vi.fn().mockReturnValue({ where: whereMock })
const fromMock = vi.fn().mockReturnValue({ innerJoin: innerJoinMock })
// Configure db.select to return the appropriate mock for this test
// @ts-ignore - Ignore TypeScript errors for test mocks
db.select = vi.fn().mockReturnValue({ from: fromMock })
// Also mock the DB for the Airtable notification check
const whereMock2 = vi.fn().mockReturnValue({ limit: vi.fn().mockReturnValue([]) })
const fromMock2 = vi.fn().mockReturnValue({ where: whereMock2 })
// We need to handle multiple calls to db.select
let callCount = 0
// @ts-ignore - Ignore TypeScript errors for test mocks
db.select = vi.fn().mockImplementation(() => {
callCount++
if (callCount === 1) {
return { from: fromMock }
}
return { from: fromMock2 }
})
// Create a mock request with Airtable payload
const req = createMockRequest('POST', airtablePayload)
// Mock the path param
const params = Promise.resolve({ path: 'airtable-path' })
// Import the handler after mocks are set up
const { POST } = await import('@/app/api/webhooks/trigger/[path]/route')
// Call the handler
const response = await POST(req, { params })
// For Airtable we expect 200 after synchronous processing
expect(response.status).toBe(200)
// Verify that the Airtable-specific function was called
expect(fetchAndProcessAirtablePayloadsMock).toHaveBeenCalledTimes(1)
// The response should indicate success
const text = await response.text()
expect(text).toMatch(/success|processed/i)
})
})

View File

@@ -1,19 +1,12 @@
import { and, eq, sql } from 'drizzle-orm'
import { tasks } from '@trigger.dev/sdk/v3'
import { and, eq } from 'drizzle-orm'
import { type NextRequest, NextResponse } from 'next/server'
import { v4 as uuidv4 } from 'uuid'
import { checkServerSideUsageLimits } from '@/lib/billing'
import { createLogger } from '@/lib/logs/console/logger'
import { acquireLock, hasProcessedMessage, markMessageAsProcessed } from '@/lib/redis'
import {
fetchAndProcessAirtablePayloads,
handleSlackChallenge,
handleWhatsAppVerification,
processGenericDeduplication,
processWebhook,
processWhatsAppDeduplication,
validateMicrosoftTeamsSignature,
} from '@/lib/webhooks/utils'
import { loadWorkflowFromNormalizedTables } from '@/lib/workflows/db-helpers'
import { db } from '@/db'
import { subscription, webhook, workflow } from '@/db/schema'
import { RateLimiter } from '@/services/queue'
@@ -24,8 +17,6 @@ const logger = createLogger('WebhookTriggerAPI')
export const dynamic = 'force-dynamic'
export const maxDuration = 300
const _activeProcessingTasks = new Map<string, Promise<any>>()
/**
* Webhook Verification Handler (GET)
*
@@ -125,50 +116,13 @@ export async function POST(
return new NextResponse('Invalid JSON payload', { status: 400 })
}
// --- PHASE 2: Early Slack deduplication handling ---
const messageId = body?.event_id
if (body?.type === 'event_callback') {
const dedupeKey = messageId
? `slack:msg:${messageId}`
: `slack:${body?.team_id || ''}:${body?.event?.ts || body?.event?.event_ts || Date.now()}`
try {
const isDuplicate = await hasProcessedMessage(dedupeKey)
if (isDuplicate) {
logger.info(`[${requestId}] Duplicate Slack message detected: ${dedupeKey}`)
return new NextResponse('Duplicate message', { status: 200 })
}
await markMessageAsProcessed(dedupeKey, 60 * 60 * 24) // 24 hour TTL
} catch (error) {
logger.error(`[${requestId}] Error in Slack deduplication`, error)
// Continue processing - better to risk a duplicate than fail
}
// Handle Slack challenge
const slackResponse = handleSlackChallenge(body)
if (slackResponse) {
return slackResponse
}
// --- PHASE 3: Distributed lock acquisition ---
let hasExecutionLock = false
let executionLockKey: string
if (body?.type === 'event_callback') {
// For Slack events, use message-specific lock key
executionLockKey = messageId
? `execution:lock:slack:${messageId}`
: `execution:lock:slack:${body?.team_id || ''}:${body?.event?.ts || body?.event?.event_ts || Date.now()}`
} else {
// Default fallback for other providers
executionLockKey = `execution:lock:${requestId}:${crypto.randomUUID()}`
}
try {
hasExecutionLock = await acquireLock(executionLockKey, requestId, 30) // 30 second TTL
} catch (lockError) {
logger.error(`[${requestId}] Error acquiring execution lock`, lockError)
// Proceed without lock in case of Redis failure (fallback to best-effort)
}
// --- PHASE 4: Webhook identification ---
// --- PHASE 2: Webhook identification ---
const path = (await params).path
logger.info(`[${requestId}] Processing webhook request for path: ${path}`)
@@ -191,60 +145,7 @@ export async function POST(
foundWebhook = webhooks[0].webhook
foundWorkflow = webhooks[0].workflow
const normalizedData = await loadWorkflowFromNormalizedTables(foundWorkflow.id)
if (!normalizedData) {
logger.error(`[${requestId}] No normalized data found for webhook workflow ${foundWorkflow.id}`)
return new NextResponse('Workflow data not found in normalized tables', { status: 500 })
}
// Construct state from normalized data only (execution-focused, no frontend state fields)
foundWorkflow.state = {
blocks: normalizedData.blocks,
edges: normalizedData.edges,
loops: normalizedData.loops,
parallels: normalizedData.parallels,
lastSaved: Date.now(),
isDeployed: foundWorkflow.isDeployed || false,
deployedAt: foundWorkflow.deployedAt,
}
// Special handling for Telegram webhooks to work around middleware User-Agent checks
if (foundWebhook.provider === 'telegram') {
// Log detailed information about the request for debugging
const userAgent = request.headers.get('user-agent') || 'empty'
logger.info(`[${requestId}] Received Telegram webhook request:`, {
userAgent,
path,
clientIp:
request.headers.get('x-forwarded-for') || request.headers.get('x-real-ip') || 'unknown',
method: request.method,
contentType: request.headers.get('content-type'),
hasUpdate: !!body?.update_id,
})
// Ensure User-Agent headers for Telegram in future requests from the bot
// We can't modify the incoming request, but we can recommend adding it for future setup
if (!userAgent || userAgent === 'empty') {
logger.warn(
`[${requestId}] Telegram webhook request missing User-Agent header. Recommend reconfiguring webhook with 'TelegramBot/1.0' User-Agent.`
)
}
}
// Detect provider type
const isAirtableWebhook = foundWebhook.provider === 'airtable'
const isGmailWebhook = foundWebhook.provider === 'gmail'
// Handle Slack challenge verification (must be done before timeout)
const slackChallengeResponse =
body?.type === 'url_verification' ? handleSlackChallenge(body) : null
if (slackChallengeResponse) {
logger.info(`[${requestId}] Responding to Slack URL verification challenge`)
return slackChallengeResponse
}
// Handle Microsoft Teams outgoing webhook signature verification (must be done before timeout)
// Handle Microsoft Teams signature verification if needed
if (foundWebhook.provider === 'microsoftteams') {
const providerConfig = (foundWebhook.providerConfig as Record<string, any>) || {}
@@ -258,9 +159,6 @@ export async function POST(
return new NextResponse('Unauthorized - Missing HMAC signature', { status: 401 })
}
// Get the raw body for HMAC verification
const rawBody = await request.text()
const isValidSignature = validateMicrosoftTeamsSignature(
providerConfig.hmacSecret,
authHeader,
@@ -273,247 +171,99 @@ export async function POST(
}
logger.debug(`[${requestId}] Microsoft Teams HMAC signature verified successfully`)
// Parse the body again since we consumed it for verification
try {
body = JSON.parse(rawBody)
} catch (parseError) {
logger.error(
`[${requestId}] Failed to parse Microsoft Teams webhook body after verification`,
{
error: parseError instanceof Error ? parseError.message : String(parseError),
}
)
return new NextResponse('Invalid JSON payload', { status: 400 })
}
}
}
// Skip processing if another instance is already handling this request
if (!hasExecutionLock) {
logger.info(`[${requestId}] Skipping execution as lock was not acquired`)
return new NextResponse('Request is being processed by another instance', { status: 200 })
// --- PHASE 3: Rate limiting for webhook execution ---
try {
// Get user subscription for rate limiting
const [subscriptionRecord] = await db
.select({ plan: subscription.plan })
.from(subscription)
.where(eq(subscription.referenceId, foundWorkflow.userId))
.limit(1)
const subscriptionPlan = (subscriptionRecord?.plan || 'free') as SubscriptionPlan
// Check async rate limits (webhooks are processed asynchronously)
const rateLimiter = new RateLimiter()
const rateLimitCheck = await rateLimiter.checkRateLimit(
foundWorkflow.userId,
subscriptionPlan,
'webhook',
true // isAsync = true for webhook execution
)
if (!rateLimitCheck.allowed) {
logger.warn(`[${requestId}] Rate limit exceeded for webhook user ${foundWorkflow.userId}`, {
provider: foundWebhook.provider,
remaining: rateLimitCheck.remaining,
resetAt: rateLimitCheck.resetAt,
})
// Return 200 to prevent webhook provider retries, but indicate rate limit
if (foundWebhook.provider === 'microsoftteams') {
// Microsoft Teams requires specific response format
return NextResponse.json({
type: 'message',
text: 'Rate limit exceeded. Please try again later.',
})
}
// Simple error response for other providers (return 200 to prevent retries)
return NextResponse.json({ message: 'Rate limit exceeded' }, { status: 200 })
}
logger.debug(`[${requestId}] Rate limit check passed for webhook`, {
provider: foundWebhook.provider,
remaining: rateLimitCheck.remaining,
resetAt: rateLimitCheck.resetAt,
})
} catch (rateLimitError) {
logger.error(`[${requestId}] Error checking webhook rate limits:`, rateLimitError)
// Continue processing - better to risk rate limit bypass than fail webhook
}
// --- PHASE 5: Provider-specific processing ---
// --- PHASE 4: Queue webhook execution via trigger.dev ---
try {
// Queue the webhook execution task
const handle = await tasks.trigger('webhook-execution', {
webhookId: foundWebhook.id,
workflowId: foundWorkflow.id,
userId: foundWorkflow.userId,
provider: foundWebhook.provider,
body,
headers: Object.fromEntries(request.headers.entries()),
path,
blockId: foundWebhook.blockId,
})
// For Airtable: Process synchronously without timeouts
if (isAirtableWebhook) {
try {
logger.info(`[${requestId}] Airtable webhook ping received for webhook: ${foundWebhook.id}`)
logger.info(
`[${requestId}] Queued webhook execution task ${handle.id} for ${foundWebhook.provider} webhook`
)
// Handle Airtable deduplication
const notificationId = body.notificationId || null
if (notificationId) {
try {
const processedKey = `airtable-webhook-${foundWebhook.id}-${notificationId}`
// Check if notification was already processed
const alreadyProcessed = await db
.select({ id: webhook.id })
.from(webhook)
.where(
and(
eq(webhook.id, foundWebhook.id),
sql`(webhook.provider_config->>'processedNotifications')::jsonb ? ${processedKey}`
)
)
.limit(1)
if (alreadyProcessed.length > 0) {
logger.info(
`[${requestId}] Duplicate Airtable notification detected: ${notificationId}`
)
return new NextResponse('Notification already processed', { status: 200 })
}
// Store notification ID for deduplication
const providerConfig = foundWebhook.providerConfig || {}
const processedNotifications = providerConfig.processedNotifications || []
processedNotifications.push(processedKey)
// Keep only the last 100 notifications to prevent unlimited growth
const limitedNotifications = processedNotifications.slice(-100)
// Update the webhook record
await db
.update(webhook)
.set({
providerConfig: {
...providerConfig,
processedNotifications: limitedNotifications,
},
updatedAt: new Date(),
})
.where(eq(webhook.id, foundWebhook.id))
} catch (error) {
logger.warn(`[${requestId}] Airtable deduplication check failed, continuing`, {
error: error instanceof Error ? error.message : String(error),
})
}
}
// Process Airtable payloads synchronously
try {
logger.info(`[${requestId}] Starting Airtable payload processing`)
await fetchAndProcessAirtablePayloads(foundWebhook, foundWorkflow, requestId)
return new NextResponse('Airtable ping processed successfully', { status: 200 })
} catch (error: any) {
logger.error(`[${requestId}] Error during Airtable processing`, {
error: error.message,
})
return new NextResponse(`Error processing Airtable webhook: ${error.message}`, {
status: 500,
})
}
} catch (error: any) {
logger.error(`[${requestId}] Error in Airtable processing`, error)
return new NextResponse(`Internal server error: ${error.message}`, { status: 500 })
// Return immediate acknowledgment with provider-specific format
if (foundWebhook.provider === 'microsoftteams') {
// Microsoft Teams requires specific response format
return NextResponse.json({
type: 'message',
text: 'Sim Studio',
})
}
return NextResponse.json({ message: 'Webhook processed' })
} catch (error: any) {
logger.error(`[${requestId}] Failed to queue webhook execution:`, error)
// Still return 200 to prevent webhook provider retries
if (foundWebhook.provider === 'microsoftteams') {
// Microsoft Teams requires specific response format
return NextResponse.json({
type: 'message',
text: 'Webhook processing failed',
})
}
return NextResponse.json({ message: 'Internal server error' }, { status: 200 })
}
// --- For all other webhook types: Use async processing with timeout ---
// Create timeout promise for fast initial response (2.5 seconds)
const timeoutDuration = 25000
const timeoutPromise = new Promise<NextResponse>((resolve) => {
setTimeout(() => {
logger.info(`[${requestId}] Fast response timeout activated`)
resolve(new NextResponse('Request received', { status: 200 }))
}, timeoutDuration)
})
// Create the processing promise for asynchronous execution
const processingPromise = (async () => {
try {
// Provider-specific deduplication
if (foundWebhook.provider === 'whatsapp') {
const data = body?.entry?.[0]?.changes?.[0]?.value
const messages = data?.messages || []
const whatsappDuplicateResponse = await processWhatsAppDeduplication(requestId, messages)
if (whatsappDuplicateResponse) {
return whatsappDuplicateResponse
}
} else if (foundWebhook.provider === 'gmail') {
// Gmail-specific validation and logging
logger.info(`[${requestId}] Gmail webhook request received for webhook: ${foundWebhook.id}`)
const webhookSecret = foundWebhook.secret
if (webhookSecret) {
const secretHeader = request.headers.get('X-Webhook-Secret')
if (secretHeader !== webhookSecret) {
logger.warn(`[${requestId}] Invalid webhook secret`)
return new NextResponse('Unauthorized', { status: 401 })
}
}
if (!body.email) {
logger.warn(`[${requestId}] Invalid Gmail webhook payload format`)
return new NextResponse('Invalid payload format', { status: 400 })
}
logger.info(`[${requestId}] Processing Gmail email`, {
emailId: body.email.id,
subject:
body.email?.payload?.headers?.find((h: any) => h.name === 'Subject')?.value ||
'No subject',
})
// Gmail deduplication using generic method
const genericDuplicateResponse = await processGenericDeduplication(requestId, path, body)
if (genericDuplicateResponse) {
return genericDuplicateResponse
}
} else if (foundWebhook.provider !== 'slack') {
// Generic deduplication for all other providers
const genericDuplicateResponse = await processGenericDeduplication(requestId, path, body)
if (genericDuplicateResponse) {
return genericDuplicateResponse
}
}
// Check rate limits for webhook execution
const [subscriptionRecord] = await db
.select({ plan: subscription.plan })
.from(subscription)
.where(eq(subscription.referenceId, foundWorkflow.userId))
.limit(1)
const subscriptionPlan = (subscriptionRecord?.plan || 'free') as SubscriptionPlan
const rateLimiter = new RateLimiter()
const rateLimitCheck = await rateLimiter.checkRateLimit(
foundWorkflow.userId,
subscriptionPlan,
'webhook',
false // webhooks are always sync
)
if (!rateLimitCheck.allowed) {
logger.warn(`[${requestId}] Rate limit exceeded for webhook user ${foundWorkflow.userId}`, {
remaining: rateLimitCheck.remaining,
resetAt: rateLimitCheck.resetAt,
})
// Return 200 to prevent webhook retries but indicate rate limit in response
return new NextResponse(
JSON.stringify({
status: 'error',
message: `Rate limit exceeded. You have ${rateLimitCheck.remaining} requests remaining. Resets at ${rateLimitCheck.resetAt.toISOString()}`,
}),
{
status: 200, // Use 200 to prevent webhook provider retries
headers: { 'Content-Type': 'application/json' },
}
)
}
// Check if the user has exceeded their usage limits
const usageCheck = await checkServerSideUsageLimits(foundWorkflow.userId)
if (usageCheck.isExceeded) {
logger.warn(
`[${requestId}] User ${foundWorkflow.userId} has exceeded usage limits. Skipping webhook execution.`,
{
currentUsage: usageCheck.currentUsage,
limit: usageCheck.limit,
workflowId: foundWorkflow.id,
}
)
// Return a successful response to avoid webhook retries, but don't execute the workflow
return new NextResponse(
JSON.stringify({
status: 'error',
message:
usageCheck.message ||
'Usage limit exceeded. Please upgrade your plan to continue using webhooks.',
}),
{
status: 200, // Use 200 to prevent webhook provider retries
headers: { 'Content-Type': 'application/json' },
}
)
}
// Execute workflow for the webhook event
logger.info(`[${requestId}] Executing workflow for ${foundWebhook.provider} webhook`)
const executionId = uuidv4()
return await processWebhook(
foundWebhook,
foundWorkflow,
body,
request,
executionId,
requestId
)
} catch (error: any) {
logger.error(`[${requestId}] Error processing webhook:`, error)
return new NextResponse(`Internal server error: ${error.message}`, { status: 500 })
}
})()
// Race processing against timeout to ensure fast response
return Promise.race([timeoutPromise, processingPromise])
}

View File

@@ -1,18 +1,9 @@
import { and, eq, sql } from 'drizzle-orm'
import { and, eq } from 'drizzle-orm'
import { type NextRequest, NextResponse } from 'next/server'
import { v4 as uuidv4 } from 'uuid'
import { createLogger } from '@/lib/logs/console/logger'
import { LoggingSession } from '@/lib/logs/execution/logging-session'
import { hasProcessedMessage, markMessageAsProcessed } from '@/lib/redis'
import { decryptSecret } from '@/lib/utils'
import { loadWorkflowFromNormalizedTables } from '@/lib/workflows/db-helpers'
import { updateWorkflowRunCounts } from '@/lib/workflows/utils'
import { getOAuthToken } from '@/app/api/auth/oauth/utils'
import { db } from '@/db'
import { environment as environmentTable, userStats, webhook } from '@/db/schema'
import { Executor } from '@/executor'
import { Serializer } from '@/serializer'
import { mergeSubblockStateAsync } from '@/stores/workflows/server-utils'
import { webhook } from '@/db/schema'
const logger = createLogger('WebhookUtils')
@@ -148,67 +139,6 @@ export async function validateSlackSignature(
}
}
/**
* Process WhatsApp message deduplication
*/
export async function processWhatsAppDeduplication(
requestId: string,
messages: any[]
): Promise<NextResponse | null> {
if (messages.length > 0) {
const message = messages[0]
const messageId = message.id
if (messageId) {
const whatsappMsgKey = `whatsapp:msg:${messageId}`
try {
const isDuplicate = await hasProcessedMessage(whatsappMsgKey)
if (isDuplicate) {
logger.info(`[${requestId}] Duplicate WhatsApp message detected: ${messageId}`)
return new NextResponse('Duplicate message', { status: 200 })
}
// Mark as processed BEFORE processing
await markMessageAsProcessed(whatsappMsgKey, 60 * 60 * 24)
} catch (error) {
logger.error(`[${requestId}] Error in WhatsApp deduplication`, error)
// Continue processing
}
}
}
return null
}
/**
* Process generic deduplication using request hash
*/
export async function processGenericDeduplication(
requestId: string,
path: string,
body: any
): Promise<NextResponse | null> {
try {
const requestHash = await generateRequestHash(path, body)
const genericMsgKey = `generic:${requestHash}`
const isDuplicate = await hasProcessedMessage(genericMsgKey)
if (isDuplicate) {
logger.info(`[${requestId}] Duplicate request detected with hash: ${requestHash}`)
return new NextResponse('Duplicate request', { status: 200 })
}
// Mark as processed
await markMessageAsProcessed(genericMsgKey, 60 * 60 * 24)
} catch (error) {
logger.error(`[${requestId}] Error in generic deduplication`, error)
// Continue processing
}
return null
}
/**
* Format webhook input based on provider
*/
@@ -471,375 +401,6 @@ export function formatWebhookInput(
}
}
/**
* Execute workflow with the provided input
*/
export async function executeWorkflowFromPayload(
foundWorkflow: any,
input: any,
executionId: string,
requestId: string,
startBlockId?: string | null
): Promise<void> {
// Add log at the beginning of this function for clarity
logger.info(`[${requestId}] Preparing to execute workflow`, {
workflowId: foundWorkflow.id,
executionId,
triggerSource: 'webhook-payload',
})
const loggingSession = new LoggingSession(foundWorkflow.id, executionId, 'webhook', requestId)
try {
// Load workflow data from normalized tables
logger.debug(`[${requestId}] Loading workflow ${foundWorkflow.id} from normalized tables`)
const normalizedData = await loadWorkflowFromNormalizedTables(foundWorkflow.id)
if (!normalizedData) {
logger.error(`[${requestId}] TRACE: No normalized data found for workflow`, {
workflowId: foundWorkflow.id,
hasNormalizedData: false,
})
throw new Error(`Workflow ${foundWorkflow.id} data not found in normalized tables`)
}
// Use normalized data for execution
const { blocks, edges, loops, parallels } = normalizedData
logger.info(`[${requestId}] Loaded workflow ${foundWorkflow.id} from normalized tables`)
// DEBUG: Log state information
logger.debug(`[${requestId}] TRACE: Retrieved workflow state from normalized tables`, {
workflowId: foundWorkflow.id,
blockCount: Object.keys(blocks || {}).length,
edgeCount: (edges || []).length,
loopCount: Object.keys(loops || {}).length,
})
logger.debug(
`[${requestId}] Merging subblock states for workflow ${foundWorkflow.id} (Execution: ${executionId})`
)
const mergeStartTime = Date.now()
const mergedStates = await mergeSubblockStateAsync(blocks, foundWorkflow.id)
logger.debug(`[${requestId}] TRACE: State merging complete`, {
duration: `${Date.now() - mergeStartTime}ms`,
mergedBlockCount: Object.keys(mergedStates).length,
})
// Retrieve and decrypt environment variables
const [userEnv] = await db
.select()
.from(environmentTable)
.where(eq(environmentTable.userId, foundWorkflow.userId))
.limit(1)
let decryptedEnvVars: Record<string, string> = {}
if (userEnv) {
// Decryption logic
const decryptionPromises = Object.entries((userEnv.variables as any) || {}).map(
async ([key, encryptedValue]) => {
try {
const { decrypted } = await decryptSecret(encryptedValue as string)
return [key, decrypted] as const
} catch (error: any) {
logger.error(
`[${requestId}] Failed to decrypt environment variable "${key}" (Execution: ${executionId})`,
error
)
throw new Error(`Failed to decrypt environment variable "${key}": ${error.message}`)
}
}
)
const decryptedEntries = await Promise.all(decryptionPromises)
decryptedEnvVars = Object.fromEntries(decryptedEntries)
} else {
logger.debug(`[${requestId}] TRACE: No environment variables found for user`, {
userId: foundWorkflow.userId,
})
}
await loggingSession.safeStart({
userId: foundWorkflow.userId,
workspaceId: foundWorkflow.workspaceId,
variables: decryptedEnvVars,
})
// Process block states (extract subBlock values, parse responseFormat)
const blockStatesStartTime = Date.now()
const currentBlockStates = Object.entries(mergedStates).reduce(
(acc, [id, block]) => {
acc[id] = Object.entries(block.subBlocks).reduce(
(subAcc, [key, subBlock]) => {
subAcc[key] = subBlock.value
return subAcc
},
{} as Record<string, any>
)
return acc
},
{} as Record<string, Record<string, any>>
)
const processedBlockStates = Object.entries(currentBlockStates).reduce(
(acc, [blockId, blockState]) => {
const processedState = { ...blockState }
if (processedState.responseFormat) {
try {
if (typeof processedState.responseFormat === 'string') {
processedState.responseFormat = JSON.parse(processedState.responseFormat)
}
if (
processedState.responseFormat &&
typeof processedState.responseFormat === 'object'
) {
if (!processedState.responseFormat.schema && !processedState.responseFormat.name) {
processedState.responseFormat = {
name: 'response_schema',
schema: processedState.responseFormat,
strict: true,
}
}
}
acc[blockId] = processedState
} catch (error) {
logger.warn(
`[${requestId}] Failed to parse responseFormat for block ${blockId} (Execution: ${executionId})`,
error
)
acc[blockId] = blockState
}
} else {
acc[blockId] = blockState
}
return acc
},
{} as Record<string, Record<string, any>>
)
// DEBUG: Log block state processing
logger.debug(`[${requestId}] TRACE: Block states processed`, {
duration: `${Date.now() - blockStatesStartTime}ms`,
blockCount: Object.keys(processedBlockStates).length,
})
// Serialize and get workflow variables
const serializeStartTime = Date.now()
const serializedWorkflow = new Serializer().serializeWorkflow(
mergedStates as any,
edges,
loops,
parallels
)
let workflowVariables = {}
if (foundWorkflow.variables) {
try {
if (typeof foundWorkflow.variables === 'string') {
workflowVariables = JSON.parse(foundWorkflow.variables)
} else {
workflowVariables = foundWorkflow.variables
}
} catch (error) {
logger.error(
`[${requestId}] Failed to parse workflow variables: ${foundWorkflow.id} (Execution: ${executionId})`,
error
)
}
}
// DEBUG: Log serialization completion
logger.debug(`[${requestId}] TRACE: Workflow serialized`, {
duration: `${Date.now() - serializeStartTime}ms`,
hasWorkflowVars: Object.keys(workflowVariables).length > 0,
})
logger.debug(`[${requestId}] Starting workflow execution`, {
executionId,
blockCount: Object.keys(processedBlockStates).length,
})
// Log blocks for debugging (if any missing or invalid)
if (Object.keys(processedBlockStates).length === 0) {
logger.error(`[${requestId}] No blocks found in workflow state - this will likely fail`)
} else {
logger.debug(`[${requestId}] Block IDs for execution:`, {
blockIds: Object.keys(processedBlockStates).slice(0, 5), // Log just a few block IDs for debugging
totalBlocks: Object.keys(processedBlockStates).length,
})
}
// Ensure workflow variables exist
if (!workflowVariables || Object.keys(workflowVariables).length === 0) {
logger.debug(`[${requestId}] No workflow variables defined, using empty object`)
workflowVariables = {}
}
// Validate input format for Airtable webhooks to prevent common errors
if (
input?.airtableChanges &&
(!Array.isArray(input.airtableChanges) || input.airtableChanges.length === 0)
) {
logger.warn(
`[${requestId}] Invalid Airtable input format - airtableChanges should be a non-empty array`
)
}
// DEBUG: Log critical moment before executor creation
logger.info(`[${requestId}] TRACE: Creating workflow executor`, {
workflowId: foundWorkflow.id,
hasSerializedWorkflow: !!serializedWorkflow,
blockCount: Object.keys(processedBlockStates).length,
timestamp: new Date().toISOString(),
})
const executor = new Executor(
serializedWorkflow,
processedBlockStates,
decryptedEnvVars,
input,
workflowVariables
)
// Set up logging on the executor
loggingSession.setupExecutor(executor)
// Log workflow execution start time for tracking
const executionStartTime = Date.now()
logger.info(`[${requestId}] TRACE: Executor instantiated, starting workflow execution now`, {
workflowId: foundWorkflow.id,
timestamp: new Date().toISOString(),
})
// Add direct detailed logging right before executing
logger.info(
`[${requestId}] EXECUTION_MONITOR: About to call executor.execute() - CRITICAL POINT`,
{
workflowId: foundWorkflow.id,
executionId: executionId,
timestamp: new Date().toISOString(),
}
)
// This is THE critical line where the workflow actually executes
const result = await executor.execute(foundWorkflow.id, startBlockId || undefined)
// Check if we got a StreamingExecution result (with stream + execution properties)
// For webhook executions, we only care about the ExecutionResult part, not the stream
const executionResult = 'stream' in result && 'execution' in result ? result.execution : result
// Add direct detailed logging right after executing
logger.info(`[${requestId}] EXECUTION_MONITOR: executor.execute() completed with result`, {
workflowId: foundWorkflow.id,
executionId: executionId,
success: executionResult.success,
resultType: result ? typeof result : 'undefined',
timestamp: new Date().toISOString(),
})
// Log completion and timing
const executionDuration = Date.now() - executionStartTime
logger.info(`[${requestId}] TRACE: Workflow execution completed`, {
workflowId: foundWorkflow.id,
success: executionResult.success,
duration: `${executionDuration}ms`,
actualDurationMs: executionDuration,
timestamp: new Date().toISOString(),
})
logger.info(`[${requestId}] Workflow execution finished`, {
executionId,
success: executionResult.success,
durationMs: executionResult.metadata?.duration || executionDuration,
actualDurationMs: executionDuration,
})
// Update counts and stats if successful
if (executionResult.success) {
await updateWorkflowRunCounts(foundWorkflow.id)
await db
.update(userStats)
.set({
totalWebhookTriggers: sql`total_webhook_triggers + 1`,
lastActive: new Date(),
})
.where(eq(userStats.userId, foundWorkflow.userId))
}
// Calculate total duration for logging
const totalDuration = executionResult.metadata?.duration || 0
const traceSpans = (executionResult.logs || []).map((blockLog: any, index: number) => {
let output = blockLog.output
if (!blockLog.success && blockLog.error) {
output = {
error: blockLog.error,
success: false,
...(blockLog.output || {}),
}
}
return {
id: blockLog.blockId,
name: `Block ${blockLog.blockName || blockLog.blockType} (${blockLog.blockType || 'unknown'})`,
type: blockLog.blockType || 'unknown',
duration: blockLog.durationMs || 0,
startTime: blockLog.startedAt,
endTime: blockLog.endedAt || blockLog.startedAt,
status: blockLog.success ? 'success' : 'error',
blockId: blockLog.blockId,
input: blockLog.input,
output: output,
tokens: blockLog.output?.tokens?.total || 0,
relativeStartMs: index * 100,
children: [],
toolCalls: (blockLog as any).toolCalls || [],
}
})
await loggingSession.safeComplete({
endedAt: new Date().toISOString(),
totalDurationMs: totalDuration || 0,
finalOutput: executionResult.output || {},
traceSpans: (traceSpans || []) as any,
})
// DEBUG: Final success log
logger.info(`[${requestId}] TRACE: Execution logs persisted successfully`, {
workflowId: foundWorkflow.id,
executionId,
timestamp: new Date().toISOString(),
})
} catch (error: any) {
// DEBUG: Detailed error information
logger.error(`[${requestId}] TRACE: Error during workflow execution`, {
workflowId: foundWorkflow.id,
executionId,
errorType: error.constructor.name,
errorMessage: error.message,
stack: error.stack,
timestamp: new Date().toISOString(),
})
logger.error(`[${requestId}] Error executing workflow`, {
workflowId: foundWorkflow.id,
executionId,
error: error.message,
stack: error.stack,
})
// Error logging handled by logging session
await loggingSession.safeCompleteWithError({
endedAt: new Date().toISOString(),
totalDurationMs: 0,
error: {
message: error.message || 'Webhook workflow execution failed',
stackTrace: error.stack,
},
})
// Re-throw the error so the caller knows it failed
throw error
}
}
/**
* Validates a Microsoft Teams outgoing webhook request signature using HMAC SHA-256
* @param hmacSecret - Microsoft Teams HMAC secret (base64 encoded)
@@ -1378,26 +939,23 @@ export async function fetchAndProcessAirtablePayloads(
}
)
await executeWorkflowFromPayload(workflowData, input, requestId, requestId, null)
// COMPLETION LOG - This will only appear if execution succeeds
logger.info(`[${requestId}] CRITICAL_TRACE: Workflow execution completed successfully`, {
// Return the processed input for the trigger.dev task to handle
logger.info(`[${requestId}] CRITICAL_TRACE: Airtable changes processed, returning input`, {
workflowId: workflowData.id,
timestamp: new Date().toISOString(),
})
} catch (executionError: any) {
// Errors logged within executeWorkflowFromPayload
logger.error(`[${requestId}] CRITICAL_TRACE: Workflow execution failed with error`, {
workflowId: workflowData.id,
error: executionError.message,
stack: executionError.stack,
recordCount: finalConsolidatedChanges.length,
timestamp: new Date().toISOString(),
})
logger.error(
`[${requestId}] Error during workflow execution triggered by Airtable polling`,
executionError
)
return input
} catch (processingError: any) {
logger.error(`[${requestId}] CRITICAL_TRACE: Error processing Airtable changes`, {
workflowId: workflowData.id,
error: processingError.message,
stack: processingError.stack,
timestamp: new Date().toISOString(),
})
throw processingError
}
} else {
// DEBUG: Log when no changes are found
@@ -1429,166 +987,6 @@ export async function fetchAndProcessAirtablePayloads(
})
}
/**
* Process webhook verification and authorization
*/
/**
* Handle standard webhooks with synchronous execution
*/
async function processStandardWebhook(
foundWebhook: any,
foundWorkflow: any,
input: any,
executionId: string,
requestId: string
): Promise<NextResponse> {
logger.info(
`[${requestId}] Executing workflow ${foundWorkflow.id} for webhook ${foundWebhook.id} (Execution: ${executionId})`
)
await executeWorkflowFromPayload(
foundWorkflow,
input,
executionId,
requestId,
foundWebhook.blockId
)
// Since executeWorkflowFromPayload handles logging and errors internally,
// we just need to return a success response for synchronous webhooks.
// Microsoft Teams requires a specific response format.
if (foundWebhook.provider === 'microsoftteams') {
return NextResponse.json(
{
type: 'message',
text: 'Sim Studio',
},
{ status: 200 }
)
}
return NextResponse.json({ message: 'Webhook processed' }, { status: 200 })
}
/**
* Handle webhook processing errors with provider-specific responses
*/
function handleWebhookError(
error: any,
foundWebhook: any,
executionId: string,
requestId: string
): NextResponse {
logger.error(
`[${requestId}] Error in processWebhook for ${foundWebhook.id} (Execution: ${executionId})`,
error
)
// For Microsoft Teams outgoing webhooks, return the expected error format
if (foundWebhook.provider === 'microsoftteams') {
return NextResponse.json(
{
type: 'message',
text: 'Webhook processing failed',
},
{ status: 200 }
) // Still return 200 to prevent Teams from showing additional error messages
}
return new NextResponse(`Internal Server Error: ${error.message}`, {
status: 500,
})
}
export async function processWebhook(
foundWebhook: any,
foundWorkflow: any,
body: any,
request: NextRequest,
executionId: string,
requestId: string
): Promise<NextResponse> {
try {
// --- Handle Airtable differently - it should always use fetchAndProcessAirtablePayloads ---
if (foundWebhook.provider === 'airtable') {
logger.info(`[${requestId}] Routing Airtable webhook through dedicated processor`)
await fetchAndProcessAirtablePayloads(foundWebhook, foundWorkflow, requestId)
return NextResponse.json({ message: 'Airtable webhook processed' }, { status: 200 })
}
// --- Provider-specific Auth/Verification (excluding Airtable/WhatsApp/Slack/MicrosoftTeams handled earlier) ---
if (
foundWebhook.provider &&
!['airtable', 'whatsapp', 'slack', 'microsoftteams'].includes(foundWebhook.provider)
) {
const verificationResponse = verifyProviderWebhook(foundWebhook, request, requestId)
if (verificationResponse) {
return verificationResponse
}
}
// --- Format Input based on provider (excluding Airtable) ---
const input = formatWebhookInput(foundWebhook, foundWorkflow, body, request)
if (!input && foundWebhook.provider === 'whatsapp') {
return new NextResponse('No messages in WhatsApp payload', { status: 200 })
}
// --- Route to standard processor for all providers ---
return await processStandardWebhook(foundWebhook, foundWorkflow, input, executionId, requestId)
} catch (error: any) {
return handleWebhookError(error, foundWebhook, executionId, requestId)
}
}
/**
* Generate a hash for request deduplication
*/
export async function generateRequestHash(path: string, body: any): Promise<string> {
try {
const normalizedBody = normalizeBody(body)
const requestString = `${path}:${JSON.stringify(normalizedBody)}`
let hash = 0
for (let i = 0; i < requestString.length; i++) {
const char = requestString.charCodeAt(i)
hash = (hash << 5) - hash + char
hash = hash & hash // Convert to 32bit integer
}
return `request:${path}:${hash}`
} catch (_error) {
return `request:${path}:${uuidv4()}`
}
}
/**
* Normalize the body for consistent hashing
*/
export function normalizeBody(body: any): any {
if (!body || typeof body !== 'object') return body
const result = Array.isArray(body) ? [...body] : { ...body }
const fieldsToRemove = [
'timestamp',
'random',
'nonce',
'requestId',
'event_id',
'event_time' /* Add other volatile fields */,
] // Made case-insensitive check below
if (Array.isArray(result)) {
return result.map((item) => normalizeBody(item))
}
for (const key in result) {
// Use lowercase check for broader matching
if (fieldsToRemove.includes(key.toLowerCase())) {
delete result[key]
} else if (typeof result[key] === 'object' && result[key] !== null) {
result[key] = normalizeBody(result[key])
}
}
return result
}
// Define an interface for AirtableChange
export interface AirtableChange {
tableId: string

View File

@@ -0,0 +1,289 @@
import { task } from '@trigger.dev/sdk/v3'
import { eq, sql } from 'drizzle-orm'
import { v4 as uuidv4 } from 'uuid'
import { checkServerSideUsageLimits } from '@/lib/billing'
import { createLogger } from '@/lib/logs/console/logger'
import { LoggingSession } from '@/lib/logs/execution/logging-session'
import { buildTraceSpans } from '@/lib/logs/execution/trace-spans/trace-spans'
import { decryptSecret } from '@/lib/utils'
import { fetchAndProcessAirtablePayloads, formatWebhookInput } from '@/lib/webhooks/utils'
import { loadWorkflowFromNormalizedTables } from '@/lib/workflows/db-helpers'
import { updateWorkflowRunCounts } from '@/lib/workflows/utils'
import { db } from '@/db'
import { environment as environmentTable, userStats } from '@/db/schema'
import { Executor } from '@/executor'
import { Serializer } from '@/serializer'
import { mergeSubblockState } from '@/stores/workflows/server-utils'
const logger = createLogger('TriggerWebhookExecution')
export const webhookExecution = task({
id: 'webhook-execution',
retry: {
maxAttempts: 1,
},
run: async (payload: {
webhookId: string
workflowId: string
userId: string
provider: string
body: any
headers: Record<string, string>
path: string
blockId?: string
}) => {
const executionId = uuidv4()
const requestId = executionId.slice(0, 8)
logger.info(`[${requestId}] Starting webhook execution via trigger.dev`, {
webhookId: payload.webhookId,
workflowId: payload.workflowId,
provider: payload.provider,
userId: payload.userId,
executionId,
})
// Initialize logging session outside try block so it's available in catch
const loggingSession = new LoggingSession(payload.workflowId, executionId, 'webhook', requestId)
try {
// Check usage limits first
const usageCheck = await checkServerSideUsageLimits(payload.userId)
if (usageCheck.isExceeded) {
logger.warn(
`[${requestId}] User ${payload.userId} has exceeded usage limits. Skipping webhook execution.`,
{
currentUsage: usageCheck.currentUsage,
limit: usageCheck.limit,
workflowId: payload.workflowId,
}
)
throw new Error(
usageCheck.message ||
'Usage limit exceeded. Please upgrade your plan to continue using webhooks.'
)
}
// Load workflow from normalized tables
const workflowData = await loadWorkflowFromNormalizedTables(payload.workflowId)
if (!workflowData) {
throw new Error(`Workflow not found: ${payload.workflowId}`)
}
const { blocks, edges, loops, parallels } = workflowData
// Get environment variables (matching workflow-execution pattern)
const [userEnv] = await db
.select()
.from(environmentTable)
.where(eq(environmentTable.userId, payload.userId))
.limit(1)
let decryptedEnvVars: Record<string, string> = {}
if (userEnv) {
const decryptionPromises = Object.entries((userEnv.variables as any) || {}).map(
async ([key, encryptedValue]) => {
try {
const { decrypted } = await decryptSecret(encryptedValue as string)
return [key, decrypted] as const
} catch (error: any) {
logger.error(`[${requestId}] Failed to decrypt environment variable "${key}":`, error)
throw new Error(`Failed to decrypt environment variable "${key}": ${error.message}`)
}
}
)
const decryptedPairs = await Promise.all(decryptionPromises)
decryptedEnvVars = Object.fromEntries(decryptedPairs)
}
// Start logging session
await loggingSession.safeStart({
userId: payload.userId,
workspaceId: '', // TODO: Get from workflow if needed
variables: decryptedEnvVars,
})
// Merge subblock states (matching workflow-execution pattern)
const mergedStates = mergeSubblockState(blocks, {})
// Process block states for execution
const processedBlockStates = Object.entries(mergedStates).reduce(
(acc, [blockId, blockState]) => {
acc[blockId] = Object.entries(blockState.subBlocks).reduce(
(subAcc, [key, subBlock]) => {
subAcc[key] = subBlock.value
return subAcc
},
{} as Record<string, any>
)
return acc
},
{} as Record<string, Record<string, any>>
)
// Handle workflow variables (for now, use empty object since we don't have workflow metadata)
const workflowVariables = {}
// Create serialized workflow
const serializer = new Serializer()
const serializedWorkflow = serializer.serializeWorkflow(
mergedStates,
edges,
loops || {},
parallels || {}
)
// Handle special Airtable case
if (payload.provider === 'airtable') {
logger.info(
`[${requestId}] Processing Airtable webhook via fetchAndProcessAirtablePayloads`
)
const webhookData = {
id: payload.webhookId,
provider: payload.provider,
providerConfig: {}, // Will be loaded within fetchAndProcessAirtablePayloads
}
// Create a mock workflow object for Airtable processing
const mockWorkflow = {
id: payload.workflowId,
userId: payload.userId,
}
await fetchAndProcessAirtablePayloads(webhookData, mockWorkflow, requestId)
await loggingSession.safeComplete({
endedAt: new Date().toISOString(),
totalDurationMs: 0,
finalOutput: { message: 'Airtable webhook processed' },
traceSpans: [],
})
return {
success: true,
workflowId: payload.workflowId,
executionId,
output: { message: 'Airtable webhook processed' },
executedAt: new Date().toISOString(),
}
}
// Format input for standard webhooks
const mockWebhook = {
provider: payload.provider,
blockId: payload.blockId,
}
const mockWorkflow = {
id: payload.workflowId,
userId: payload.userId,
}
const mockRequest = {
headers: new Map(Object.entries(payload.headers)),
} as any
const input = formatWebhookInput(mockWebhook, mockWorkflow, payload.body, mockRequest)
if (!input && payload.provider === 'whatsapp') {
logger.info(`[${requestId}] No messages in WhatsApp payload, skipping execution`)
await loggingSession.safeComplete({
endedAt: new Date().toISOString(),
totalDurationMs: 0,
finalOutput: { message: 'No messages in WhatsApp payload' },
traceSpans: [],
})
return {
success: true,
workflowId: payload.workflowId,
executionId,
output: { message: 'No messages in WhatsApp payload' },
executedAt: new Date().toISOString(),
}
}
// Create executor and execute
const executor = new Executor(
serializedWorkflow,
processedBlockStates,
decryptedEnvVars,
input || {},
workflowVariables
)
// Set up logging on the executor
loggingSession.setupExecutor(executor)
logger.info(`[${requestId}] Executing workflow for ${payload.provider} webhook`)
// Execute the workflow
const result = await executor.execute(payload.workflowId, payload.blockId)
// Check if we got a StreamingExecution result
const executionResult =
'stream' in result && 'execution' in result ? result.execution : result
logger.info(`[${requestId}] Webhook execution completed`, {
success: executionResult.success,
workflowId: payload.workflowId,
provider: payload.provider,
})
// Update workflow run counts on success
if (executionResult.success) {
await updateWorkflowRunCounts(payload.workflowId)
// Track execution in user stats
await db
.update(userStats)
.set({
totalWebhookTriggers: sql`total_webhook_triggers + 1`,
lastActive: sql`now()`,
})
.where(eq(userStats.userId, payload.userId))
}
// Build trace spans and complete logging session
const { traceSpans, totalDuration } = buildTraceSpans(executionResult)
await loggingSession.safeComplete({
endedAt: new Date().toISOString(),
totalDurationMs: totalDuration || 0,
finalOutput: executionResult.output || {},
traceSpans: traceSpans as any,
})
return {
success: executionResult.success,
workflowId: payload.workflowId,
executionId,
output: executionResult.output,
executedAt: new Date().toISOString(),
provider: payload.provider,
}
} catch (error: any) {
logger.error(`[${requestId}] Webhook execution failed`, {
error: error.message,
stack: error.stack,
workflowId: payload.workflowId,
provider: payload.provider,
})
// Complete logging session with error (matching workflow-execution pattern)
try {
await loggingSession.safeCompleteWithError({
endedAt: new Date().toISOString(),
totalDurationMs: 0,
error: {
message: error.message || 'Webhook execution failed',
stackTrace: error.stack,
},
})
} catch (loggingError) {
logger.error(`[${requestId}] Failed to complete logging session`, loggingError)
}
throw error // Let Trigger.dev handle retries
}
},
})

View File

@@ -1,6 +1,7 @@
import { task } from '@trigger.dev/sdk/v3'
import { eq, sql } from 'drizzle-orm'
import { v4 as uuidv4 } from 'uuid'
import { checkServerSideUsageLimits } from '@/lib/billing'
import { createLogger } from '@/lib/logs/console/logger'
import { LoggingSession } from '@/lib/logs/execution/logging-session'
import { buildTraceSpans } from '@/lib/logs/execution/trace-spans/trace-spans'
@@ -43,6 +44,22 @@ export const workflowExecution = task({
const loggingSession = new LoggingSession(workflowId, executionId, triggerType, requestId)
try {
const usageCheck = await checkServerSideUsageLimits(payload.userId)
if (usageCheck.isExceeded) {
logger.warn(
`[${requestId}] User ${payload.userId} has exceeded usage limits. Skipping workflow execution.`,
{
currentUsage: usageCheck.currentUsage,
limit: usageCheck.limit,
workflowId: payload.workflowId,
}
)
throw new Error(
usageCheck.message ||
'Usage limit exceeded. Please upgrade your plan to continue using workflows.'
)
}
// Load workflow data from normalized tables
const normalizedData = await loadWorkflowFromNormalizedTables(workflowId)
if (!normalizedData) {