fix(logging): add preprocessing util shared by all execution paths (#2081)

* fix(logging): add preprocessing util shared by all execution paths

* DRY
This commit is contained in:
Waleed
2025-11-20 14:05:14 -08:00
committed by GitHub
parent 4a0450d1fc
commit 75f55c894a
18 changed files with 1239 additions and 805 deletions

View File

@@ -6,6 +6,36 @@
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'
import { createMockRequest } from '@/app/api/__test-utils__/utils'
vi.mock('@/lib/execution/preprocessing', () => ({
preprocessExecution: vi.fn().mockResolvedValue({
success: true,
actorUserId: 'test-user-id',
workflowRecord: {
id: 'test-workflow-id',
userId: 'test-user-id',
isDeployed: true,
workspaceId: 'test-workspace-id',
variables: {},
},
userSubscription: {
plan: 'pro',
status: 'active',
},
rateLimitInfo: {
allowed: true,
remaining: 100,
resetAt: new Date(),
},
}),
}))
vi.mock('@/lib/logs/execution/logging-session', () => ({
LoggingSession: vi.fn().mockImplementation(() => ({
safeStart: vi.fn().mockResolvedValue(undefined),
safeCompleteWithError: vi.fn().mockResolvedValue(undefined),
})),
}))
describe('Chat Identifier API Route', () => {
const createMockStream = () => {
return new ReadableStream({
@@ -307,48 +337,16 @@ describe('Chat Identifier API Route', () => {
})
it('should return 503 when workflow is not available', async () => {
// Override the default workflow result to return non-deployed
vi.doMock('@sim/db', () => {
// Track call count to return different results
let callCount = 0
const { preprocessExecution } = await import('@/lib/execution/preprocessing')
const originalImplementation = vi.mocked(preprocessExecution).getMockImplementation()
const mockLimit = vi.fn().mockImplementation(() => {
callCount++
if (callCount === 1) {
// First call - chat query
return [
{
id: 'chat-id',
workflowId: 'unavailable-workflow',
userId: 'user-id',
isActive: true,
authType: 'public',
outputConfigs: [{ blockId: 'block-1', path: 'output' }],
},
]
}
if (callCount === 2) {
// Second call - workflow query
return [
{
isDeployed: false,
},
]
}
return []
})
const mockWhere = vi.fn().mockReturnValue({ limit: mockLimit })
const mockFrom = vi.fn().mockReturnValue({ where: mockWhere })
const mockSelect = vi.fn().mockReturnValue({ from: mockFrom })
return {
db: {
select: mockSelect,
},
chat: {},
workflow: {},
}
vi.mocked(preprocessExecution).mockResolvedValueOnce({
success: false,
error: {
message: 'Workflow is not deployed',
statusCode: 403,
logCreated: true,
},
})
const req = createMockRequest('POST', { input: 'Hello' })
@@ -358,11 +356,15 @@ describe('Chat Identifier API Route', () => {
const response = await POST(req, { params })
expect(response.status).toBe(503)
expect(response.status).toBe(403)
const data = await response.json()
expect(data).toHaveProperty('error')
expect(data).toHaveProperty('message', 'Chat workflow is not available')
expect(data).toHaveProperty('message', 'Workflow is not deployed')
if (originalImplementation) {
vi.mocked(preprocessExecution).mockImplementation(originalImplementation)
}
})
it('should return streaming response for valid chat messages', async () => {
@@ -378,7 +380,6 @@ describe('Chat Identifier API Route', () => {
expect(response.headers.get('Cache-Control')).toBe('no-cache')
expect(response.headers.get('Connection')).toBe('keep-alive')
// Verify createStreamingResponse was called with correct workflow info
expect(mockCreateStreamingResponse).toHaveBeenCalledWith(
expect.objectContaining({
workflow: expect.objectContaining({
@@ -408,7 +409,6 @@ describe('Chat Identifier API Route', () => {
expect(response.status).toBe(200)
expect(response.body).toBeInstanceOf(ReadableStream)
// Test that we can read from the response stream
if (response.body) {
const reader = response.body.getReader()
const { value, done } = await reader.read()
@@ -447,7 +447,6 @@ describe('Chat Identifier API Route', () => {
})
it('should handle invalid JSON in request body', async () => {
// Create a request with invalid JSON
const req = {
method: 'POST',
headers: new Headers(),

View File

@@ -1,9 +1,10 @@
import { randomUUID } from 'crypto'
import { db } from '@sim/db'
import { chat, workflow, workspace } from '@sim/db/schema'
import { chat } from '@sim/db/schema'
import { eq } from 'drizzle-orm'
import { type NextRequest, NextResponse } from 'next/server'
import { v4 as uuidv4 } from 'uuid'
import { z } from 'zod'
import { preprocessExecution } from '@/lib/execution/preprocessing'
import { createLogger } from '@/lib/logs/console/logger'
import { LoggingSession } from '@/lib/logs/execution/logging-session'
import { ChatFiles } from '@/lib/uploads'
@@ -93,7 +94,7 @@ export async function POST(
if (!deployment.isActive) {
logger.warn(`[${requestId}] Chat is not active: ${identifier}`)
const executionId = uuidv4()
const executionId = randomUUID()
const loggingSession = new LoggingSession(
deployment.workflowId,
executionId,
@@ -140,82 +141,35 @@ export async function POST(
return addCorsHeaders(createErrorResponse('No input provided', 400), request)
}
const workflowResult = await db
.select({
isDeployed: workflow.isDeployed,
workspaceId: workflow.workspaceId,
variables: workflow.variables,
})
.from(workflow)
.where(eq(workflow.id, deployment.workflowId))
.limit(1)
const executionId = randomUUID()
if (workflowResult.length === 0 || !workflowResult[0].isDeployed) {
logger.warn(`[${requestId}] Workflow not found or not deployed: ${deployment.workflowId}`)
const loggingSession = new LoggingSession(deployment.workflowId, executionId, 'chat', requestId)
const executionId = uuidv4()
const loggingSession = new LoggingSession(
deployment.workflowId,
executionId,
'chat',
requestId
const preprocessResult = await preprocessExecution({
workflowId: deployment.workflowId,
userId: deployment.userId,
triggerType: 'chat',
executionId,
requestId,
checkRateLimit: false, // Chat bypasses rate limits
checkDeployment: true, // Chat requires deployed workflows
loggingSession,
})
if (!preprocessResult.success) {
logger.warn(`[${requestId}] Preprocessing failed: ${preprocessResult.error?.message}`)
return addCorsHeaders(
createErrorResponse(
preprocessResult.error?.message || 'Failed to process request',
preprocessResult.error?.statusCode || 500
),
request
)
await loggingSession.safeStart({
userId: deployment.userId,
workspaceId: workflowResult[0]?.workspaceId || '',
variables: {},
})
await loggingSession.safeCompleteWithError({
error: {
message: 'Chat workflow is not available. The workflow is not deployed.',
stackTrace: undefined,
},
traceSpans: [],
})
return addCorsHeaders(createErrorResponse('Chat workflow is not available', 503), request)
}
let workspaceOwnerId = deployment.userId
if (workflowResult[0].workspaceId) {
const workspaceData = await db
.select({ ownerId: workspace.ownerId })
.from(workspace)
.where(eq(workspace.id, workflowResult[0].workspaceId))
.limit(1)
if (workspaceData.length === 0) {
logger.error(`[${requestId}] Workspace not found for workflow ${deployment.workflowId}`)
const executionId = uuidv4()
const loggingSession = new LoggingSession(
deployment.workflowId,
executionId,
'chat',
requestId
)
await loggingSession.safeStart({
userId: deployment.userId,
workspaceId: workflowResult[0].workspaceId || '',
variables: {},
})
await loggingSession.safeCompleteWithError({
error: {
message: 'Workspace not found. Critical configuration error - please contact support.',
stackTrace: undefined,
},
traceSpans: [],
})
return addCorsHeaders(createErrorResponse('Workspace not found', 500), request)
}
workspaceOwnerId = workspaceData[0].ownerId
}
const { actorUserId, workflowRecord } = preprocessResult
const workspaceOwnerId = actorUserId!
const workspaceId = workflowRecord?.workspaceId || ''
try {
const selectedOutputs: string[] = []
@@ -232,12 +186,10 @@ export async function POST(
const { SSE_HEADERS } = await import('@/lib/utils')
const { createFilteredResult } = await import('@/app/api/workflows/[id]/execute/route')
const executionId = crypto.randomUUID()
const workflowInput: any = { input, conversationId }
if (files && Array.isArray(files) && files.length > 0) {
const executionContext = {
workspaceId: workflowResult[0].workspaceId || '',
workspaceId,
workflowId: deployment.workflowId,
executionId,
}
@@ -257,20 +209,13 @@ export async function POST(
} catch (fileError: any) {
logger.error(`[${requestId}] Failed to process chat files:`, fileError)
const fileLoggingSession = new LoggingSession(
deployment.workflowId,
executionId,
'chat',
requestId
)
await fileLoggingSession.safeStart({
await loggingSession.safeStart({
userId: workspaceOwnerId,
workspaceId: workflowResult[0].workspaceId || '',
workspaceId,
variables: {},
})
await fileLoggingSession.safeCompleteWithError({
await loggingSession.safeCompleteWithError({
error: {
message: `File upload failed: ${fileError.message || 'Unable to process uploaded files'}`,
stackTrace: fileError.stack,
@@ -285,9 +230,9 @@ export async function POST(
const workflowForExecution = {
id: deployment.workflowId,
userId: deployment.userId,
workspaceId: workflowResult[0].workspaceId,
isDeployed: true,
variables: workflowResult[0].variables || {},
workspaceId,
isDeployed: workflowRecord?.isDeployed ?? false,
variables: workflowRecord?.variables || {},
}
const stream = await createStreamingResponse({

View File

@@ -71,13 +71,13 @@ describe('Chat API Utils', () => {
})
describe('Auth token utils', () => {
it('should encrypt and validate auth tokens', async () => {
const { encryptAuthToken, validateAuthToken } = await import('@/app/api/chat/utils')
it('should validate auth tokens', async () => {
const { validateAuthToken } = await import('@/app/api/chat/utils')
const chatId = 'test-chat-id'
const type = 'password'
const token = encryptAuthToken(chatId, type)
const token = Buffer.from(`${chatId}:${type}:${Date.now()}`).toString('base64')
expect(typeof token).toBe('string')
expect(token.length).toBeGreaterThan(0)
@@ -92,7 +92,6 @@ describe('Chat API Utils', () => {
const { validateAuthToken } = await import('@/app/api/chat/utils')
const chatId = 'test-chat-id'
// Create an expired token by directly constructing it with an old timestamp
const expiredToken = Buffer.from(
`${chatId}:password:${Date.now() - 25 * 60 * 60 * 1000}`
).toString('base64')
@@ -166,20 +165,6 @@ describe('Chat API Utils', () => {
'Content-Type, X-Requested-With'
)
})
it('should handle OPTIONS request', async () => {
const { OPTIONS } = await import('@/app/api/chat/utils')
const mockRequest = {
headers: {
get: vi.fn().mockReturnValue('http://localhost:3000'),
},
} as any
const response = await OPTIONS(mockRequest)
expect(response.status).toBe(204)
})
})
describe('Chat auth validation', () => {
@@ -355,10 +340,8 @@ describe('Chat API Utils', () => {
describe('Execution Result Processing', () => {
it('should process logs regardless of overall success status', () => {
// Test that logs are processed even when overall execution fails
// This is key for partial success scenarios
const executionResult = {
success: false, // Overall execution failed
success: false,
output: {},
logs: [
{
@@ -383,16 +366,13 @@ describe('Chat API Utils', () => {
metadata: { duration: 1000 },
}
// Test the key logic: logs should be processed regardless of overall success
expect(executionResult.success).toBe(false)
expect(executionResult.logs).toBeDefined()
expect(executionResult.logs).toHaveLength(2)
// First log should be successful
expect(executionResult.logs[0].success).toBe(true)
expect(executionResult.logs[0].output?.content).toBe('Agent 1 succeeded')
// Second log should be failed
expect(executionResult.logs[1].success).toBe(false)
expect(executionResult.logs[1].error).toBe('Agent 2 failed')
})
@@ -405,18 +385,15 @@ describe('Chat API Utils', () => {
metadata: { duration: 100 },
}
// Test direct ExecutionResult
const directResult = executionResult
const extractedDirect = directResult
expect(extractedDirect).toBe(executionResult)
// Test StreamingExecution with embedded ExecutionResult
const streamingResult = {
stream: new ReadableStream(),
execution: executionResult,
}
// Test that streaming execution wraps the result correctly
const extractedFromStreaming =
streamingResult && typeof streamingResult === 'object' && 'execution' in streamingResult
? streamingResult.execution

View File

@@ -1,7 +1,7 @@
import { db } from '@sim/db'
import { chat, workflow } from '@sim/db/schema'
import { eq } from 'drizzle-orm'
import { type NextRequest, NextResponse } from 'next/server'
import type { NextRequest, NextResponse } from 'next/server'
import { isDev } from '@/lib/environment'
import { createLogger } from '@/lib/logs/console/logger'
import { hasAdminPermission } from '@/lib/permissions/utils'
@@ -77,7 +77,7 @@ export async function checkChatAccess(
return { hasAccess: false }
}
export const encryptAuthToken = (chatId: string, type: string): string => {
const encryptAuthToken = (chatId: string, type: string): string => {
return Buffer.from(`${chatId}:${type}:${Date.now()}`).toString('base64')
}
@@ -104,7 +104,6 @@ export const validateAuthToken = (token: string, chatId: string): boolean => {
}
}
// Set cookie helper function
export const setChatAuthCookie = (response: NextResponse, chatId: string, type: string): void => {
const token = encryptAuthToken(chatId, type)
response.cookies.set({
@@ -118,7 +117,6 @@ export const setChatAuthCookie = (response: NextResponse, chatId: string, type:
})
}
// Helper function to add CORS headers to responses
export function addCorsHeaders(response: NextResponse, request: NextRequest) {
const origin = request.headers.get('origin') || ''
@@ -132,12 +130,6 @@ export function addCorsHeaders(response: NextResponse, request: NextRequest) {
return response
}
export async function OPTIONS(request: NextRequest) {
const response = new NextResponse(null, { status: 204 })
return addCorsHeaders(response, request)
}
// Validate authentication for chat access
export async function validateChatAuth(
requestId: string,
deployment: any,
@@ -146,12 +138,10 @@ export async function validateChatAuth(
): Promise<{ authorized: boolean; error?: string }> {
const authType = deployment.authType || 'public'
// Public chats are accessible to everyone
if (authType === 'public') {
return { authorized: true }
}
// Check for auth cookie first
const cookieName = `chat_auth_${deployment.id}`
const authCookie = request.cookies.get(cookieName)
@@ -159,9 +149,7 @@ export async function validateChatAuth(
return { authorized: true }
}
// For password protection, check the password in the request body
if (authType === 'password') {
// For GET requests, we just notify the client that authentication is required
if (request.method === 'GET') {
return { authorized: false, error: 'auth_required_password' }
}
@@ -198,22 +186,18 @@ export async function validateChatAuth(
}
}
// For email access control, check the email in the request body
if (authType === 'email') {
// For GET requests, we just notify the client that authentication is required
if (request.method === 'GET') {
return { authorized: false, error: 'auth_required_email' }
}
try {
// Use the parsed body if provided, otherwise the auth check is not applicable
if (!parsedBody) {
return { authorized: false, error: 'Email is required' }
}
const { email, input } = parsedBody
// If this is a chat message, not an auth attempt
if (input && !email) {
return { authorized: false, error: 'auth_required_email' }
}
@@ -224,17 +208,12 @@ export async function validateChatAuth(
const allowedEmails = deployment.allowedEmails || []
// Check exact email matches
if (allowedEmails.includes(email)) {
// Email is allowed but still needs OTP verification
// Return a special error code that the client will recognize
return { authorized: false, error: 'otp_required' }
}
// Check domain matches (prefixed with @)
const domain = email.split('@')[1]
if (domain && allowedEmails.some((allowed: string) => allowed === `@${domain}`)) {
// Domain is allowed but still needs OTP verification
return { authorized: false, error: 'otp_required' }
}
@@ -257,6 +236,10 @@ export async function validateChatAuth(
const { email, input, checkSSOAccess } = parsedBody
if (input && !checkSSOAccess) {
return { authorized: false, error: 'auth_required_sso' }
}
if (checkSSOAccess) {
if (!email) {
return { authorized: false, error: 'Email is required' }

View File

@@ -1,5 +1,8 @@
import { randomUUID } from 'crypto'
import { type NextRequest, NextResponse } from 'next/server'
import { preprocessExecution } from '@/lib/execution/preprocessing'
import { createLogger } from '@/lib/logs/console/logger'
import { generateRequestId } from '@/lib/utils'
import { PauseResumeManager } from '@/lib/workflows/executor/human-in-the-loop-manager'
import { validateWorkflowAccess } from '@/app/api/workflows/middleware'
@@ -35,6 +38,54 @@ export async function POST(
const resumeInput = payload?.input ?? payload ?? {}
const userId = workflow.userId ?? ''
const resumeExecutionId = randomUUID()
const requestId = generateRequestId()
logger.info(`[${requestId}] Preprocessing resume execution`, {
workflowId,
parentExecutionId: executionId,
resumeExecutionId,
userId,
})
const preprocessResult = await preprocessExecution({
workflowId,
userId,
triggerType: 'manual', // Resume is a manual trigger
executionId: resumeExecutionId,
requestId,
checkRateLimit: false, // Manual triggers bypass rate limits
checkDeployment: false, // Resuming existing execution, deployment already checked
skipUsageLimits: true, // Resume is continuation of authorized execution - don't recheck limits
workspaceId: workflow.workspaceId || undefined,
isResumeContext: true, // Enable billing fallback for paused workflow resumes
})
if (!preprocessResult.success) {
logger.warn(`[${requestId}] Preprocessing failed for resume`, {
workflowId,
parentExecutionId: executionId,
error: preprocessResult.error?.message,
statusCode: preprocessResult.error?.statusCode,
})
return NextResponse.json(
{
error:
preprocessResult.error?.message ||
'Failed to validate resume execution. Please try again.',
},
{ status: preprocessResult.error?.statusCode || 400 }
)
}
logger.info(`[${requestId}] Preprocessing passed, proceeding with resume`, {
workflowId,
parentExecutionId: executionId,
resumeExecutionId,
actorUserId: preprocessResult.actorUserId,
})
try {
const enqueueResult = await PauseResumeManager.enqueueOrStartResume({
executionId,

View File

@@ -2,7 +2,7 @@ import { type NextRequest, NextResponse } from 'next/server'
import { createLogger } from '@/lib/logs/console/logger'
import { generateRequestId } from '@/lib/utils'
import {
checkRateLimits,
checkWebhookPreprocessing,
findWebhookAndWorkflow,
handleProviderChallenges,
parseWebhookBody,
@@ -67,9 +67,39 @@ export async function POST(request: NextRequest, { params }: { params: Promise<{
return authError
}
const rateLimitError = await checkRateLimits(foundWorkflow, foundWebhook, requestId)
if (rateLimitError) {
return rateLimitError
let preprocessError: NextResponse | null = null
try {
preprocessError = await checkWebhookPreprocessing(
foundWorkflow,
foundWebhook,
requestId,
true // testMode - skips usage limits
)
if (preprocessError) {
return preprocessError
}
} catch (error) {
logger.error(`[${requestId}] Unexpected error during webhook preprocessing`, {
error: error instanceof Error ? error.message : String(error),
stack: error instanceof Error ? error.stack : undefined,
webhookId: foundWebhook.id,
workflowId: foundWorkflow.id,
})
if (foundWebhook.provider === 'microsoft-teams') {
return NextResponse.json(
{
type: 'message',
text: 'An unexpected error occurred during preprocessing',
},
{ status: 500 }
)
}
return NextResponse.json(
{ error: 'An unexpected error occurred during preprocessing' },
{ status: 500 }
)
}
logger.info(

View File

@@ -88,6 +88,35 @@ vi.mock('@/executor', () => ({
})),
}))
vi.mock('@/lib/execution/preprocessing', () => ({
preprocessExecution: vi.fn().mockResolvedValue({
success: true,
actorUserId: 'test-user-id',
workflowRecord: {
id: 'test-workflow-id',
userId: 'test-user-id',
isDeployed: true,
workspaceId: 'test-workspace-id',
},
userSubscription: {
plan: 'pro',
status: 'active',
},
rateLimitInfo: {
allowed: true,
remaining: 100,
resetAt: new Date(),
},
}),
}))
vi.mock('@/lib/logs/execution/logging-session', () => ({
LoggingSession: vi.fn().mockImplementation(() => ({
safeStart: vi.fn().mockResolvedValue(undefined),
safeCompleteWithError: vi.fn().mockResolvedValue(undefined),
})),
}))
process.env.DATABASE_URL = 'postgresql://test:test@localhost:5432/test'
vi.mock('drizzle-orm/postgres-js', () => ({
@@ -190,19 +219,6 @@ describe('Webhook Trigger API Route', () => {
})
describe('Generic Webhook Authentication', () => {
beforeEach(() => {
vi.doMock('@/lib/billing/core/subscription', () => ({
getHighestPrioritySubscription: vi.fn().mockResolvedValue({
plan: 'pro',
status: 'active',
}),
}))
vi.doMock('@/lib/billing', () => ({
checkServerSideUsageLimits: vi.fn().mockResolvedValue(null),
}))
})
it('should process generic webhook without authentication', async () => {
globalMockData.webhooks.push({
id: 'generic-webhook-id',

View File

@@ -2,8 +2,7 @@ import { type NextRequest, NextResponse } from 'next/server'
import { createLogger } from '@/lib/logs/console/logger'
import { generateRequestId } from '@/lib/utils'
import {
checkRateLimits,
checkUsageLimits,
checkWebhookPreprocessing,
findWebhookAndWorkflow,
handleProviderChallenges,
parseWebhookBody,
@@ -124,14 +123,39 @@ export async function POST(
return authError
}
const rateLimitError = await checkRateLimits(foundWorkflow, foundWebhook, requestId)
if (rateLimitError) {
return rateLimitError
}
let preprocessError: NextResponse | null = null
try {
preprocessError = await checkWebhookPreprocessing(
foundWorkflow,
foundWebhook,
requestId,
false // testMode
)
if (preprocessError) {
return preprocessError
}
} catch (error) {
logger.error(`[${requestId}] Unexpected error during webhook preprocessing`, {
error: error instanceof Error ? error.message : String(error),
stack: error instanceof Error ? error.stack : undefined,
webhookId: foundWebhook.id,
workflowId: foundWorkflow.id,
})
const usageLimitError = await checkUsageLimits(foundWorkflow, foundWebhook, requestId, false)
if (usageLimitError) {
return usageLimitError
if (foundWebhook.provider === 'microsoft-teams') {
return NextResponse.json(
{
type: 'message',
text: 'An unexpected error occurred during preprocessing',
},
{ status: 500 }
)
}
return NextResponse.json(
{ error: 'An unexpected error occurred during preprocessing' },
{ status: 500 }
)
}
if (foundWebhook.blockId) {

View File

@@ -2,8 +2,8 @@ import { type NextRequest, NextResponse } from 'next/server'
import { validate as uuidValidate, v4 as uuidv4 } from 'uuid'
import { z } from 'zod'
import { checkHybridAuth } from '@/lib/auth/hybrid'
import { checkServerSideUsageLimits } from '@/lib/billing'
import { processInputFileFields } from '@/lib/execution/files'
import { preprocessExecution } from '@/lib/execution/preprocessing'
import { createLogger } from '@/lib/logs/console/logger'
import { LoggingSession } from '@/lib/logs/execution/logging-session'
import { generateRequestId, SSE_HEADERS } from '@/lib/utils'
@@ -16,7 +16,6 @@ import { type ExecutionEvent, encodeSSEEvent } from '@/lib/workflows/executor/ex
import { PauseResumeManager } from '@/lib/workflows/executor/human-in-the-loop-manager'
import { createStreamingResponse } from '@/lib/workflows/streaming'
import { createHttpResponseFromBlock, workflowHasResponseBlock } from '@/lib/workflows/utils'
import { validateWorkflowAccess } from '@/app/api/workflows/middleware'
import { type ExecutionMetadata, ExecutionSnapshot } from '@/executor/execution/snapshot'
import type { StreamingExecution } from '@/executor/types'
import { Serializer } from '@/serializer'
@@ -30,7 +29,6 @@ const ExecuteWorkflowSchema = z.object({
stream: z.boolean().optional(),
useDraftState: z.boolean().optional(),
input: z.any().optional(),
startBlockId: z.string().optional(),
// Optional workflow state override (for executing diff workflows)
workflowStateOverride: z
.object({
@@ -45,30 +43,21 @@ const ExecuteWorkflowSchema = z.object({
export const runtime = 'nodejs'
export const dynamic = 'force-dynamic'
class UsageLimitError extends Error {
statusCode: number
constructor(message: string, statusCode = 402) {
super(message)
this.statusCode = statusCode
}
}
/**
* Execute workflow with streaming support - used by chat and other streaming endpoints
*
* This function assumes preprocessing has already been completed.
* Callers must run preprocessExecution() first to validate workflow, check usage limits,
* and resolve actor before calling this function.
*
* This is a wrapper function that:
* - Checks usage limits before execution (protects chat and streaming paths)
* - Logs usage limit errors to the database for user visibility
* - Supports streaming callbacks (onStream, onBlockComplete)
* - Returns ExecutionResult instead of NextResponse
* - Handles pause/resume logic
*
* Used by:
* - Chat execution (/api/chat/[identifier]/route.ts)
* - Streaming responses (lib/workflows/streaming.ts)
*
* Note: The POST handler in this file calls executeWorkflowCore() directly and has
* its own usage check. This wrapper provides convenience and built-in protection
* for callers that need streaming support.
*/
export async function executeWorkflow(
workflow: any,
@@ -92,38 +81,6 @@ export async function executeWorkflow(
const loggingSession = new LoggingSession(workflowId, executionId, triggerType, requestId)
try {
const usageCheck = await checkServerSideUsageLimits(actorUserId)
if (usageCheck.isExceeded) {
logger.warn(
`[${requestId}] User ${actorUserId} has exceeded usage limits. Blocking workflow execution.`,
{
currentUsage: usageCheck.currentUsage,
limit: usageCheck.limit,
workflowId,
triggerType,
}
)
await loggingSession.safeStart({
userId: actorUserId,
workspaceId: workflow.workspaceId || '',
variables: {},
})
await loggingSession.safeCompleteWithError({
error: {
message:
usageCheck.message || 'Usage limit exceeded. Please upgrade your plan to continue.',
stackTrace: undefined,
},
traceSpans: [],
})
throw new UsageLimitError(
usageCheck.message || 'Usage limit exceeded. Please upgrade your plan to continue.'
)
}
const metadata: ExecutionMetadata = {
requestId,
executionId,
@@ -270,23 +227,12 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
const { id: workflowId } = await params
try {
// Authenticate user (API key, session, or internal JWT)
const auth = await checkHybridAuth(req, { requireWorkflowId: false })
if (!auth.success || !auth.userId) {
return NextResponse.json({ error: auth.error || 'Unauthorized' }, { status: 401 })
}
const userId = auth.userId
// Validate workflow access (don't require deployment for manual client runs)
const workflowValidation = await validateWorkflowAccess(req, workflowId, false)
if (workflowValidation.error) {
return NextResponse.json(
{ error: workflowValidation.error.message },
{ status: workflowValidation.error.status }
)
}
const workflow = workflowValidation.workflow!
let body: any = {}
try {
const text = await req.text()
@@ -375,43 +321,33 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
requestId
)
// Check usage limits for this POST handler execution path
// Architecture note: This handler calls executeWorkflowCore() directly (both SSE and non-SSE paths).
// The executeWorkflow() wrapper function (used by chat) has its own check (line 54).
const usageCheck = await checkServerSideUsageLimits(userId)
if (usageCheck.isExceeded) {
logger.warn(`[${requestId}] User ${userId} has exceeded usage limits. Blocking execution.`, {
currentUsage: usageCheck.currentUsage,
limit: usageCheck.limit,
workflowId,
triggerType,
})
await loggingSession.safeStart({
userId,
workspaceId: workflow.workspaceId || '',
variables: {},
})
await loggingSession.safeCompleteWithError({
error: {
message:
usageCheck.message || 'Usage limit exceeded. Please upgrade your plan to continue.',
stackTrace: undefined,
},
traceSpans: [],
})
const preprocessResult = await preprocessExecution({
workflowId,
userId,
triggerType: loggingTriggerType,
executionId,
requestId,
checkRateLimit: false, // Manual executions bypass rate limits
checkDeployment: !shouldUseDraftState, // Check deployment unless using draft
loggingSession,
})
if (!preprocessResult.success) {
return NextResponse.json(
{
error:
usageCheck.message || 'Usage limit exceeded. Please upgrade your plan to continue.',
},
{ status: 402 }
{ error: preprocessResult.error!.message },
{ status: preprocessResult.error!.statusCode }
)
}
// Process file fields in workflow input (base64/URL to UserFile conversion)
const actorUserId = preprocessResult.actorUserId!
const workflow = preprocessResult.workflowRecord!
logger.info(`[${requestId}] Preprocessing passed`, {
workflowId,
actorUserId,
workspaceId: workflow.workspaceId,
})
let processedInput = input
try {
const workflowData = shouldUseDraftState
@@ -438,14 +374,14 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
serializedWorkflow.blocks,
executionContext,
requestId,
userId
actorUserId
)
}
} catch (fileError) {
logger.error(`[${requestId}] Failed to process input file fields:`, fileError)
await loggingSession.safeStart({
userId,
userId: actorUserId,
workspaceId: workflow.workspaceId || '',
variables: {},
})
@@ -473,8 +409,8 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
requestId,
executionId,
workflowId,
workspaceId: workflow.workspaceId,
userId,
workspaceId: workflow.workspaceId ?? undefined,
userId: actorUserId,
triggerType,
useDraftState: shouldUseDraftState,
startTime: new Date().toISOString(),
@@ -516,8 +452,6 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
return NextResponse.json(filteredResult)
} catch (error: any) {
// Block errors are already logged with full details by BlockExecutor
// Only log the error message here to avoid duplicate logging
const errorMessage = error.message || 'Unknown error'
logger.error(`[${requestId}] Non-SSE execution failed: ${errorMessage}`)
@@ -549,9 +483,15 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
const resolvedSelectedOutputs = resolveOutputIds(selectedOutputs, deployedData?.blocks || {})
const stream = await createStreamingResponse({
requestId,
workflow,
workflow: {
id: workflow.id,
userId: actorUserId,
workspaceId: workflow.workspaceId,
isDeployed: workflow.isDeployed,
variables: (workflow as any).variables,
},
input: processedInput,
executingUserId: userId,
executingUserId: actorUserId,
streamConfig: {
selectedOutputs: resolvedSelectedOutputs,
isSecureMode: false,
@@ -732,8 +672,8 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
requestId,
executionId,
workflowId,
workspaceId: workflow.workspaceId,
userId,
workspaceId: workflow.workspaceId ?? undefined,
userId: actorUserId,
triggerType,
useDraftState: shouldUseDraftState,
startTime: new Date().toISOString(),
@@ -808,8 +748,6 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
},
})
} catch (error: any) {
// Block errors are already logged with full details by BlockExecutor
// Only log the error message here to avoid duplicate logging
const errorMessage = error.message || 'Unknown error'
logger.error(`[${requestId}] SSE execution failed: ${errorMessage}`)

View File

@@ -3,7 +3,6 @@
import { useCallback, useEffect, useMemo, useState } from 'react'
import * as DialogPrimitive from '@radix-ui/react-dialog'
import * as VisuallyHidden from '@radix-ui/react-visually-hidden'
import fuzzysort from 'fuzzysort'
import { BookOpen, Layout, RepeatIcon, ScrollText, Search, SplitIcon } from 'lucide-react'
import { useParams, useRouter } from 'next/navigation'
import { Dialog, DialogPortal, DialogTitle } from '@/components/ui/dialog'
@@ -11,6 +10,7 @@ import { useBrandConfig } from '@/lib/branding/branding'
import { cn } from '@/lib/utils'
import { getTriggersForSidebar, hasTriggerCapability } from '@/lib/workflows/trigger-utils'
import { getAllBlocks } from '@/blocks'
import { searchItems } from './search-utils'
interface SearchModalProps {
open: boolean
@@ -340,32 +340,21 @@ export function SearchModal({
})
}
const results = fuzzysort.go(searchQuery, allItems, {
keys: ['name', 'description'],
limit: 100,
threshold: -1000,
all: true,
scoreFn: (a) => {
const nameScore = a[0] ? a[0].score : Number.NEGATIVE_INFINITY
const descScore = a[1] ? a[1].score : Number.NEGATIVE_INFINITY
const searchResults = searchItems(searchQuery, allItems)
return Math.max(nameScore * 2, descScore)
},
})
return results
.map((result) => ({
item: result.obj,
score: result.score,
}))
return searchResults
.sort((a, b) => {
if (Math.abs(a.score - b.score) > 100) {
if (a.score !== b.score) {
return b.score - a.score
}
const aOrder = orderMap[a.item.type] ?? Number.MAX_SAFE_INTEGER
const bOrder = orderMap[b.item.type] ?? Number.MAX_SAFE_INTEGER
return aOrder - bOrder
if (aOrder !== bOrder) {
return aOrder - bOrder
}
return a.item.name.localeCompare(b.item.name)
})
.map((result) => result.item)
}, [allItems, searchQuery, sectionOrder])

View File

@@ -0,0 +1,137 @@
/**
* Search utility functions for tiered matching algorithm
* Provides predictable search results prioritizing exact matches over fuzzy matches
*/
export interface SearchableItem {
id: string
name: string
description?: string
type: string
[key: string]: any
}
export interface SearchResult<T extends SearchableItem> {
item: T
score: number
matchType: 'exact' | 'prefix' | 'word-boundary' | 'substring' | 'description'
}
const SCORE_EXACT_MATCH = 10000
const SCORE_PREFIX_MATCH = 5000
const SCORE_WORD_BOUNDARY = 1000
const SCORE_SUBSTRING_MATCH = 100
const DESCRIPTION_WEIGHT = 0.3
/**
* Calculate match score for a single field
* Returns 0 if no match found
*/
function calculateFieldScore(
query: string,
field: string
): {
score: number
matchType: 'exact' | 'prefix' | 'word-boundary' | 'substring' | null
} {
const normalizedQuery = query.toLowerCase().trim()
const normalizedField = field.toLowerCase().trim()
if (!normalizedQuery || !normalizedField) {
return { score: 0, matchType: null }
}
// Tier 1: Exact match
if (normalizedField === normalizedQuery) {
return { score: SCORE_EXACT_MATCH, matchType: 'exact' }
}
// Tier 2: Prefix match (starts with query)
if (normalizedField.startsWith(normalizedQuery)) {
return { score: SCORE_PREFIX_MATCH, matchType: 'prefix' }
}
// Tier 3: Word boundary match (query matches start of a word)
const words = normalizedField.split(/[\s-_/]+/)
const hasWordBoundaryMatch = words.some((word) => word.startsWith(normalizedQuery))
if (hasWordBoundaryMatch) {
return { score: SCORE_WORD_BOUNDARY, matchType: 'word-boundary' }
}
// Tier 4: Substring match (query appears anywhere)
if (normalizedField.includes(normalizedQuery)) {
return { score: SCORE_SUBSTRING_MATCH, matchType: 'substring' }
}
// No match
return { score: 0, matchType: null }
}
/**
* Search items using tiered matching algorithm
* Returns items sorted by relevance (highest score first)
*/
export function searchItems<T extends SearchableItem>(
query: string,
items: T[]
): SearchResult<T>[] {
const normalizedQuery = query.trim()
if (!normalizedQuery) {
return []
}
const results: SearchResult<T>[] = []
for (const item of items) {
const nameMatch = calculateFieldScore(normalizedQuery, item.name)
const descMatch = item.description
? calculateFieldScore(normalizedQuery, item.description)
: { score: 0, matchType: null }
const nameScore = nameMatch.score
const descScore = descMatch.score * DESCRIPTION_WEIGHT
const bestScore = Math.max(nameScore, descScore)
if (bestScore > 0) {
let matchType: SearchResult<T>['matchType'] = 'substring'
if (nameScore >= descScore) {
matchType = nameMatch.matchType || 'substring'
} else {
matchType = 'description'
}
results.push({
item,
score: bestScore,
matchType,
})
}
}
results.sort((a, b) => b.score - a.score)
return results
}
/**
* Get a human-readable match type label
*/
export function getMatchTypeLabel(matchType: SearchResult<any>['matchType']): string {
switch (matchType) {
case 'exact':
return 'Exact match'
case 'prefix':
return 'Starts with'
case 'word-boundary':
return 'Word match'
case 'substring':
return 'Contains'
case 'description':
return 'In description'
default:
return 'Match'
}
}

View File

@@ -4,9 +4,8 @@ import { Cron } from 'croner'
import { eq } from 'drizzle-orm'
import { v4 as uuidv4 } from 'uuid'
import type { ZodRecord, ZodString } from 'zod'
import { checkServerSideUsageLimits } from '@/lib/billing'
import { getHighestPrioritySubscription } from '@/lib/billing/core/subscription'
import { getPersonalAndWorkspaceEnv } from '@/lib/environment/utils'
import { preprocessExecution } from '@/lib/execution/preprocessing'
import { createLogger } from '@/lib/logs/console/logger'
import { LoggingSession } from '@/lib/logs/execution/logging-session'
import {
@@ -19,9 +18,7 @@ import { decryptSecret } from '@/lib/utils'
import { blockExistsInDeployment, loadDeployedWorkflowState } from '@/lib/workflows/db-helpers'
import { executeWorkflowCore } from '@/lib/workflows/executor/execution-core'
import { PauseResumeManager } from '@/lib/workflows/executor/human-in-the-loop-manager'
import { getWorkspaceBilledAccountUserId } from '@/lib/workspaces/utils'
import { type ExecutionMetadata, ExecutionSnapshot } from '@/executor/execution/snapshot'
import { RateLimiter } from '@/services/queue'
import { mergeSubblockState } from '@/stores/workflows/server-utils'
const logger = createLogger('TriggerScheduleExecution')
@@ -74,145 +71,6 @@ async function releaseScheduleLock(
await applyScheduleUpdate(scheduleId, updates, requestId, context)
}
async function resolveActorUserId(workflowRecord: WorkflowRecord) {
if (workflowRecord.workspaceId) {
const actor = await getWorkspaceBilledAccountUserId(workflowRecord.workspaceId)
if (actor) {
return actor
}
}
return workflowRecord.userId ?? null
}
async function handleWorkflowNotFound(
payload: ScheduleExecutionPayload,
executionId: string,
requestId: string,
now: Date
) {
const loggingSession = new LoggingSession(payload.workflowId, executionId, 'schedule', requestId)
await loggingSession.safeStart({
userId: 'unknown',
workspaceId: '',
variables: {},
})
await loggingSession.safeCompleteWithError({
error: {
message:
'Workflow not found. The scheduled workflow may have been deleted or is no longer accessible.',
stackTrace: undefined,
},
traceSpans: [],
})
await applyScheduleUpdate(
payload.scheduleId,
{
updatedAt: now,
lastQueuedAt: null,
status: 'disabled',
},
requestId,
`Failed to disable schedule ${payload.scheduleId} after missing workflow`,
`Disabled schedule ${payload.scheduleId} because the workflow no longer exists`
)
}
async function handleMissingActor(
payload: ScheduleExecutionPayload,
workflowRecord: WorkflowRecord,
executionId: string,
requestId: string,
now: Date
) {
const loggingSession = new LoggingSession(payload.workflowId, executionId, 'schedule', requestId)
await loggingSession.safeStart({
userId: workflowRecord.userId ?? 'unknown',
workspaceId: workflowRecord.workspaceId || '',
variables: {},
})
await loggingSession.safeCompleteWithError({
error: {
message:
'Unable to resolve billing account. This workflow cannot execute scheduled runs without a valid billing account.',
stackTrace: undefined,
},
traceSpans: [],
})
await releaseScheduleLock(
payload.scheduleId,
requestId,
now,
`Failed to release schedule ${payload.scheduleId} after billing account lookup`
)
}
async function ensureRateLimit(
actorUserId: string,
userSubscription: Awaited<ReturnType<typeof getHighestPrioritySubscription>>,
rateLimiter: RateLimiter,
loggingSession: LoggingSession,
payload: ScheduleExecutionPayload,
workflowRecord: WorkflowRecord,
requestId: string,
now: Date
) {
const rateLimitCheck = await rateLimiter.checkRateLimitWithSubscription(
actorUserId,
userSubscription,
'schedule',
false
)
if (rateLimitCheck.allowed) {
return true
}
logger.warn(`[${requestId}] Rate limit exceeded for scheduled workflow ${payload.workflowId}`, {
userId: workflowRecord.userId,
remaining: rateLimitCheck.remaining,
resetAt: rateLimitCheck.resetAt,
})
await loggingSession.safeStart({
userId: actorUserId,
workspaceId: workflowRecord.workspaceId || '',
variables: {},
})
await loggingSession.safeCompleteWithError({
error: {
message: `Rate limit exceeded. ${rateLimitCheck.remaining || 0} requests remaining. Resets at ${
rateLimitCheck.resetAt ? new Date(rateLimitCheck.resetAt).toISOString() : 'unknown'
}. Schedule will retry in 5 minutes.`,
stackTrace: undefined,
},
traceSpans: [],
})
const retryDelay = 5 * 60 * 1000
const nextRetryAt = new Date(now.getTime() + retryDelay)
await applyScheduleUpdate(
payload.scheduleId,
{
updatedAt: now,
nextRunAt: nextRetryAt,
},
requestId,
`Error updating schedule ${payload.scheduleId} for rate limit`,
`Updated next retry time for schedule ${payload.scheduleId} due to rate limit`
)
return false
}
async function calculateNextRunFromDeployment(
payload: ScheduleExecutionPayload,
requestId: string
@@ -229,61 +87,6 @@ async function calculateNextRunFromDeployment(
}
}
async function ensureUsageLimits(
actorUserId: string,
payload: ScheduleExecutionPayload,
workflowRecord: WorkflowRecord,
loggingSession: LoggingSession,
requestId: string,
now: Date
) {
const usageCheck = await checkServerSideUsageLimits(actorUserId)
if (!usageCheck.isExceeded) {
return true
}
logger.warn(
`[${requestId}] User ${workflowRecord.userId} has exceeded usage limits. Skipping scheduled execution.`,
{
currentUsage: usageCheck.currentUsage,
limit: usageCheck.limit,
workflowId: payload.workflowId,
}
)
await loggingSession.safeStart({
userId: actorUserId,
workspaceId: workflowRecord.workspaceId || '',
variables: {},
})
await loggingSession.safeCompleteWithError({
error: {
message:
usageCheck.message ||
'Usage limit exceeded. Please upgrade your plan to continue using scheduled workflows.',
stackTrace: undefined,
},
traceSpans: [],
})
const nextRunAt = await calculateNextRunFromDeployment(payload, requestId)
if (nextRunAt) {
await applyScheduleUpdate(
payload.scheduleId,
{
updatedAt: now,
nextRunAt,
},
requestId,
`Error updating schedule ${payload.scheduleId} after usage limit check`,
`Scheduled next run for ${payload.scheduleId} after usage limit`
)
}
return false
}
async function determineNextRunAfterError(
payload: ScheduleExecutionPayload,
now: Date,
@@ -385,9 +188,6 @@ async function runWorkflowExecution({
const deployedData = await loadDeployedWorkflowState(payload.workflowId)
const blocks = deployedData.blocks
const edges = deployedData.edges
const loops = deployedData.loops
const parallels = deployedData.parallels
logger.info(`[${requestId}] Loaded deployed workflow ${payload.workflowId}`)
if (payload.blockId) {
@@ -560,29 +360,6 @@ export async function executeScheduleJob(payload: ScheduleExecutionPayload) {
const EnvVarsSchema = zod.z.record(zod.z.string())
try {
const [workflowRecord] = await db
.select()
.from(workflow)
.where(eq(workflow.id, payload.workflowId))
.limit(1)
if (!workflowRecord) {
logger.warn(`[${requestId}] Workflow ${payload.workflowId} not found`)
await handleWorkflowNotFound(payload, executionId, requestId, now)
return
}
const actorUserId = await resolveActorUserId(workflowRecord)
if (!actorUserId) {
logger.warn(
`[${requestId}] Skipping schedule ${payload.scheduleId}: unable to resolve billed account.`
)
await handleMissingActor(payload, workflowRecord, executionId, requestId, now)
return
}
const userSubscription = await getHighestPrioritySubscription(actorUserId)
const loggingSession = new LoggingSession(
payload.workflowId,
executionId,
@@ -590,31 +367,144 @@ export async function executeScheduleJob(payload: ScheduleExecutionPayload) {
requestId
)
const rateLimiter = new RateLimiter()
const withinRateLimit = await ensureRateLimit(
actorUserId,
userSubscription,
rateLimiter,
loggingSession,
payload,
workflowRecord,
const preprocessResult = await preprocessExecution({
workflowId: payload.workflowId,
userId: 'unknown', // Will be resolved from workflow record
triggerType: 'schedule',
executionId,
requestId,
now
)
checkRateLimit: true,
checkDeployment: true,
loggingSession,
})
if (!withinRateLimit) {
return
if (!preprocessResult.success) {
const statusCode = preprocessResult.error?.statusCode || 500
switch (statusCode) {
case 401: {
logger.warn(
`[${requestId}] Authentication error during preprocessing, disabling schedule`
)
await applyScheduleUpdate(
payload.scheduleId,
{
updatedAt: now,
lastQueuedAt: null,
lastFailedAt: now,
status: 'disabled',
},
requestId,
`Failed to disable schedule ${payload.scheduleId} after authentication error`,
`Disabled schedule ${payload.scheduleId} due to authentication failure (401)`
)
return
}
case 403: {
logger.warn(
`[${requestId}] Authorization error during preprocessing, disabling schedule: ${preprocessResult.error?.message}`
)
await applyScheduleUpdate(
payload.scheduleId,
{
updatedAt: now,
lastQueuedAt: null,
lastFailedAt: now,
status: 'disabled',
},
requestId,
`Failed to disable schedule ${payload.scheduleId} after authorization error`,
`Disabled schedule ${payload.scheduleId} due to authorization failure (403)`
)
return
}
case 404: {
logger.warn(`[${requestId}] Workflow not found, disabling schedule`)
await applyScheduleUpdate(
payload.scheduleId,
{
updatedAt: now,
lastQueuedAt: null,
status: 'disabled',
},
requestId,
`Failed to disable schedule ${payload.scheduleId} after missing workflow`,
`Disabled schedule ${payload.scheduleId} because the workflow no longer exists`
)
return
}
case 429: {
logger.warn(`[${requestId}] Rate limit exceeded, scheduling retry`)
const retryDelay = 5 * 60 * 1000
const nextRetryAt = new Date(now.getTime() + retryDelay)
await applyScheduleUpdate(
payload.scheduleId,
{
updatedAt: now,
nextRunAt: nextRetryAt,
},
requestId,
`Error updating schedule ${payload.scheduleId} for rate limit`,
`Updated next retry time for schedule ${payload.scheduleId} due to rate limit`
)
return
}
case 402: {
logger.warn(`[${requestId}] Usage limit exceeded, scheduling next run`)
const nextRunAt = await calculateNextRunFromDeployment(payload, requestId)
if (nextRunAt) {
await applyScheduleUpdate(
payload.scheduleId,
{
updatedAt: now,
nextRunAt,
},
requestId,
`Error updating schedule ${payload.scheduleId} after usage limit check`,
`Scheduled next run for ${payload.scheduleId} after usage limit`
)
}
return
}
default: {
logger.error(`[${requestId}] Preprocessing failed: ${preprocessResult.error?.message}`)
const nextRunAt = await determineNextRunAfterError(payload, now, requestId)
const newFailedCount = (payload.failedCount || 0) + 1
const shouldDisable = newFailedCount >= MAX_CONSECUTIVE_FAILURES
if (shouldDisable) {
logger.warn(
`[${requestId}] Disabling schedule for workflow ${payload.workflowId} after ${MAX_CONSECUTIVE_FAILURES} consecutive failures`
)
}
await applyScheduleUpdate(
payload.scheduleId,
{
updatedAt: now,
nextRunAt,
failedCount: newFailedCount,
lastFailedAt: now,
status: shouldDisable ? 'disabled' : 'active',
},
requestId,
`Error updating schedule ${payload.scheduleId} after preprocessing failure`,
`Updated schedule ${payload.scheduleId} after preprocessing failure`
)
return
}
}
}
const withinUsageLimits = await ensureUsageLimits(
actorUserId,
payload,
workflowRecord,
loggingSession,
requestId,
now
)
if (!withinUsageLimits) {
const { actorUserId, workflowRecord } = preprocessResult
if (!actorUserId || !workflowRecord) {
logger.error(`[${requestId}] Missing required preprocessing data`)
return
}

View File

@@ -1,9 +1,6 @@
import { db } from '@sim/db'
import { workflow as workflowTable } from '@sim/db/schema'
import { task } from '@trigger.dev/sdk'
import { eq } from 'drizzle-orm'
import { v4 as uuidv4 } from 'uuid'
import { checkServerSideUsageLimits } from '@/lib/billing'
import { preprocessExecution } from '@/lib/execution/preprocessing'
import { createLogger } from '@/lib/logs/console/logger'
import { LoggingSession } from '@/lib/logs/execution/logging-session'
import { executeWorkflowCore } from '@/lib/workflows/executor/execution-core'
@@ -21,6 +18,11 @@ export type WorkflowExecutionPayload = {
metadata?: Record<string, any>
}
/**
* Background workflow execution job
* @see preprocessExecution For detailed information on preprocessing checks
* @see executeWorkflowCore For the core workflow execution logic
*/
export async function executeWorkflowJob(payload: WorkflowExecutionPayload) {
const workflowId = payload.workflowId
const executionId = uuidv4()
@@ -36,62 +38,34 @@ export async function executeWorkflowJob(payload: WorkflowExecutionPayload) {
const loggingSession = new LoggingSession(workflowId, executionId, triggerType, requestId)
try {
const wfRows = await db
.select({ workspaceId: workflowTable.workspaceId })
.from(workflowTable)
.where(eq(workflowTable.id, workflowId))
.limit(1)
const workspaceId = wfRows[0]?.workspaceId || undefined
const preprocessResult = await preprocessExecution({
workflowId: payload.workflowId,
userId: payload.userId,
triggerType: triggerType,
executionId: executionId,
requestId: requestId,
checkRateLimit: true,
checkDeployment: true,
loggingSession: loggingSession,
})
const usageCheck = await checkServerSideUsageLimits(payload.userId)
if (usageCheck.isExceeded) {
logger.warn(
`[${requestId}] User ${payload.userId} has exceeded usage limits. Skipping workflow execution.`,
{
currentUsage: usageCheck.currentUsage,
limit: usageCheck.limit,
workflowId,
}
)
await loggingSession.safeStart({
userId: payload.userId,
workspaceId: workspaceId || '',
variables: {},
if (!preprocessResult.success) {
logger.error(`[${requestId}] Preprocessing failed: ${preprocessResult.error?.message}`, {
workflowId,
statusCode: preprocessResult.error?.statusCode,
})
await loggingSession.safeCompleteWithError({
error: {
message:
usageCheck.message || 'Usage limit exceeded. Please upgrade your plan to continue.',
stackTrace: undefined,
},
traceSpans: [],
})
throw new Error(
usageCheck.message || 'Usage limit exceeded. Please upgrade your plan to continue.'
)
throw new Error(preprocessResult.error?.message || 'Preprocessing failed')
}
const actorUserId = preprocessResult.actorUserId!
const workspaceId = preprocessResult.workflowRecord?.workspaceId || undefined
logger.info(`[${requestId}] Preprocessing passed. Using actor: ${actorUserId}`)
const workflow = await getWorkflowById(workflowId)
if (!workflow) {
await loggingSession.safeStart({
userId: payload.userId,
workspaceId: workspaceId || '',
variables: {},
})
await loggingSession.safeCompleteWithError({
error: {
message:
'Workflow not found. The workflow may have been deleted or is no longer accessible.',
stackTrace: undefined,
},
traceSpans: [],
})
throw new Error(`Workflow ${workflowId} not found`)
throw new Error(`Workflow ${workflowId} not found after preprocessing`)
}
const metadata: ExecutionMetadata = {
@@ -99,7 +73,7 @@ export async function executeWorkflowJob(payload: WorkflowExecutionPayload) {
executionId,
workflowId,
workspaceId,
userId: payload.userId,
userId: actorUserId,
triggerType: payload.triggerType || 'api',
useDraftState: false,
startTime: new Date().toISOString(),

View File

@@ -0,0 +1,557 @@
import { db } from '@sim/db'
import { workflow } from '@sim/db/schema'
import { eq } from 'drizzle-orm'
import { checkServerSideUsageLimits } from '@/lib/billing/calculations/usage-monitor'
import { getHighestPrioritySubscription } from '@/lib/billing/core/subscription'
import { createLogger } from '@/lib/logs/console/logger'
import { LoggingSession } from '@/lib/logs/execution/logging-session'
import { getWorkspaceBilledAccountUserId } from '@/lib/workspaces/utils'
import { RateLimiter } from '@/services/queue/RateLimiter'
const logger = createLogger('ExecutionPreprocessing')
const BILLING_ERROR_MESSAGES = {
BILLING_REQUIRED:
'Unable to resolve billing account. This workflow cannot execute without a valid billing account.',
BILLING_ERROR_GENERIC: 'Error resolving billing account',
} as const
/**
* Attempts to resolve billing actor with fallback for resume contexts.
* Returns the resolved actor user ID or null if resolution fails and should block execution.
*
* For resume contexts, this function allows fallback to the workflow owner if workspace
* billing cannot be resolved, ensuring users can complete their paused workflows even
* if billing configuration changes mid-execution.
*
* @returns Object containing actorUserId (null if should block) and shouldBlock flag
*/
async function resolveBillingActorWithFallback(params: {
requestId: string
workflowId: string
workspaceId: string
executionId: string
triggerType: string
workflowRecord: WorkflowRecord
userId: string
isResumeContext: boolean
baseActorUserId: string | null
failureReason: 'null' | 'error'
error?: unknown
loggingSession?: LoggingSession
}): Promise<
{ actorUserId: string; shouldBlock: false } | { actorUserId: null; shouldBlock: true }
> {
const {
requestId,
workflowId,
workspaceId,
executionId,
triggerType,
workflowRecord,
userId,
isResumeContext,
baseActorUserId,
failureReason,
error,
loggingSession,
} = params
if (baseActorUserId) {
return { actorUserId: baseActorUserId, shouldBlock: false }
}
const workflowOwner = workflowRecord.userId?.trim()
if (isResumeContext && workflowOwner) {
const logMessage =
failureReason === 'null'
? '[BILLING_FALLBACK] Workspace billing account is null. Using workflow owner for billing.'
: '[BILLING_FALLBACK] Exception during workspace billing resolution. Using workflow owner for billing.'
logger.warn(`[${requestId}] ${logMessage}`, {
workflowId,
workspaceId,
fallbackUserId: workflowOwner,
...(error ? { error } : {}),
})
return { actorUserId: workflowOwner, shouldBlock: false }
}
const fallbackUserId = workflowRecord.userId || userId || 'unknown'
const errorMessage =
failureReason === 'null'
? BILLING_ERROR_MESSAGES.BILLING_REQUIRED
: BILLING_ERROR_MESSAGES.BILLING_ERROR_GENERIC
logger.warn(`[${requestId}] ${errorMessage}`, {
workflowId,
workspaceId,
...(error ? { error } : {}),
})
await logPreprocessingError({
workflowId,
executionId,
triggerType,
requestId,
userId: fallbackUserId,
workspaceId,
errorMessage,
loggingSession,
})
return { actorUserId: null, shouldBlock: true }
}
export interface PreprocessExecutionOptions {
// Required fields
workflowId: string
userId: string // The authenticated user ID
triggerType: 'manual' | 'api' | 'webhook' | 'schedule' | 'chat'
executionId: string
requestId: string
// Optional checks configuration
checkRateLimit?: boolean // Default: false for manual/chat, true for others
checkDeployment?: boolean // Default: true for non-manual triggers
skipUsageLimits?: boolean // Default: false (only use for test mode)
// Context information
workspaceId?: string // If known, used for billing resolution
loggingSession?: LoggingSession // If provided, will be used for error logging
isResumeContext?: boolean // If true, allows fallback billing on resolution failure (for paused workflow resumes)
}
/**
* Result of preprocessing checks
*/
export interface PreprocessExecutionResult {
success: boolean
error?: {
message: string
statusCode: number // HTTP status code (401, 402, 403, 404, 429, 500)
logCreated: boolean // Whether error was logged to execution_logs
}
actorUserId?: string // The user ID that will be billed
workflowRecord?: WorkflowRecord
userSubscription?: SubscriptionInfo | null
rateLimitInfo?: {
allowed: boolean
remaining: number
resetAt: Date
}
}
type WorkflowRecord = typeof workflow.$inferSelect
type SubscriptionInfo = Awaited<ReturnType<typeof getHighestPrioritySubscription>>
export async function preprocessExecution(
options: PreprocessExecutionOptions
): Promise<PreprocessExecutionResult> {
const {
workflowId,
userId,
triggerType,
executionId,
requestId,
checkRateLimit = triggerType !== 'manual' && triggerType !== 'chat',
checkDeployment = triggerType !== 'manual',
skipUsageLimits = false,
workspaceId: providedWorkspaceId,
loggingSession: providedLoggingSession,
isResumeContext = false,
} = options
logger.info(`[${requestId}] Starting execution preprocessing`, {
workflowId,
userId,
triggerType,
executionId,
})
// ========== STEP 1: Validate Workflow Exists ==========
let workflowRecord: WorkflowRecord | null = null
try {
const records = await db.select().from(workflow).where(eq(workflow.id, workflowId)).limit(1)
if (records.length === 0) {
logger.warn(`[${requestId}] Workflow not found: ${workflowId}`)
await logPreprocessingError({
workflowId,
executionId,
triggerType,
requestId,
userId: 'unknown',
workspaceId: '',
errorMessage:
'Workflow not found. The workflow may have been deleted or is no longer accessible.',
loggingSession: providedLoggingSession,
})
return {
success: false,
error: {
message: 'Workflow not found',
statusCode: 404,
logCreated: true,
},
}
}
workflowRecord = records[0]
} catch (error) {
logger.error(`[${requestId}] Error fetching workflow`, { error, workflowId })
await logPreprocessingError({
workflowId,
executionId,
triggerType,
requestId,
userId: userId || 'unknown',
workspaceId: providedWorkspaceId || '',
errorMessage: 'Internal error while fetching workflow',
loggingSession: providedLoggingSession,
})
return {
success: false,
error: {
message: 'Internal error while fetching workflow',
statusCode: 500,
logCreated: true,
},
}
}
const workspaceId = workflowRecord.workspaceId || providedWorkspaceId || ''
// ========== STEP 2: Check Deployment Status ==========
if (checkDeployment && !workflowRecord.isDeployed) {
logger.warn(`[${requestId}] Workflow not deployed: ${workflowId}`)
await logPreprocessingError({
workflowId,
executionId,
triggerType,
requestId,
userId: workflowRecord.userId || userId,
workspaceId,
errorMessage: 'Workflow is not deployed. Please deploy the workflow before triggering it.',
loggingSession: providedLoggingSession,
})
return {
success: false,
error: {
message: 'Workflow is not deployed',
statusCode: 403,
logCreated: true,
},
}
}
// ========== STEP 3: Resolve Billing Actor ==========
let actorUserId: string | null = null
try {
if (workspaceId) {
actorUserId = await getWorkspaceBilledAccountUserId(workspaceId)
if (actorUserId) {
logger.info(`[${requestId}] Using workspace billed account: ${actorUserId}`)
}
}
if (!actorUserId) {
actorUserId = workflowRecord.userId || userId
logger.info(`[${requestId}] Using workflow owner as actor: ${actorUserId}`)
}
if (!actorUserId) {
const result = await resolveBillingActorWithFallback({
requestId,
workflowId,
workspaceId,
executionId,
triggerType,
workflowRecord,
userId,
isResumeContext,
baseActorUserId: actorUserId,
failureReason: 'null',
loggingSession: providedLoggingSession,
})
if (result.shouldBlock) {
return {
success: false,
error: {
message: 'Unable to resolve billing account',
statusCode: 500,
logCreated: true,
},
}
}
actorUserId = result.actorUserId
}
} catch (error) {
logger.error(`[${requestId}] Error resolving billing actor`, { error, workflowId })
const result = await resolveBillingActorWithFallback({
requestId,
workflowId,
workspaceId,
executionId,
triggerType,
workflowRecord,
userId,
isResumeContext,
baseActorUserId: null,
failureReason: 'error',
error,
loggingSession: providedLoggingSession,
})
if (result.shouldBlock) {
return {
success: false,
error: {
message: 'Error resolving billing account',
statusCode: 500,
logCreated: true,
},
}
}
actorUserId = result.actorUserId
}
// ========== STEP 4: Get User Subscription ==========
let userSubscription: SubscriptionInfo = null
try {
userSubscription = await getHighestPrioritySubscription(actorUserId)
logger.debug(`[${requestId}] User subscription retrieved`, {
actorUserId,
hasSub: !!userSubscription,
plan: userSubscription?.plan,
})
} catch (error) {
logger.error(`[${requestId}] Error fetching subscription`, { error, actorUserId })
}
// ========== STEP 5: Check Rate Limits ==========
let rateLimitInfo: { allowed: boolean; remaining: number; resetAt: Date } | undefined
if (checkRateLimit) {
try {
const rateLimiter = new RateLimiter()
rateLimitInfo = await rateLimiter.checkRateLimitWithSubscription(
actorUserId,
userSubscription,
triggerType,
false // not async
)
if (!rateLimitInfo.allowed) {
logger.warn(`[${requestId}] Rate limit exceeded for user ${actorUserId}`, {
triggerType,
remaining: rateLimitInfo.remaining,
resetAt: rateLimitInfo.resetAt,
})
await logPreprocessingError({
workflowId,
executionId,
triggerType,
requestId,
userId: actorUserId,
workspaceId,
errorMessage: `Rate limit exceeded. ${rateLimitInfo.remaining} requests remaining. Resets at ${rateLimitInfo.resetAt.toISOString()}.`,
loggingSession: providedLoggingSession,
})
return {
success: false,
error: {
message: `Rate limit exceeded. Please try again later.`,
statusCode: 429,
logCreated: true,
},
}
}
logger.debug(`[${requestId}] Rate limit check passed`, {
remaining: rateLimitInfo.remaining,
})
} catch (error) {
logger.error(`[${requestId}] Error checking rate limits`, { error, actorUserId })
await logPreprocessingError({
workflowId,
executionId,
triggerType,
requestId,
userId: actorUserId,
workspaceId,
errorMessage: 'Error checking rate limits. Execution blocked for safety.',
loggingSession: providedLoggingSession,
})
return {
success: false,
error: {
message: 'Error checking rate limits',
statusCode: 500,
logCreated: true,
},
}
}
}
// ========== STEP 6: Check Usage Limits (CRITICAL) ==========
if (!skipUsageLimits) {
try {
const usageCheck = await checkServerSideUsageLimits(actorUserId)
if (usageCheck.isExceeded) {
logger.warn(
`[${requestId}] User ${actorUserId} has exceeded usage limits. Blocking execution.`,
{
currentUsage: usageCheck.currentUsage,
limit: usageCheck.limit,
workflowId,
triggerType,
}
)
await logPreprocessingError({
workflowId,
executionId,
triggerType,
requestId,
userId: actorUserId,
workspaceId,
errorMessage:
usageCheck.message ||
`Usage limit exceeded: $${usageCheck.currentUsage?.toFixed(2)} used of $${usageCheck.limit?.toFixed(2)} limit. Please upgrade your plan to continue.`,
loggingSession: providedLoggingSession,
})
return {
success: false,
error: {
message:
usageCheck.message || 'Usage limit exceeded. Please upgrade your plan to continue.',
statusCode: 402,
logCreated: true,
},
}
}
logger.debug(`[${requestId}] Usage limit check passed`, {
currentUsage: usageCheck.currentUsage,
limit: usageCheck.limit,
})
} catch (error) {
logger.error(`[${requestId}] Error checking usage limits`, { error, actorUserId })
await logPreprocessingError({
workflowId,
executionId,
triggerType,
requestId,
userId: actorUserId,
workspaceId,
errorMessage:
'Unable to determine usage limits. Execution blocked for security. Please contact support.',
loggingSession: providedLoggingSession,
})
return {
success: false,
error: {
message: 'Unable to determine usage limits. Execution blocked for security.',
statusCode: 500,
logCreated: true,
},
}
}
} else {
logger.debug(`[${requestId}] Skipping usage limits check (test mode)`)
}
// ========== SUCCESS: All Checks Passed ==========
logger.info(`[${requestId}] All preprocessing checks passed`, {
workflowId,
actorUserId,
triggerType,
})
return {
success: true,
actorUserId,
workflowRecord,
userSubscription,
rateLimitInfo,
}
}
/**
* Helper function to log preprocessing errors to the database
*
* This ensures users can see why their workflow execution was blocked.
*/
async function logPreprocessingError(params: {
workflowId: string
executionId: string
triggerType: string
requestId: string
userId: string
workspaceId: string
errorMessage: string
loggingSession?: LoggingSession
}): Promise<void> {
const {
workflowId,
executionId,
triggerType,
requestId,
userId,
workspaceId,
errorMessage,
loggingSession,
} = params
try {
const session =
loggingSession || new LoggingSession(workflowId, executionId, triggerType as any, requestId)
await session.safeStart({
userId,
workspaceId,
variables: {},
})
await session.safeCompleteWithError({
error: {
message: errorMessage,
stackTrace: undefined,
},
traceSpans: [],
})
logger.debug(`[${requestId}] Logged preprocessing error to database`, {
workflowId,
executionId,
})
} catch (error) {
logger.error(`[${requestId}] Failed to log preprocessing error`, {
error,
workflowId,
executionId,
})
// Don't throw - error logging should not block the error response
}
}

View File

@@ -3,11 +3,9 @@ import { tasks } from '@trigger.dev/sdk'
import { and, eq } from 'drizzle-orm'
import { type NextRequest, NextResponse } from 'next/server'
import { v4 as uuidv4 } from 'uuid'
import { checkServerSideUsageLimits } from '@/lib/billing'
import { getHighestPrioritySubscription } from '@/lib/billing/core/subscription'
import { env, isTruthy } from '@/lib/env'
import { preprocessExecution } from '@/lib/execution/preprocessing'
import { createLogger } from '@/lib/logs/console/logger'
import { LoggingSession } from '@/lib/logs/execution/logging-session'
import { convertSquareBracketsToTwiML } from '@/lib/webhooks/utils'
import {
handleSlackChallenge,
@@ -15,9 +13,7 @@ import {
validateMicrosoftTeamsSignature,
verifyProviderWebhook,
} from '@/lib/webhooks/utils.server'
import { getWorkspaceBilledAccountUserId } from '@/lib/workspaces/utils'
import { executeWebhookJob } from '@/background/webhook-execution'
import { RateLimiter } from '@/services/queue'
const logger = createLogger('WebhookProcessor')
@@ -42,20 +38,6 @@ function getExternalUrl(request: NextRequest): string {
return request.url
}
async function resolveWorkflowActorUserId(foundWorkflow: {
workspaceId?: string | null
userId?: string | null
}): Promise<string | null> {
if (foundWorkflow?.workspaceId) {
const billedAccount = await getWorkspaceBilledAccountUserId(foundWorkflow.workspaceId)
if (billedAccount) {
return billedAccount
}
}
return foundWorkflow?.userId ?? null
}
export async function parseWebhookBody(
request: NextRequest,
requestId: string
@@ -509,157 +491,72 @@ export async function verifyProviderAuth(
return null
}
export async function checkRateLimits(
foundWorkflow: any,
foundWebhook: any,
requestId: string
): Promise<NextResponse | null> {
try {
const actorUserId = await resolveWorkflowActorUserId(foundWorkflow)
if (!actorUserId) {
logger.warn(`[${requestId}] Webhook requires a workspace billing account to attribute usage`)
return NextResponse.json({ error: 'Workspace billing account required' }, { status: 402 })
}
const userSubscription = await getHighestPrioritySubscription(actorUserId)
const rateLimiter = new RateLimiter()
const rateLimitCheck = await rateLimiter.checkRateLimitWithSubscription(
actorUserId,
userSubscription,
'webhook',
true
)
if (!rateLimitCheck.allowed) {
logger.warn(`[${requestId}] Rate limit exceeded for webhook user ${actorUserId}`, {
provider: foundWebhook.provider,
remaining: rateLimitCheck.remaining,
resetAt: rateLimitCheck.resetAt,
})
const executionId = uuidv4()
const loggingSession = new LoggingSession(foundWorkflow.id, executionId, 'webhook', requestId)
await loggingSession.safeStart({
userId: actorUserId,
workspaceId: foundWorkflow.workspaceId || '',
variables: {},
})
await loggingSession.safeCompleteWithError({
error: {
message: `Rate limit exceeded. ${rateLimitCheck.remaining || 0} requests remaining. Resets at ${rateLimitCheck.resetAt ? new Date(rateLimitCheck.resetAt).toISOString() : 'unknown'}. Please try again later.`,
stackTrace: undefined,
},
traceSpans: [],
})
if (foundWebhook.provider === 'microsoft-teams') {
return NextResponse.json(
{
type: 'message',
text: 'Rate limit exceeded. Please try again later.',
},
{ status: 429 }
)
}
return NextResponse.json(
{ error: 'Rate limit exceeded. Please try again later.' },
{ status: 429 }
)
}
logger.debug(`[${requestId}] Rate limit check passed for webhook`, {
provider: foundWebhook.provider,
remaining: rateLimitCheck.remaining,
resetAt: rateLimitCheck.resetAt,
})
} catch (rateLimitError) {
logger.error(`[${requestId}] Error checking webhook rate limits:`, rateLimitError)
}
return null
}
export async function checkUsageLimits(
/**
* Run preprocessing checks for webhook execution
* This replaces the old checkRateLimits and checkUsageLimits functions
*/
export async function checkWebhookPreprocessing(
foundWorkflow: any,
foundWebhook: any,
requestId: string,
testMode: boolean
): Promise<NextResponse | null> {
if (testMode) {
logger.debug(`[${requestId}] Skipping usage limit check for test webhook`)
return null
}
try {
const actorUserId = await resolveWorkflowActorUserId(foundWorkflow)
const executionId = uuidv4()
if (!actorUserId) {
logger.warn(`[${requestId}] Webhook requires a workspace billing account to attribute usage`)
return NextResponse.json({ error: 'Workspace billing account required' }, { status: 402 })
}
const preprocessResult = await preprocessExecution({
workflowId: foundWorkflow.id,
userId: foundWorkflow.userId,
triggerType: 'webhook',
executionId,
requestId,
checkRateLimit: true, // Webhooks need rate limiting
checkDeployment: true, // Webhooks require deployed workflows
skipUsageLimits: testMode, // Skip usage limits for test webhooks
workspaceId: foundWorkflow.workspaceId,
})
const usageCheck = await checkServerSideUsageLimits(actorUserId)
if (usageCheck.isExceeded) {
logger.warn(
`[${requestId}] User ${actorUserId} has exceeded usage limits. Skipping webhook execution.`,
{
currentUsage: usageCheck.currentUsage,
limit: usageCheck.limit,
workflowId: foundWorkflow.id,
provider: foundWebhook.provider,
}
)
const executionId = uuidv4()
const loggingSession = new LoggingSession(foundWorkflow.id, executionId, 'webhook', requestId)
await loggingSession.safeStart({
userId: actorUserId,
workspaceId: foundWorkflow.workspaceId || '',
variables: {},
})
await loggingSession.safeCompleteWithError({
error: {
message:
usageCheck.message ||
'Usage limit exceeded. Please upgrade your plan to continue using webhooks.',
stackTrace: undefined,
},
traceSpans: [],
if (!preprocessResult.success) {
const error = preprocessResult.error!
logger.warn(`[${requestId}] Webhook preprocessing failed`, {
provider: foundWebhook.provider,
error: error.message,
statusCode: error.statusCode,
})
if (foundWebhook.provider === 'microsoft-teams') {
return NextResponse.json(
{
type: 'message',
text: 'Usage limit exceeded. Please upgrade your plan to continue.',
text: error.message,
},
{ status: 402 }
{ status: error.statusCode }
)
}
return NextResponse.json({ error: error.message }, { status: error.statusCode })
}
logger.debug(`[${requestId}] Webhook preprocessing passed`, {
provider: foundWebhook.provider,
})
return null
} catch (preprocessError) {
logger.error(`[${requestId}] Error during webhook preprocessing:`, preprocessError)
if (foundWebhook.provider === 'microsoft-teams') {
return NextResponse.json(
{ error: usageCheck.message || 'Usage limit exceeded' },
{ status: 402 }
{
type: 'message',
text: 'Internal error during preprocessing',
},
{ status: 500 }
)
}
logger.debug(`[${requestId}] Usage limit check passed for webhook`, {
provider: foundWebhook.provider,
currentUsage: usageCheck.currentUsage,
limit: usageCheck.limit,
})
} catch (usageError) {
logger.error(`[${requestId}] Error checking webhook usage limits:`, usageError)
return NextResponse.json({ error: 'Internal error during preprocessing' }, { status: 500 })
}
return null
}
export async function queueWebhookExecution(
@@ -670,14 +567,6 @@ export async function queueWebhookExecution(
options: WebhookProcessorOptions
): Promise<NextResponse> {
try {
const actorUserId = await resolveWorkflowActorUserId(foundWorkflow)
if (!actorUserId) {
logger.warn(
`[${options.requestId}] Webhook requires a workspace billing account to attribute usage`
)
return NextResponse.json({ error: 'Workspace billing account required' }, { status: 402 })
}
// GitHub event filtering for event-specific triggers
if (foundWebhook.provider === 'github') {
const providerConfig = (foundWebhook.providerConfig as Record<string, any>) || {}
@@ -804,7 +693,7 @@ export async function queueWebhookExecution(
const payload = {
webhookId: foundWebhook.id,
workflowId: foundWorkflow.id,
userId: actorUserId,
userId: foundWorkflow.userId,
provider: foundWebhook.provider,
body,
headers,

View File

@@ -1,8 +1,9 @@
import { randomUUID } from 'crypto'
import { db } from '@sim/db'
import { pausedExecutions, resumeQueue } from '@sim/db/schema'
import { and, asc, desc, eq, inArray, lt, sql } from 'drizzle-orm'
import type { Edge } from 'reactflow'
import { v4 as uuidv4 } from 'uuid'
import { preprocessExecution } from '@/lib/execution/preprocessing'
import { createLogger } from '@/lib/logs/console/logger'
import { LoggingSession } from '@/lib/logs/execution/logging-session'
import { executeWorkflowCore } from '@/lib/workflows/executor/execution-core'
@@ -121,7 +122,7 @@ export class PauseResumeManager {
await db
.insert(pausedExecutions)
.values({
id: uuidv4(),
id: randomUUID(),
workflowId,
executionId,
executionSnapshot: snapshotSeed,
@@ -197,7 +198,6 @@ export class PauseResumeManager {
.limit(1)
.then((rows) => rows[0])
// Use the original execution ID for resume to maintain continuity in logs
const resumeExecutionId = executionId
const now = new Date()
@@ -205,7 +205,7 @@ export class PauseResumeManager {
const [entry] = await tx
.insert(resumeQueue)
.values({
id: uuidv4(),
id: randomUUID(),
pausedExecutionId: pausedExecution.id,
parentExecutionId: executionId,
newExecutionId: resumeExecutionId,
@@ -243,7 +243,7 @@ export class PauseResumeManager {
}
}
const resumeEntryId = uuidv4()
const resumeEntryId = randomUUID()
await tx.insert(resumeQueue).values({
id: resumeEntryId,
pausedExecutionId: pausedExecution.id,
@@ -315,7 +315,6 @@ export class PauseResumeManager {
pausedExecutionId: pausedExecution.id,
contextId,
pauseBlockId: pauseBlockId,
result,
})
}
@@ -671,14 +670,56 @@ export class PauseResumeManager {
metadata.requestId
)
logger.info('Running preprocessing checks for resume', {
resumeExecutionId,
workflowId: pausedExecution.workflowId,
userId,
})
const preprocessingResult = await preprocessExecution({
workflowId: pausedExecution.workflowId,
userId,
triggerType: 'manual', // Resume is manual
executionId: resumeExecutionId,
requestId: metadata.requestId,
checkRateLimit: false, // Manual actions bypass rate limits
checkDeployment: false, // Resuming existing execution
skipUsageLimits: true, // Resume is continuation of authorized execution - don't recheck limits
workspaceId: baseSnapshot.metadata.workspaceId,
loggingSession,
isResumeContext: true, // Enable billing fallback for paused workflow resumes
})
if (!preprocessingResult.success) {
const errorMessage =
preprocessingResult.error?.message || 'Preprocessing check failed for resume execution'
logger.error('Resume preprocessing failed', {
resumeExecutionId,
workflowId: pausedExecution.workflowId,
userId,
error: errorMessage,
})
throw new Error(errorMessage)
}
logger.info('Preprocessing checks passed for resume', {
resumeExecutionId,
actorUserId: preprocessingResult.actorUserId,
})
if (preprocessingResult.actorUserId) {
metadata.userId = preprocessingResult.actorUserId
}
logger.info('Invoking executeWorkflowCore for resume', {
resumeExecutionId,
triggerType,
useDraftState: metadata.useDraftState,
resumeFromSnapshot: metadata.resumeFromSnapshot,
actorUserId: metadata.userId,
})
// For resume executions, pass a flag to skip creating a new log entry
return await executeWorkflowCore({
snapshot: resumeSnapshot,
callbacks: {},
@@ -753,11 +794,9 @@ export class PauseResumeManager {
pausedExecutionId: string
contextId: string
pauseBlockId: string
result: ExecutionResult
}): Promise<void> {
const { pausedExecutionId, contextId, pauseBlockId, result } = args
const { pausedExecutionId, contextId, pauseBlockId } = args
// Load the current snapshot from the database
const pausedExecution = await db
.select()
.from(pausedExecutions)

View File

@@ -75,7 +75,6 @@
"entities": "6.0.1",
"framer-motion": "^12.5.0",
"fuse.js": "7.1.0",
"fuzzysort": "3.1.0",
"gray-matter": "^4.0.3",
"groq-sdk": "^0.15.0",
"html-to-text": "^9.0.5",

View File

@@ -118,7 +118,6 @@
"entities": "6.0.1",
"framer-motion": "^12.5.0",
"fuse.js": "7.1.0",
"fuzzysort": "3.1.0",
"gray-matter": "^4.0.3",
"groq-sdk": "^0.15.0",
"html-to-text": "^9.0.5",
@@ -1976,8 +1975,6 @@
"fuse.js": ["fuse.js@7.1.0", "", {}, "sha512-trLf4SzuuUxfusZADLINj+dE8clK1frKdmqiJNb1Es75fmI5oY6X2mxLVUciLLjxqw/xr72Dhy+lER6dGd02FQ=="],
"fuzzysort": ["fuzzysort@3.1.0", "", {}, "sha512-sR9BNCjBg6LNgwvxlBd0sBABvQitkLzoVY9MYYROQVX/FvfJ4Mai9LsGhDgd8qYdds0bY77VzYd5iuB+v5rwQQ=="],
"gaxios": ["gaxios@6.7.1", "", { "dependencies": { "extend": "^3.0.2", "https-proxy-agent": "^7.0.1", "is-stream": "^2.0.0", "node-fetch": "^2.6.9", "uuid": "^9.0.1" } }, "sha512-LDODD4TMYx7XXdpwxAVRAIAuB0bzv0s+ywFonY46k126qzQHT9ygyoa9tncmOiQmmDrik65UYsEkv3lbfqQ3yQ=="],
"gcp-metadata": ["gcp-metadata@6.1.1", "", { "dependencies": { "gaxios": "^6.1.1", "google-logging-utils": "^0.0.2", "json-bigint": "^1.0.0" } }, "sha512-a4tiq7E0/5fTjxPAaH4jpjkSv/uCaU2p5KC6HVGrvl0cDjA8iBZv4vv1gyzlmK0ZUKqwpOyQMKzZQe3lTit77A=="],