improvement(async): improve error capturing for asynchronous workflow executions (#1808)

* improvement(async): improve error capturing for asynchronous workflow executions

* surface more erros

* added more logs

* fix failing tests

* ack DB comments
This commit is contained in:
Waleed
2025-11-04 18:16:00 -08:00
committed by GitHub
parent 670e63c108
commit f65d62ea3d
8 changed files with 466 additions and 210 deletions

View File

@@ -2,7 +2,9 @@ import { db } from '@sim/db'
import { chat, workflow, workspace } from '@sim/db/schema'
import { eq } from 'drizzle-orm'
import { type NextRequest, NextResponse } from 'next/server'
import { v4 as uuidv4 } from 'uuid'
import { createLogger } from '@/lib/logs/console/logger'
import { LoggingSession } from '@/lib/logs/execution/logging-session'
import { ChatFiles } from '@/lib/uploads'
import { generateRequestId } from '@/lib/utils'
import {
@@ -59,6 +61,29 @@ export async function POST(
if (!deployment.isActive) {
logger.warn(`[${requestId}] Chat is not active: ${identifier}`)
const executionId = uuidv4()
const loggingSession = new LoggingSession(
deployment.workflowId,
executionId,
'chat',
requestId
)
await loggingSession.safeStart({
userId: deployment.userId,
workspaceId: '', // Will be resolved if needed
variables: {},
})
await loggingSession.safeCompleteWithError({
error: {
message: 'This chat is currently unavailable. The chat has been disabled.',
stackTrace: undefined,
},
traceSpans: [],
})
return addCorsHeaders(createErrorResponse('This chat is currently unavailable', 403), request)
}
@@ -96,6 +121,29 @@ export async function POST(
if (workflowResult.length === 0 || !workflowResult[0].isDeployed) {
logger.warn(`[${requestId}] Workflow not found or not deployed: ${deployment.workflowId}`)
const executionId = uuidv4()
const loggingSession = new LoggingSession(
deployment.workflowId,
executionId,
'chat',
requestId
)
await loggingSession.safeStart({
userId: deployment.userId,
workspaceId: workflowResult[0]?.workspaceId || '',
variables: {},
})
await loggingSession.safeCompleteWithError({
error: {
message: 'Chat workflow is not available. The workflow is not deployed.',
stackTrace: undefined,
},
traceSpans: [],
})
return addCorsHeaders(createErrorResponse('Chat workflow is not available', 503), request)
}
@@ -109,6 +157,29 @@ export async function POST(
if (workspaceData.length === 0) {
logger.error(`[${requestId}] Workspace not found for workflow ${deployment.workflowId}`)
const executionId = uuidv4()
const loggingSession = new LoggingSession(
deployment.workflowId,
executionId,
'chat',
requestId
)
await loggingSession.safeStart({
userId: deployment.userId,
workspaceId: workflowResult[0].workspaceId || '',
variables: {},
})
await loggingSession.safeCompleteWithError({
error: {
message: 'Workspace not found. Critical configuration error - please contact support.',
stackTrace: undefined,
},
traceSpans: [],
})
return addCorsHeaders(createErrorResponse('Workspace not found', 500), request)
}
@@ -140,16 +211,43 @@ export async function POST(
executionId,
}
const uploadedFiles = await ChatFiles.processChatFiles(
files,
executionContext,
requestId,
deployment.userId
)
try {
const uploadedFiles = await ChatFiles.processChatFiles(
files,
executionContext,
requestId,
deployment.userId
)
if (uploadedFiles.length > 0) {
workflowInput.files = uploadedFiles
logger.info(`[${requestId}] Successfully processed ${uploadedFiles.length} files`)
if (uploadedFiles.length > 0) {
workflowInput.files = uploadedFiles
logger.info(`[${requestId}] Successfully processed ${uploadedFiles.length} files`)
}
} catch (fileError: any) {
logger.error(`[${requestId}] Failed to process chat files:`, fileError)
const fileLoggingSession = new LoggingSession(
deployment.workflowId,
executionId,
'chat',
requestId
)
await fileLoggingSession.safeStart({
userId: workspaceOwnerId,
workspaceId: workflowResult[0].workspaceId || '',
variables: {},
})
await fileLoggingSession.safeCompleteWithError({
error: {
message: `File upload failed: ${fileError.message || 'Unable to process uploaded files'}`,
stackTrace: fileError.stack,
},
traceSpans: [],
})
throw fileError
}
}

View File

@@ -11,7 +11,13 @@ import {
mockTriggerDevSdk,
} from '@/app/api/__test-utils__/utils'
// Prefer mocking the background module to avoid loading Trigger.dev at all during tests
vi.mock('@trigger.dev/sdk', () => ({
tasks: {
trigger: vi.fn().mockResolvedValue({ id: 'mock-task-id' }),
},
task: vi.fn().mockReturnValue({}),
}))
vi.mock('@/background/webhook-execution', () => ({
executeWebhookJob: vi.fn().mockResolvedValue({
success: true,
@@ -22,6 +28,10 @@ vi.mock('@/background/webhook-execution', () => ({
}),
}))
vi.mock('@/background/logs-webhook-delivery', () => ({
logsWebhookDelivery: {},
}))
const hasProcessedMessageMock = vi.fn().mockResolvedValue(false)
const markMessageAsProcessedMock = vi.fn().mockResolvedValue(true)
const closeRedisConnectionMock = vi.fn().mockResolvedValue(undefined)
@@ -78,27 +88,19 @@ vi.mock('@/executor', () => ({
})),
}))
// Set up environment before any imports
process.env.DATABASE_URL = 'postgresql://test:test@localhost:5432/test'
// Mock postgres dependencies
vi.mock('drizzle-orm/postgres-js', () => ({
drizzle: vi.fn().mockReturnValue({}),
}))
vi.mock('postgres', () => vi.fn().mockReturnValue({}))
// The @sim/db mock is handled in test utils via mockExecutionDependencies()
// (removed duplicate utils mock - defined above with specific handlers)
describe('Webhook Trigger API Route', () => {
beforeEach(() => {
// Ensure a fresh module graph so per-test vi.doMock() takes effect before imports
vi.resetModules()
vi.clearAllMocks()
// Clear global mock data
globalMockData.webhooks.length = 0
globalMockData.workflows.length = 0
globalMockData.schedules.length = 0
@@ -172,43 +174,22 @@ describe('Webhook Trigger API Route', () => {
vi.clearAllMocks()
})
// Removed: WhatsApp verification test has complex mock setup issues
/**
* Test POST webhook with workflow execution
* Verifies that a webhook trigger properly initiates workflow execution
*/
// TODO: Fix failing test - returns 500 instead of 200
// it('should trigger workflow execution via POST', async () => { ... })
/**
* Test 404 handling for non-existent webhooks
*/
it('should handle 404 for non-existent webhooks', async () => {
// The global @sim/db mock already returns empty arrays, so findWebhookAndWorkflow will return null
// Create a mock request
const req = createMockRequest('POST', { event: 'test' })
// Mock the path param
const params = Promise.resolve({ path: 'non-existent-path' })
// Import the handler
const { POST } = await import('@/app/api/webhooks/trigger/[path]/route')
// Call the handler
const response = await POST(req, { params })
// Check response - expect 404 since our implementation returns 404 when webhook is not found
expect(response.status).toBe(404)
// Parse the response body
const text = await response.text()
expect(text).toMatch(/not found/i) // Response should contain "not found" message
expect(text).toMatch(/not found/i)
})
describe('Generic Webhook Authentication', () => {
// Mock billing and rate limiting dependencies
beforeEach(() => {
vi.doMock('@/lib/billing/core/subscription', () => ({
getHighestPrioritySubscription: vi.fn().mockResolvedValue({
@@ -222,11 +203,7 @@ describe('Webhook Trigger API Route', () => {
}))
})
/**
* Test generic webhook without authentication (default behavior)
*/
it('should process generic webhook without authentication', async () => {
// Configure mock data
globalMockData.webhooks.push({
id: 'generic-webhook-id',
provider: 'generic',
@@ -249,7 +226,6 @@ describe('Webhook Trigger API Route', () => {
const { POST } = await import('@/app/api/webhooks/trigger/[path]/route')
const response = await POST(req, { params })
// Should succeed (200 OK with webhook processed message)
expect(response.status).toBe(200)
const data = await response.json()
@@ -257,10 +233,9 @@ describe('Webhook Trigger API Route', () => {
})
/**
* Test generic webhook with Bearer token authentication (no custom header)
* Test generic webhook with Bearer token authentication
*/
it('should authenticate with Bearer token when no custom header is configured', async () => {
// Configure mock data with Bearer token
globalMockData.webhooks.push({
id: 'generic-webhook-id',
provider: 'generic',
@@ -288,9 +263,6 @@ describe('Webhook Trigger API Route', () => {
expect(response.status).toBe(200)
})
/**
* Test generic webhook with custom header authentication
*/
it('should authenticate with custom header when configured', async () => {
globalMockData.webhooks.push({
id: 'generic-webhook-id',
@@ -323,9 +295,6 @@ describe('Webhook Trigger API Route', () => {
expect(response.status).toBe(200)
})
/**
* Test case insensitive Bearer token authentication
*/
it('should handle case insensitive Bearer token authentication', async () => {
globalMockData.webhooks.push({
id: 'generic-webhook-id',
@@ -369,9 +338,6 @@ describe('Webhook Trigger API Route', () => {
}
})
/**
* Test case insensitive custom header authentication
*/
it('should handle case insensitive custom header authentication', async () => {
globalMockData.webhooks.push({
id: 'generic-webhook-id',
@@ -414,9 +380,6 @@ describe('Webhook Trigger API Route', () => {
}
})
/**
* Test rejection of wrong Bearer token
*/
it('should reject wrong Bearer token', async () => {
globalMockData.webhooks.push({
id: 'generic-webhook-id',
@@ -442,9 +405,6 @@ describe('Webhook Trigger API Route', () => {
expect(processWebhookMock).not.toHaveBeenCalled()
})
/**
* Test rejection of wrong custom header token
*/
it('should reject wrong custom header token', async () => {
globalMockData.webhooks.push({
id: 'generic-webhook-id',
@@ -474,9 +434,6 @@ describe('Webhook Trigger API Route', () => {
expect(processWebhookMock).not.toHaveBeenCalled()
})
/**
* Test rejection of missing authentication
*/
it('should reject missing authentication when required', async () => {
globalMockData.webhooks.push({
id: 'generic-webhook-id',
@@ -498,9 +455,6 @@ describe('Webhook Trigger API Route', () => {
expect(processWebhookMock).not.toHaveBeenCalled()
})
/**
* Test exclusivity - Bearer token should be rejected when custom header is configured
*/
it('should reject Bearer token when custom header is configured', async () => {
globalMockData.webhooks.push({
id: 'generic-webhook-id',
@@ -530,9 +484,6 @@ describe('Webhook Trigger API Route', () => {
expect(processWebhookMock).not.toHaveBeenCalled()
})
/**
* Test wrong custom header name is rejected
*/
it('should reject wrong custom header name', async () => {
globalMockData.webhooks.push({
id: 'generic-webhook-id',
@@ -562,9 +513,6 @@ describe('Webhook Trigger API Route', () => {
expect(processWebhookMock).not.toHaveBeenCalled()
})
/**
* Test authentication required but no token configured
*/
it('should reject when auth is required but no token is configured', async () => {
globalMockData.webhooks.push({
id: 'generic-webhook-id',

View File

@@ -1,5 +1,7 @@
import { type NextRequest, NextResponse } from 'next/server'
import { v4 as uuidv4 } from 'uuid'
import { createLogger } from '@/lib/logs/console/logger'
import { LoggingSession } from '@/lib/logs/execution/logging-session'
import { generateRequestId } from '@/lib/utils'
import {
checkRateLimits,
@@ -110,6 +112,30 @@ export async function POST(
logger.warn(
`[${requestId}] Trigger block ${foundWebhook.blockId} not found in deployment for workflow ${foundWorkflow.id}`
)
const executionId = uuidv4()
const loggingSession = new LoggingSession(foundWorkflow.id, executionId, 'webhook', requestId)
const actorUserId = foundWorkflow.workspaceId
? (await import('@/lib/workspaces/utils')).getWorkspaceBilledAccountUserId(
foundWorkflow.workspaceId
) || foundWorkflow.userId
: foundWorkflow.userId
await loggingSession.safeStart({
userId: actorUserId,
workspaceId: foundWorkflow.workspaceId || '',
variables: {},
})
await loggingSession.safeCompleteWithError({
error: {
message: `Trigger block not deployed. The webhook trigger (block ${foundWebhook.blockId}) is not present in the deployed workflow. Please redeploy the workflow.`,
stackTrace: undefined,
},
traceSpans: [],
})
return new NextResponse('Trigger block not deployed', { status: 404 })
}
}

View File

@@ -1,6 +1,5 @@
import { type NextRequest, NextResponse } from 'next/server'
import { v4 as uuidv4 } from 'uuid'
import { z } from 'zod'
import { checkHybridAuth } from '@/lib/auth/hybrid'
import { checkServerSideUsageLimits } from '@/lib/billing'
import { createLogger } from '@/lib/logs/console/logger'
@@ -8,12 +7,10 @@ import { LoggingSession } from '@/lib/logs/execution/logging-session'
import { generateRequestId, SSE_HEADERS } from '@/lib/utils'
import { executeWorkflowCore } from '@/lib/workflows/executor/execution-core'
import { type ExecutionEvent, encodeSSEEvent } from '@/lib/workflows/executor/execution-events'
import { validateWorkflowAccess } from '@/app/api/workflows/middleware'
import { type ExecutionMetadata, ExecutionSnapshot } from '@/executor/execution/snapshot'
import type { StreamingExecution } from '@/executor/types'
import type { SubflowType } from '@/stores/workflows/workflow/types'
import { validateWorkflowAccess } from '../../middleware'
const EnvVarsSchema = z.record(z.string())
const logger = createLogger('WorkflowExecuteAPI')
@@ -30,7 +27,20 @@ class UsageLimitError extends Error {
/**
* Execute workflow with streaming support - used by chat and other streaming endpoints
* Returns ExecutionResult instead of NextResponse
*
* This is a wrapper function that:
* - Checks usage limits before execution (protects chat and streaming paths)
* - Logs usage limit errors to the database for user visibility
* - Supports streaming callbacks (onStream, onBlockComplete)
* - Returns ExecutionResult instead of NextResponse
*
* Used by:
* - Chat execution (/api/chat/[identifier]/route.ts)
* - Streaming responses (lib/workflows/streaming.ts)
*
* Note: The POST handler in this file calls executeWorkflowCore() directly and has
* its own usage check. This wrapper provides convenience and built-in protection
* for callers that need streaming support.
*/
export async function executeWorkflow(
workflow: any,
@@ -54,6 +64,38 @@ export async function executeWorkflow(
const loggingSession = new LoggingSession(workflowId, executionId, triggerType, requestId)
try {
const usageCheck = await checkServerSideUsageLimits(actorUserId)
if (usageCheck.isExceeded) {
logger.warn(
`[${requestId}] User ${actorUserId} has exceeded usage limits. Blocking workflow execution.`,
{
currentUsage: usageCheck.currentUsage,
limit: usageCheck.limit,
workflowId,
triggerType,
}
)
await loggingSession.safeStart({
userId: actorUserId,
workspaceId: workflow.workspaceId || '',
variables: {},
})
await loggingSession.safeCompleteWithError({
error: {
message:
usageCheck.message || 'Usage limit exceeded. Please upgrade your plan to continue.',
stackTrace: undefined,
},
traceSpans: [],
})
throw new UsageLimitError(
usageCheck.message || 'Usage limit exceeded. Please upgrade your plan to continue.'
)
}
const metadata: ExecutionMetadata = {
requestId,
executionId,
@@ -88,7 +130,6 @@ export async function executeWorkflow(
})
if (streamConfig?.skipLoggingComplete) {
// Add streaming metadata for later completion
return {
...result,
_streamingMetadata: {
@@ -146,7 +187,6 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
}
const workflow = workflowValidation.workflow!
// Parse request body (handle empty body for curl requests)
let body: any = {}
try {
const text = await req.text()
@@ -173,15 +213,6 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
const streamHeader = req.headers.get('X-Stream-Response') === 'true'
const enableSSE = streamHeader || streamParam === true
// Check usage limits
const usageCheck = await checkServerSideUsageLimits(userId)
if (usageCheck.isExceeded) {
return NextResponse.json(
{ error: usageCheck.message || 'Usage limit exceeded' },
{ status: 402 }
)
}
logger.info(`[${requestId}] Starting server-side execution`, {
workflowId,
userId,
@@ -193,9 +224,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
enableSSE,
})
// Generate execution ID
const executionId = uuidv4()
// Map client trigger type to logging trigger type (excluding 'api-endpoint')
type LoggingTriggerType = 'api' | 'webhook' | 'schedule' | 'manual' | 'chat'
let loggingTriggerType: LoggingTriggerType = 'manual'
if (
@@ -214,7 +243,42 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
requestId
)
// NON-SSE PATH: Direct JSON execution for API calls, background jobs
// Check usage limits for this POST handler execution path
// Architecture note: This handler calls executeWorkflowCore() directly (both SSE and non-SSE paths).
// The executeWorkflow() wrapper function (used by chat) has its own check (line 54).
const usageCheck = await checkServerSideUsageLimits(userId)
if (usageCheck.isExceeded) {
logger.warn(`[${requestId}] User ${userId} has exceeded usage limits. Blocking execution.`, {
currentUsage: usageCheck.currentUsage,
limit: usageCheck.limit,
workflowId,
triggerType,
})
await loggingSession.safeStart({
userId,
workspaceId: workflow.workspaceId || '',
variables: {},
})
await loggingSession.safeCompleteWithError({
error: {
message:
usageCheck.message || 'Usage limit exceeded. Please upgrade your plan to continue.',
stackTrace: undefined,
},
traceSpans: [],
})
return NextResponse.json(
{
error:
usageCheck.message || 'Usage limit exceeded. Please upgrade your plan to continue.',
},
{ status: 402 }
)
}
if (!enableSSE) {
logger.info(`[${requestId}] Using non-SSE execution (direct JSON response)`)
try {
@@ -244,7 +308,6 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
loggingSession,
})
// Filter out logs and internal metadata for API responses
const filteredResult = {
success: result.success,
output: result.output,
@@ -262,7 +325,6 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
} catch (error: any) {
logger.error(`[${requestId}] Non-SSE execution failed:`, error)
// Extract execution result from error if available
const executionResult = error.executionResult
return NextResponse.json(
@@ -283,7 +345,6 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
}
}
// SSE PATH: Stream execution events for client builder UI
logger.info(`[${requestId}] Using SSE execution (streaming response)`)
const encoder = new TextEncoder()
let executorInstance: any = null
@@ -301,7 +362,6 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
})
controller.enqueue(encodeSSEEvent(event))
} catch {
// Stream closed - stop sending events
isStreamClosed = true
}
}
@@ -309,7 +369,6 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
try {
const startTime = new Date()
// Send execution started event
sendEvent({
type: 'execution:started',
timestamp: startTime.toISOString(),
@@ -320,7 +379,6 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
},
})
// SSE Callbacks
const onBlockStart = async (
blockId: string,
blockName: string,
@@ -361,7 +419,6 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
iterationType: SubflowType
}
) => {
// Check if this is an error completion
const hasError = callbackData.output?.error
if (hasError) {
@@ -487,7 +544,6 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
loggingSession,
})
// Check if execution was cancelled
if (result.error === 'Workflow execution was cancelled') {
logger.info(`[${requestId}] Workflow execution was cancelled`)
sendEvent({
@@ -499,10 +555,9 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
duration: result.metadata?.duration || 0,
},
})
return // Exit early
return
}
// Send execution completed event
sendEvent({
type: 'execution:completed',
timestamp: new Date().toISOString(),
@@ -519,10 +574,8 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
} catch (error: any) {
logger.error(`[${requestId}] SSE execution failed:`, error)
// Extract execution result from error if available
const executionResult = error.executionResult
// Send error event
sendEvent({
type: 'execution:error',
timestamp: new Date().toISOString(),
@@ -534,7 +587,6 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
},
})
} finally {
// Close the stream if not already closed
if (!isStreamClosed) {
try {
controller.enqueue(encoder.encode('data: [DONE]\n\n'))
@@ -549,14 +601,12 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
isStreamClosed = true
logger.info(`[${requestId}] Client aborted SSE stream, cancelling executor`)
// Cancel the executor if it exists
if (executorInstance && typeof executorInstance.cancel === 'function') {
executorInstance.cancel()
}
},
})
// Return SSE response
return new NextResponse(stream, {
headers: {
...SSE_HEADERS,

View File

@@ -48,11 +48,9 @@ function calculateNextRunTime(
const scheduleType = getSubBlockValue(scheduleBlock, 'scheduleType')
const scheduleValues = getScheduleTimeValues(scheduleBlock)
// Get timezone from schedule configuration (default to UTC)
const timezone = scheduleValues.timezone || 'UTC'
if (schedule.cronExpression) {
// Use Croner with timezone support for accurate scheduling
const cron = new Cron(schedule.cronExpression, {
timezone,
})
@@ -87,6 +85,29 @@ export async function executeScheduleJob(payload: ScheduleExecutionPayload) {
if (!workflowRecord) {
logger.warn(`[${requestId}] Workflow ${payload.workflowId} not found`)
const loggingSession = new LoggingSession(
payload.workflowId,
executionId,
'schedule',
requestId
)
await loggingSession.safeStart({
userId: 'unknown',
workspaceId: '',
variables: {},
})
await loggingSession.safeCompleteWithError({
error: {
message:
'Workflow not found. The scheduled workflow may have been deleted or is no longer accessible.',
stackTrace: undefined,
},
traceSpans: [],
})
return
}
@@ -104,11 +125,41 @@ export async function executeScheduleJob(payload: ScheduleExecutionPayload) {
logger.warn(
`[${requestId}] Skipping schedule ${payload.scheduleId}: unable to resolve billed account.`
)
const loggingSession = new LoggingSession(
payload.workflowId,
executionId,
'schedule',
requestId
)
await loggingSession.safeStart({
userId: workflowRecord.userId ?? 'unknown',
workspaceId: workflowRecord.workspaceId || '',
variables: {},
})
await loggingSession.safeCompleteWithError({
error: {
message:
'Unable to resolve billing account. This workflow cannot execute scheduled runs without a valid billing account.',
stackTrace: undefined,
},
traceSpans: [],
})
return
}
const userSubscription = await getHighestPrioritySubscription(actorUserId)
const loggingSession = new LoggingSession(
payload.workflowId,
executionId,
'schedule',
requestId
)
const rateLimiter = new RateLimiter()
const rateLimitCheck = await rateLimiter.checkRateLimitWithSubscription(
actorUserId,
@@ -127,6 +178,20 @@ export async function executeScheduleJob(payload: ScheduleExecutionPayload) {
}
)
await loggingSession.safeStart({
userId: actorUserId,
workspaceId: workflowRecord.workspaceId || '',
variables: {},
})
await loggingSession.safeCompleteWithError({
error: {
message: `Rate limit exceeded. ${rateLimitCheck.remaining || 0} requests remaining. Resets at ${rateLimitCheck.resetAt ? new Date(rateLimitCheck.resetAt).toISOString() : 'unknown'}. Schedule will retry in 5 minutes.`,
stackTrace: undefined,
},
traceSpans: [],
})
const retryDelay = 5 * 60 * 1000
const nextRetryAt = new Date(now.getTime() + retryDelay)
@@ -157,6 +222,23 @@ export async function executeScheduleJob(payload: ScheduleExecutionPayload) {
workflowId: payload.workflowId,
}
)
await loggingSession.safeStart({
userId: actorUserId,
workspaceId: workflowRecord.workspaceId || '',
variables: {},
})
await loggingSession.safeCompleteWithError({
error: {
message:
usageCheck.message ||
'Usage limit exceeded. Please upgrade your plan to continue using scheduled workflows.',
stackTrace: undefined,
},
traceSpans: [],
})
try {
const deployedData = await loadDeployedWorkflowState(payload.workflowId)
const nextRunAt = calculateNextRunTime(payload, deployedData.blocks as any)
@@ -175,13 +257,6 @@ export async function executeScheduleJob(payload: ScheduleExecutionPayload) {
logger.info(`[${requestId}] Executing scheduled workflow ${payload.workflowId}`)
const loggingSession = new LoggingSession(
payload.workflowId,
executionId,
'schedule',
requestId
)
try {
const executionSuccess = await (async () => {
try {
@@ -200,6 +275,21 @@ export async function executeScheduleJob(payload: ScheduleExecutionPayload) {
logger.warn(
`[${requestId}] Schedule trigger block ${payload.blockId} not found in deployed workflow ${payload.workflowId}. Skipping execution.`
)
await loggingSession.safeStart({
userId: actorUserId,
workspaceId: workflowRecord.workspaceId || '',
variables: {},
})
await loggingSession.safeCompleteWithError({
error: {
message: `Trigger block not deployed. The schedule trigger (block ${payload.blockId}) is not present in the deployed workflow. Please redeploy the workflow.`,
stackTrace: undefined,
},
traceSpans: [],
})
return { skip: true, blocks: {} as Record<string, BlockState> }
}
}
@@ -375,22 +465,16 @@ export async function executeScheduleJob(payload: ScheduleExecutionPayload) {
)
try {
await loggingSession.safeStart({
userId: workflowRecord.userId,
workspaceId: workflowRecord.workspaceId || '',
variables: {},
})
await loggingSession.safeCompleteWithError({
error: {
message: `Schedule execution failed before workflow started: ${earlyError.message}`,
message: `Schedule execution failed: ${earlyError.message}`,
stackTrace: earlyError.stack,
},
traceSpans: [],
})
} catch (loggingError) {
logger.error(
`[${requestId}] Failed to create log entry for early schedule failure`,
`[${requestId}] Failed to complete log entry for schedule failure`,
loggingError
)
}
@@ -485,34 +569,6 @@ export async function executeScheduleJob(payload: ScheduleExecutionPayload) {
error
)
try {
const failureLoggingSession = new LoggingSession(
payload.workflowId,
executionId,
'schedule',
requestId
)
await failureLoggingSession.safeStart({
userId: workflowRecord.userId,
workspaceId: workflowRecord.workspaceId || '',
variables: {},
})
await failureLoggingSession.safeCompleteWithError({
error: {
message: `Schedule execution failed: ${error.message}`,
stackTrace: error.stack,
},
traceSpans: [],
})
} catch (loggingError) {
logger.error(
`[${requestId}] Failed to create log entry for failed schedule execution`,
loggingError
)
}
let nextRunAt: Date
try {
const [workflowRecord] = await db

View File

@@ -3,7 +3,6 @@ import { webhook, workflow as workflowTable } from '@sim/db/schema'
import { task } from '@trigger.dev/sdk'
import { eq } from 'drizzle-orm'
import { v4 as uuidv4 } from 'uuid'
import { checkServerSideUsageLimits } from '@/lib/billing'
import { getPersonalAndWorkspaceEnv } from '@/lib/environment/utils'
import { processExecutionFiles } from '@/lib/execution/files'
import { IdempotencyService, webhookIdempotency } from '@/lib/idempotency'
@@ -133,29 +132,24 @@ async function executeWebhookJobInternal(
const loggingSession = new LoggingSession(payload.workflowId, executionId, 'webhook', requestId)
try {
const usageCheck = await checkServerSideUsageLimits(payload.userId)
if (usageCheck.isExceeded) {
logger.warn(
`[${requestId}] User ${payload.userId} has exceeded usage limits. Skipping webhook execution.`,
{
currentUsage: usageCheck.currentUsage,
limit: usageCheck.limit,
workflowId: payload.workflowId,
}
)
throw new Error(
usageCheck.message ||
'Usage limit exceeded. Please upgrade your plan to continue using webhooks.'
)
}
await loggingSession.safeStart({
userId: payload.userId,
workspaceId: '', // Will be resolved below
variables: {},
triggerData: {
isTest: payload.testMode === true,
executionTarget: payload.executionTarget || 'deployed',
},
})
// Load workflow state based on execution target
const workflowData =
payload.executionTarget === 'live'
? await loadWorkflowFromNormalizedTables(payload.workflowId)
: await loadDeployedWorkflowState(payload.workflowId)
if (!workflowData) {
throw new Error(`Workflow ${payload.workflowId} has no live normalized state`)
throw new Error(
`Workflow state not found. The workflow may not be ${payload.executionTarget === 'live' ? 'saved' : 'deployed'} or the deployment data may be corrupted.`
)
}
const { blocks, edges, loops, parallels } = workflowData
@@ -181,17 +175,6 @@ async function executeWebhookJobInternal(
)
const decryptedEnvVars: Record<string, string> = Object.fromEntries(decryptedPairs)
// Start logging session
await loggingSession.safeStart({
userId: payload.userId,
workspaceId: workspaceId || '',
variables: decryptedEnvVars,
triggerData: {
isTest: payload.testMode === true,
executionTarget: payload.executionTarget || 'deployed',
},
})
// Merge subblock states (matching workflow-execution pattern)
const mergedStates = mergeSubblockState(blocks, {})
@@ -499,7 +482,6 @@ async function executeWebhookJobInternal(
provider: payload.provider,
})
// Complete logging session with error (matching workflow-execution pattern)
try {
const executionResult = (error?.executionResult as ExecutionResult | undefined) || {
success: false,

View File

@@ -3,6 +3,7 @@ import { workflow as workflowTable } from '@sim/db/schema'
import { task } from '@trigger.dev/sdk'
import { eq } from 'drizzle-orm'
import { v4 as uuidv4 } from 'uuid'
import { checkServerSideUsageLimits } from '@/lib/billing'
import { createLogger } from '@/lib/logs/console/logger'
import { LoggingSession } from '@/lib/logs/execution/logging-session'
import { executeWorkflowCore } from '@/lib/workflows/executor/execution-core'
@@ -30,18 +31,10 @@ export async function executeWorkflowJob(payload: WorkflowExecutionPayload) {
executionId,
})
// Initialize logging session
const triggerType = payload.triggerType || 'api'
const loggingSession = new LoggingSession(workflowId, executionId, triggerType, requestId)
try {
// Load workflow from database
const workflow = await getWorkflowById(workflowId)
if (!workflow) {
throw new Error(`Workflow ${workflowId} not found`)
}
// Get workspace ID for the workflow
const wfRows = await db
.select({ workspaceId: workflowTable.workspaceId })
.from(workflowTable)
@@ -49,6 +42,57 @@ export async function executeWorkflowJob(payload: WorkflowExecutionPayload) {
.limit(1)
const workspaceId = wfRows[0]?.workspaceId || undefined
const usageCheck = await checkServerSideUsageLimits(payload.userId)
if (usageCheck.isExceeded) {
logger.warn(
`[${requestId}] User ${payload.userId} has exceeded usage limits. Skipping workflow execution.`,
{
currentUsage: usageCheck.currentUsage,
limit: usageCheck.limit,
workflowId,
}
)
await loggingSession.safeStart({
userId: payload.userId,
workspaceId: workspaceId || '',
variables: {},
})
await loggingSession.safeCompleteWithError({
error: {
message:
usageCheck.message || 'Usage limit exceeded. Please upgrade your plan to continue.',
stackTrace: undefined,
},
traceSpans: [],
})
throw new Error(
usageCheck.message || 'Usage limit exceeded. Please upgrade your plan to continue.'
)
}
const workflow = await getWorkflowById(workflowId)
if (!workflow) {
await loggingSession.safeStart({
userId: payload.userId,
workspaceId: workspaceId || '',
variables: {},
})
await loggingSession.safeCompleteWithError({
error: {
message:
'Workflow not found. The workflow may have been deleted or is no longer accessible.',
stackTrace: undefined,
},
traceSpans: [],
})
throw new Error(`Workflow ${workflowId} not found`)
}
const metadata: ExecutionMetadata = {
requestId,
executionId,
@@ -98,7 +142,6 @@ export async function executeWorkflowJob(payload: WorkflowExecutionPayload) {
}
}
// Trigger.dev task definition
export const workflowExecutionTask = task({
id: 'workflow-execution',
run: executeWorkflowJob,

View File

@@ -2,10 +2,12 @@ import { db, webhook, workflow } from '@sim/db'
import { tasks } from '@trigger.dev/sdk'
import { and, eq } from 'drizzle-orm'
import { type NextRequest, NextResponse } from 'next/server'
import { v4 as uuidv4 } from 'uuid'
import { checkServerSideUsageLimits } from '@/lib/billing'
import { getHighestPrioritySubscription } from '@/lib/billing/core/subscription'
import { env, isTruthy } from '@/lib/env'
import { createLogger } from '@/lib/logs/console/logger'
import { LoggingSession } from '@/lib/logs/execution/logging-session'
import {
handleSlackChallenge,
handleWhatsAppVerification,
@@ -287,7 +289,7 @@ export async function checkRateLimits(
if (!actorUserId) {
logger.warn(`[${requestId}] Webhook requires a workspace billing account to attribute usage`)
return NextResponse.json({ message: 'Workspace billing account required' }, { status: 200 })
return NextResponse.json({ error: 'Workspace billing account required' }, { status: 402 })
}
const userSubscription = await getHighestPrioritySubscription(actorUserId)
@@ -307,14 +309,37 @@ export async function checkRateLimits(
resetAt: rateLimitCheck.resetAt,
})
const executionId = uuidv4()
const loggingSession = new LoggingSession(foundWorkflow.id, executionId, 'webhook', requestId)
await loggingSession.safeStart({
userId: actorUserId,
workspaceId: foundWorkflow.workspaceId || '',
variables: {},
})
await loggingSession.safeCompleteWithError({
error: {
message: `Rate limit exceeded. ${rateLimitCheck.remaining || 0} requests remaining. Resets at ${rateLimitCheck.resetAt ? new Date(rateLimitCheck.resetAt).toISOString() : 'unknown'}. Please try again later.`,
stackTrace: undefined,
},
traceSpans: [],
})
if (foundWebhook.provider === 'microsoftteams') {
return NextResponse.json({
type: 'message',
text: 'Rate limit exceeded. Please try again later.',
})
return NextResponse.json(
{
type: 'message',
text: 'Rate limit exceeded. Please try again later.',
},
{ status: 429 }
)
}
return NextResponse.json({ message: 'Rate limit exceeded' }, { status: 200 })
return NextResponse.json(
{ error: 'Rate limit exceeded. Please try again later.' },
{ status: 429 }
)
}
logger.debug(`[${requestId}] Rate limit check passed for webhook`, {
@@ -345,7 +370,7 @@ export async function checkUsageLimits(
if (!actorUserId) {
logger.warn(`[${requestId}] Webhook requires a workspace billing account to attribute usage`)
return NextResponse.json({ message: 'Workspace billing account required' }, { status: 200 })
return NextResponse.json({ error: 'Workspace billing account required' }, { status: 402 })
}
const usageCheck = await checkServerSideUsageLimits(actorUserId)
@@ -360,14 +385,39 @@ export async function checkUsageLimits(
}
)
const executionId = uuidv4()
const loggingSession = new LoggingSession(foundWorkflow.id, executionId, 'webhook', requestId)
await loggingSession.safeStart({
userId: actorUserId,
workspaceId: foundWorkflow.workspaceId || '',
variables: {},
})
await loggingSession.safeCompleteWithError({
error: {
message:
usageCheck.message ||
'Usage limit exceeded. Please upgrade your plan to continue using webhooks.',
stackTrace: undefined,
},
traceSpans: [],
})
if (foundWebhook.provider === 'microsoftteams') {
return NextResponse.json({
type: 'message',
text: 'Usage limit exceeded. Please upgrade your plan to continue.',
})
return NextResponse.json(
{
type: 'message',
text: 'Usage limit exceeded. Please upgrade your plan to continue.',
},
{ status: 402 }
)
}
return NextResponse.json({ message: 'Usage limit exceeded' }, { status: 200 })
return NextResponse.json(
{ error: usageCheck.message || 'Usage limit exceeded' },
{ status: 402 }
)
}
logger.debug(`[${requestId}] Usage limit check passed for webhook`, {
@@ -395,7 +445,7 @@ export async function queueWebhookExecution(
logger.warn(
`[${options.requestId}] Webhook requires a workspace billing account to attribute usage`
)
return NextResponse.json({ message: 'Workspace billing account required' }, { status: 200 })
return NextResponse.json({ error: 'Workspace billing account required' }, { status: 402 })
}
const headers = Object.fromEntries(request.headers.entries())
@@ -475,12 +525,15 @@ export async function queueWebhookExecution(
logger.error(`[${options.requestId}] Failed to queue webhook execution:`, error)
if (foundWebhook.provider === 'microsoftteams') {
return NextResponse.json({
type: 'message',
text: 'Webhook processing failed',
})
return NextResponse.json(
{
type: 'message',
text: 'Webhook processing failed',
},
{ status: 500 }
)
}
return NextResponse.json({ message: 'Internal server error' }, { status: 200 })
return NextResponse.json({ error: 'Internal server error' }, { status: 500 })
}
}