diff --git a/apps/sim/app/api/chat/[identifier]/route.test.ts b/apps/sim/app/api/chat/[identifier]/route.test.ts index c95102cf8..f872e35ee 100644 --- a/apps/sim/app/api/chat/[identifier]/route.test.ts +++ b/apps/sim/app/api/chat/[identifier]/route.test.ts @@ -6,6 +6,36 @@ import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest' import { createMockRequest } from '@/app/api/__test-utils__/utils' +vi.mock('@/lib/execution/preprocessing', () => ({ + preprocessExecution: vi.fn().mockResolvedValue({ + success: true, + actorUserId: 'test-user-id', + workflowRecord: { + id: 'test-workflow-id', + userId: 'test-user-id', + isDeployed: true, + workspaceId: 'test-workspace-id', + variables: {}, + }, + userSubscription: { + plan: 'pro', + status: 'active', + }, + rateLimitInfo: { + allowed: true, + remaining: 100, + resetAt: new Date(), + }, + }), +})) + +vi.mock('@/lib/logs/execution/logging-session', () => ({ + LoggingSession: vi.fn().mockImplementation(() => ({ + safeStart: vi.fn().mockResolvedValue(undefined), + safeCompleteWithError: vi.fn().mockResolvedValue(undefined), + })), +})) + describe('Chat Identifier API Route', () => { const createMockStream = () => { return new ReadableStream({ @@ -307,48 +337,16 @@ describe('Chat Identifier API Route', () => { }) it('should return 503 when workflow is not available', async () => { - // Override the default workflow result to return non-deployed - vi.doMock('@sim/db', () => { - // Track call count to return different results - let callCount = 0 + const { preprocessExecution } = await import('@/lib/execution/preprocessing') + const originalImplementation = vi.mocked(preprocessExecution).getMockImplementation() - const mockLimit = vi.fn().mockImplementation(() => { - callCount++ - if (callCount === 1) { - // First call - chat query - return [ - { - id: 'chat-id', - workflowId: 'unavailable-workflow', - userId: 'user-id', - isActive: true, - authType: 'public', - outputConfigs: [{ blockId: 'block-1', path: 'output' }], - }, - ] - } - if (callCount === 2) { - // Second call - workflow query - return [ - { - isDeployed: false, - }, - ] - } - return [] - }) - - const mockWhere = vi.fn().mockReturnValue({ limit: mockLimit }) - const mockFrom = vi.fn().mockReturnValue({ where: mockWhere }) - const mockSelect = vi.fn().mockReturnValue({ from: mockFrom }) - - return { - db: { - select: mockSelect, - }, - chat: {}, - workflow: {}, - } + vi.mocked(preprocessExecution).mockResolvedValueOnce({ + success: false, + error: { + message: 'Workflow is not deployed', + statusCode: 403, + logCreated: true, + }, }) const req = createMockRequest('POST', { input: 'Hello' }) @@ -358,11 +356,15 @@ describe('Chat Identifier API Route', () => { const response = await POST(req, { params }) - expect(response.status).toBe(503) + expect(response.status).toBe(403) const data = await response.json() expect(data).toHaveProperty('error') - expect(data).toHaveProperty('message', 'Chat workflow is not available') + expect(data).toHaveProperty('message', 'Workflow is not deployed') + + if (originalImplementation) { + vi.mocked(preprocessExecution).mockImplementation(originalImplementation) + } }) it('should return streaming response for valid chat messages', async () => { @@ -378,7 +380,6 @@ describe('Chat Identifier API Route', () => { expect(response.headers.get('Cache-Control')).toBe('no-cache') expect(response.headers.get('Connection')).toBe('keep-alive') - // Verify createStreamingResponse was called with correct workflow info 
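// Note: the success payload mocked above is duplicated almost verbatim in
// apps/sim/app/api/webhooks/trigger/[path]/route.test.ts later in this diff. A small
// fixture factory — purely illustrative, not part of this change — could keep the two
// test files in sync; the field names mirror PreprocessExecutionResult from
// apps/sim/lib/execution/preprocessing.ts.
const buildPreprocessSuccess = (overrides: Record<string, unknown> = {}) => ({
  success: true,
  actorUserId: 'test-user-id',
  workflowRecord: {
    id: 'test-workflow-id',
    userId: 'test-user-id',
    isDeployed: true,
    workspaceId: 'test-workspace-id',
    variables: {},
  },
  userSubscription: { plan: 'pro', status: 'active' },
  rateLimitInfo: { allowed: true, remaining: 100, resetAt: new Date() },
  ...overrides,
})
// Example: vi.mocked(preprocessExecution).mockResolvedValue(buildPreprocessSuccess() as any)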
expect(mockCreateStreamingResponse).toHaveBeenCalledWith( expect.objectContaining({ workflow: expect.objectContaining({ @@ -408,7 +409,6 @@ describe('Chat Identifier API Route', () => { expect(response.status).toBe(200) expect(response.body).toBeInstanceOf(ReadableStream) - // Test that we can read from the response stream if (response.body) { const reader = response.body.getReader() const { value, done } = await reader.read() @@ -447,7 +447,6 @@ describe('Chat Identifier API Route', () => { }) it('should handle invalid JSON in request body', async () => { - // Create a request with invalid JSON const req = { method: 'POST', headers: new Headers(), diff --git a/apps/sim/app/api/chat/[identifier]/route.ts b/apps/sim/app/api/chat/[identifier]/route.ts index e9d8cdd8b..cac8f60df 100644 --- a/apps/sim/app/api/chat/[identifier]/route.ts +++ b/apps/sim/app/api/chat/[identifier]/route.ts @@ -1,9 +1,10 @@ +import { randomUUID } from 'crypto' import { db } from '@sim/db' -import { chat, workflow, workspace } from '@sim/db/schema' +import { chat } from '@sim/db/schema' import { eq } from 'drizzle-orm' import { type NextRequest, NextResponse } from 'next/server' -import { v4 as uuidv4 } from 'uuid' import { z } from 'zod' +import { preprocessExecution } from '@/lib/execution/preprocessing' import { createLogger } from '@/lib/logs/console/logger' import { LoggingSession } from '@/lib/logs/execution/logging-session' import { ChatFiles } from '@/lib/uploads' @@ -93,7 +94,7 @@ export async function POST( if (!deployment.isActive) { logger.warn(`[${requestId}] Chat is not active: ${identifier}`) - const executionId = uuidv4() + const executionId = randomUUID() const loggingSession = new LoggingSession( deployment.workflowId, executionId, @@ -140,82 +141,35 @@ export async function POST( return addCorsHeaders(createErrorResponse('No input provided', 400), request) } - const workflowResult = await db - .select({ - isDeployed: workflow.isDeployed, - workspaceId: workflow.workspaceId, - variables: workflow.variables, - }) - .from(workflow) - .where(eq(workflow.id, deployment.workflowId)) - .limit(1) + const executionId = randomUUID() - if (workflowResult.length === 0 || !workflowResult[0].isDeployed) { - logger.warn(`[${requestId}] Workflow not found or not deployed: ${deployment.workflowId}`) + const loggingSession = new LoggingSession(deployment.workflowId, executionId, 'chat', requestId) - const executionId = uuidv4() - const loggingSession = new LoggingSession( - deployment.workflowId, - executionId, - 'chat', - requestId + const preprocessResult = await preprocessExecution({ + workflowId: deployment.workflowId, + userId: deployment.userId, + triggerType: 'chat', + executionId, + requestId, + checkRateLimit: false, // Chat bypasses rate limits + checkDeployment: true, // Chat requires deployed workflows + loggingSession, + }) + + if (!preprocessResult.success) { + logger.warn(`[${requestId}] Preprocessing failed: ${preprocessResult.error?.message}`) + return addCorsHeaders( + createErrorResponse( + preprocessResult.error?.message || 'Failed to process request', + preprocessResult.error?.statusCode || 500 + ), + request ) - - await loggingSession.safeStart({ - userId: deployment.userId, - workspaceId: workflowResult[0]?.workspaceId || '', - variables: {}, - }) - - await loggingSession.safeCompleteWithError({ - error: { - message: 'Chat workflow is not available. 
The workflow is not deployed.', - stackTrace: undefined, - }, - traceSpans: [], - }) - - return addCorsHeaders(createErrorResponse('Chat workflow is not available', 503), request) } - let workspaceOwnerId = deployment.userId - if (workflowResult[0].workspaceId) { - const workspaceData = await db - .select({ ownerId: workspace.ownerId }) - .from(workspace) - .where(eq(workspace.id, workflowResult[0].workspaceId)) - .limit(1) - - if (workspaceData.length === 0) { - logger.error(`[${requestId}] Workspace not found for workflow ${deployment.workflowId}`) - - const executionId = uuidv4() - const loggingSession = new LoggingSession( - deployment.workflowId, - executionId, - 'chat', - requestId - ) - - await loggingSession.safeStart({ - userId: deployment.userId, - workspaceId: workflowResult[0].workspaceId || '', - variables: {}, - }) - - await loggingSession.safeCompleteWithError({ - error: { - message: 'Workspace not found. Critical configuration error - please contact support.', - stackTrace: undefined, - }, - traceSpans: [], - }) - - return addCorsHeaders(createErrorResponse('Workspace not found', 500), request) - } - - workspaceOwnerId = workspaceData[0].ownerId - } + const { actorUserId, workflowRecord } = preprocessResult + const workspaceOwnerId = actorUserId! + const workspaceId = workflowRecord?.workspaceId || '' try { const selectedOutputs: string[] = [] @@ -232,12 +186,10 @@ export async function POST( const { SSE_HEADERS } = await import('@/lib/utils') const { createFilteredResult } = await import('@/app/api/workflows/[id]/execute/route') - const executionId = crypto.randomUUID() - const workflowInput: any = { input, conversationId } if (files && Array.isArray(files) && files.length > 0) { const executionContext = { - workspaceId: workflowResult[0].workspaceId || '', + workspaceId, workflowId: deployment.workflowId, executionId, } @@ -257,20 +209,13 @@ export async function POST( } catch (fileError: any) { logger.error(`[${requestId}] Failed to process chat files:`, fileError) - const fileLoggingSession = new LoggingSession( - deployment.workflowId, - executionId, - 'chat', - requestId - ) - - await fileLoggingSession.safeStart({ + await loggingSession.safeStart({ userId: workspaceOwnerId, - workspaceId: workflowResult[0].workspaceId || '', + workspaceId, variables: {}, }) - await fileLoggingSession.safeCompleteWithError({ + await loggingSession.safeCompleteWithError({ error: { message: `File upload failed: ${fileError.message || 'Unable to process uploaded files'}`, stackTrace: fileError.stack, @@ -285,9 +230,9 @@ export async function POST( const workflowForExecution = { id: deployment.workflowId, userId: deployment.userId, - workspaceId: workflowResult[0].workspaceId, - isDeployed: true, - variables: workflowResult[0].variables || {}, + workspaceId, + isDeployed: workflowRecord?.isDeployed ?? 
false, + variables: workflowRecord?.variables || {}, } const stream = await createStreamingResponse({ diff --git a/apps/sim/app/api/chat/utils.test.ts b/apps/sim/app/api/chat/utils.test.ts index 30ec46c62..bc7bca71a 100644 --- a/apps/sim/app/api/chat/utils.test.ts +++ b/apps/sim/app/api/chat/utils.test.ts @@ -71,13 +71,13 @@ describe('Chat API Utils', () => { }) describe('Auth token utils', () => { - it('should encrypt and validate auth tokens', async () => { - const { encryptAuthToken, validateAuthToken } = await import('@/app/api/chat/utils') + it('should validate auth tokens', async () => { + const { validateAuthToken } = await import('@/app/api/chat/utils') const chatId = 'test-chat-id' const type = 'password' - const token = encryptAuthToken(chatId, type) + const token = Buffer.from(`${chatId}:${type}:${Date.now()}`).toString('base64') expect(typeof token).toBe('string') expect(token.length).toBeGreaterThan(0) @@ -92,7 +92,6 @@ describe('Chat API Utils', () => { const { validateAuthToken } = await import('@/app/api/chat/utils') const chatId = 'test-chat-id' - // Create an expired token by directly constructing it with an old timestamp const expiredToken = Buffer.from( `${chatId}:password:${Date.now() - 25 * 60 * 60 * 1000}` ).toString('base64') @@ -166,20 +165,6 @@ describe('Chat API Utils', () => { 'Content-Type, X-Requested-With' ) }) - - it('should handle OPTIONS request', async () => { - const { OPTIONS } = await import('@/app/api/chat/utils') - - const mockRequest = { - headers: { - get: vi.fn().mockReturnValue('http://localhost:3000'), - }, - } as any - - const response = await OPTIONS(mockRequest) - - expect(response.status).toBe(204) - }) }) describe('Chat auth validation', () => { @@ -355,10 +340,8 @@ describe('Chat API Utils', () => { describe('Execution Result Processing', () => { it('should process logs regardless of overall success status', () => { - // Test that logs are processed even when overall execution fails - // This is key for partial success scenarios const executionResult = { - success: false, // Overall execution failed + success: false, output: {}, logs: [ { @@ -383,16 +366,13 @@ describe('Chat API Utils', () => { metadata: { duration: 1000 }, } - // Test the key logic: logs should be processed regardless of overall success expect(executionResult.success).toBe(false) expect(executionResult.logs).toBeDefined() expect(executionResult.logs).toHaveLength(2) - // First log should be successful expect(executionResult.logs[0].success).toBe(true) expect(executionResult.logs[0].output?.content).toBe('Agent 1 succeeded') - // Second log should be failed expect(executionResult.logs[1].success).toBe(false) expect(executionResult.logs[1].error).toBe('Agent 2 failed') }) @@ -405,18 +385,15 @@ describe('Chat API Utils', () => { metadata: { duration: 100 }, } - // Test direct ExecutionResult const directResult = executionResult const extractedDirect = directResult expect(extractedDirect).toBe(executionResult) - // Test StreamingExecution with embedded ExecutionResult const streamingResult = { stream: new ReadableStream(), execution: executionResult, } - // Test that streaming execution wraps the result correctly const extractedFromStreaming = streamingResult && typeof streamingResult === 'object' && 'execution' in streamingResult ? 
streamingResult.execution diff --git a/apps/sim/app/api/chat/utils.ts b/apps/sim/app/api/chat/utils.ts index 1b3b348e6..88c2062f3 100644 --- a/apps/sim/app/api/chat/utils.ts +++ b/apps/sim/app/api/chat/utils.ts @@ -1,7 +1,7 @@ import { db } from '@sim/db' import { chat, workflow } from '@sim/db/schema' import { eq } from 'drizzle-orm' -import { type NextRequest, NextResponse } from 'next/server' +import type { NextRequest, NextResponse } from 'next/server' import { isDev } from '@/lib/environment' import { createLogger } from '@/lib/logs/console/logger' import { hasAdminPermission } from '@/lib/permissions/utils' @@ -77,7 +77,7 @@ export async function checkChatAccess( return { hasAccess: false } } -export const encryptAuthToken = (chatId: string, type: string): string => { +const encryptAuthToken = (chatId: string, type: string): string => { return Buffer.from(`${chatId}:${type}:${Date.now()}`).toString('base64') } @@ -104,7 +104,6 @@ export const validateAuthToken = (token: string, chatId: string): boolean => { } } -// Set cookie helper function export const setChatAuthCookie = (response: NextResponse, chatId: string, type: string): void => { const token = encryptAuthToken(chatId, type) response.cookies.set({ @@ -118,7 +117,6 @@ export const setChatAuthCookie = (response: NextResponse, chatId: string, type: }) } -// Helper function to add CORS headers to responses export function addCorsHeaders(response: NextResponse, request: NextRequest) { const origin = request.headers.get('origin') || '' @@ -132,12 +130,6 @@ export function addCorsHeaders(response: NextResponse, request: NextRequest) { return response } -export async function OPTIONS(request: NextRequest) { - const response = new NextResponse(null, { status: 204 }) - return addCorsHeaders(response, request) -} - -// Validate authentication for chat access export async function validateChatAuth( requestId: string, deployment: any, @@ -146,12 +138,10 @@ export async function validateChatAuth( ): Promise<{ authorized: boolean; error?: string }> { const authType = deployment.authType || 'public' - // Public chats are accessible to everyone if (authType === 'public') { return { authorized: true } } - // Check for auth cookie first const cookieName = `chat_auth_${deployment.id}` const authCookie = request.cookies.get(cookieName) @@ -159,9 +149,7 @@ export async function validateChatAuth( return { authorized: true } } - // For password protection, check the password in the request body if (authType === 'password') { - // For GET requests, we just notify the client that authentication is required if (request.method === 'GET') { return { authorized: false, error: 'auth_required_password' } } @@ -198,22 +186,18 @@ export async function validateChatAuth( } } - // For email access control, check the email in the request body if (authType === 'email') { - // For GET requests, we just notify the client that authentication is required if (request.method === 'GET') { return { authorized: false, error: 'auth_required_email' } } try { - // Use the parsed body if provided, otherwise the auth check is not applicable if (!parsedBody) { return { authorized: false, error: 'Email is required' } } const { email, input } = parsedBody - // If this is a chat message, not an auth attempt if (input && !email) { return { authorized: false, error: 'auth_required_email' } } @@ -224,17 +208,12 @@ export async function validateChatAuth( const allowedEmails = deployment.allowedEmails || [] - // Check exact email matches if (allowedEmails.includes(email)) { - // Email 
is allowed but still needs OTP verification - // Return a special error code that the client will recognize return { authorized: false, error: 'otp_required' } } - // Check domain matches (prefixed with @) const domain = email.split('@')[1] if (domain && allowedEmails.some((allowed: string) => allowed === `@${domain}`)) { - // Domain is allowed but still needs OTP verification return { authorized: false, error: 'otp_required' } } @@ -257,6 +236,10 @@ export async function validateChatAuth( const { email, input, checkSSOAccess } = parsedBody + if (input && !checkSSOAccess) { + return { authorized: false, error: 'auth_required_sso' } + } + if (checkSSOAccess) { if (!email) { return { authorized: false, error: 'Email is required' } diff --git a/apps/sim/app/api/resume/[workflowId]/[executionId]/[contextId]/route.ts b/apps/sim/app/api/resume/[workflowId]/[executionId]/[contextId]/route.ts index a1b7b781b..718551975 100644 --- a/apps/sim/app/api/resume/[workflowId]/[executionId]/[contextId]/route.ts +++ b/apps/sim/app/api/resume/[workflowId]/[executionId]/[contextId]/route.ts @@ -1,5 +1,8 @@ +import { randomUUID } from 'crypto' import { type NextRequest, NextResponse } from 'next/server' +import { preprocessExecution } from '@/lib/execution/preprocessing' import { createLogger } from '@/lib/logs/console/logger' +import { generateRequestId } from '@/lib/utils' import { PauseResumeManager } from '@/lib/workflows/executor/human-in-the-loop-manager' import { validateWorkflowAccess } from '@/app/api/workflows/middleware' @@ -35,6 +38,54 @@ export async function POST( const resumeInput = payload?.input ?? payload ?? {} const userId = workflow.userId ?? '' + const resumeExecutionId = randomUUID() + const requestId = generateRequestId() + + logger.info(`[${requestId}] Preprocessing resume execution`, { + workflowId, + parentExecutionId: executionId, + resumeExecutionId, + userId, + }) + + const preprocessResult = await preprocessExecution({ + workflowId, + userId, + triggerType: 'manual', // Resume is a manual trigger + executionId: resumeExecutionId, + requestId, + checkRateLimit: false, // Manual triggers bypass rate limits + checkDeployment: false, // Resuming existing execution, deployment already checked + skipUsageLimits: true, // Resume is continuation of authorized execution - don't recheck limits + workspaceId: workflow.workspaceId || undefined, + isResumeContext: true, // Enable billing fallback for paused workflow resumes + }) + + if (!preprocessResult.success) { + logger.warn(`[${requestId}] Preprocessing failed for resume`, { + workflowId, + parentExecutionId: executionId, + error: preprocessResult.error?.message, + statusCode: preprocessResult.error?.statusCode, + }) + + return NextResponse.json( + { + error: + preprocessResult.error?.message || + 'Failed to validate resume execution. 
Please try again.', + }, + { status: preprocessResult.error?.statusCode || 400 } + ) + } + + logger.info(`[${requestId}] Preprocessing passed, proceeding with resume`, { + workflowId, + parentExecutionId: executionId, + resumeExecutionId, + actorUserId: preprocessResult.actorUserId, + }) + try { const enqueueResult = await PauseResumeManager.enqueueOrStartResume({ executionId, diff --git a/apps/sim/app/api/webhooks/test/[id]/route.ts b/apps/sim/app/api/webhooks/test/[id]/route.ts index 660174578..44b49fa1b 100644 --- a/apps/sim/app/api/webhooks/test/[id]/route.ts +++ b/apps/sim/app/api/webhooks/test/[id]/route.ts @@ -2,7 +2,7 @@ import { type NextRequest, NextResponse } from 'next/server' import { createLogger } from '@/lib/logs/console/logger' import { generateRequestId } from '@/lib/utils' import { - checkRateLimits, + checkWebhookPreprocessing, findWebhookAndWorkflow, handleProviderChallenges, parseWebhookBody, @@ -67,9 +67,39 @@ export async function POST(request: NextRequest, { params }: { params: Promise<{ return authError } - const rateLimitError = await checkRateLimits(foundWorkflow, foundWebhook, requestId) - if (rateLimitError) { - return rateLimitError + let preprocessError: NextResponse | null = null + try { + preprocessError = await checkWebhookPreprocessing( + foundWorkflow, + foundWebhook, + requestId, + true // testMode - skips usage limits + ) + if (preprocessError) { + return preprocessError + } + } catch (error) { + logger.error(`[${requestId}] Unexpected error during webhook preprocessing`, { + error: error instanceof Error ? error.message : String(error), + stack: error instanceof Error ? error.stack : undefined, + webhookId: foundWebhook.id, + workflowId: foundWorkflow.id, + }) + + if (foundWebhook.provider === 'microsoft-teams') { + return NextResponse.json( + { + type: 'message', + text: 'An unexpected error occurred during preprocessing', + }, + { status: 500 } + ) + } + + return NextResponse.json( + { error: 'An unexpected error occurred during preprocessing' }, + { status: 500 } + ) } logger.info( diff --git a/apps/sim/app/api/webhooks/trigger/[path]/route.test.ts b/apps/sim/app/api/webhooks/trigger/[path]/route.test.ts index e0bec7e4d..672f19fd5 100644 --- a/apps/sim/app/api/webhooks/trigger/[path]/route.test.ts +++ b/apps/sim/app/api/webhooks/trigger/[path]/route.test.ts @@ -88,6 +88,35 @@ vi.mock('@/executor', () => ({ })), })) +vi.mock('@/lib/execution/preprocessing', () => ({ + preprocessExecution: vi.fn().mockResolvedValue({ + success: true, + actorUserId: 'test-user-id', + workflowRecord: { + id: 'test-workflow-id', + userId: 'test-user-id', + isDeployed: true, + workspaceId: 'test-workspace-id', + }, + userSubscription: { + plan: 'pro', + status: 'active', + }, + rateLimitInfo: { + allowed: true, + remaining: 100, + resetAt: new Date(), + }, + }), +})) + +vi.mock('@/lib/logs/execution/logging-session', () => ({ + LoggingSession: vi.fn().mockImplementation(() => ({ + safeStart: vi.fn().mockResolvedValue(undefined), + safeCompleteWithError: vi.fn().mockResolvedValue(undefined), + })), +})) + process.env.DATABASE_URL = 'postgresql://test:test@localhost:5432/test' vi.mock('drizzle-orm/postgres-js', () => ({ @@ -190,19 +219,6 @@ describe('Webhook Trigger API Route', () => { }) describe('Generic Webhook Authentication', () => { - beforeEach(() => { - vi.doMock('@/lib/billing/core/subscription', () => ({ - getHighestPrioritySubscription: vi.fn().mockResolvedValue({ - plan: 'pro', - status: 'active', - }), - })) - - vi.doMock('@/lib/billing', () => ({ - 
checkServerSideUsageLimits: vi.fn().mockResolvedValue(null), - })) - }) - it('should process generic webhook without authentication', async () => { globalMockData.webhooks.push({ id: 'generic-webhook-id', diff --git a/apps/sim/app/api/webhooks/trigger/[path]/route.ts b/apps/sim/app/api/webhooks/trigger/[path]/route.ts index 0de3c1a63..8d63c5579 100644 --- a/apps/sim/app/api/webhooks/trigger/[path]/route.ts +++ b/apps/sim/app/api/webhooks/trigger/[path]/route.ts @@ -2,8 +2,7 @@ import { type NextRequest, NextResponse } from 'next/server' import { createLogger } from '@/lib/logs/console/logger' import { generateRequestId } from '@/lib/utils' import { - checkRateLimits, - checkUsageLimits, + checkWebhookPreprocessing, findWebhookAndWorkflow, handleProviderChallenges, parseWebhookBody, @@ -124,14 +123,39 @@ export async function POST( return authError } - const rateLimitError = await checkRateLimits(foundWorkflow, foundWebhook, requestId) - if (rateLimitError) { - return rateLimitError - } + let preprocessError: NextResponse | null = null + try { + preprocessError = await checkWebhookPreprocessing( + foundWorkflow, + foundWebhook, + requestId, + false // testMode + ) + if (preprocessError) { + return preprocessError + } + } catch (error) { + logger.error(`[${requestId}] Unexpected error during webhook preprocessing`, { + error: error instanceof Error ? error.message : String(error), + stack: error instanceof Error ? error.stack : undefined, + webhookId: foundWebhook.id, + workflowId: foundWorkflow.id, + }) - const usageLimitError = await checkUsageLimits(foundWorkflow, foundWebhook, requestId, false) - if (usageLimitError) { - return usageLimitError + if (foundWebhook.provider === 'microsoft-teams') { + return NextResponse.json( + { + type: 'message', + text: 'An unexpected error occurred during preprocessing', + }, + { status: 500 } + ) + } + + return NextResponse.json( + { error: 'An unexpected error occurred during preprocessing' }, + { status: 500 } + ) } if (foundWebhook.blockId) { diff --git a/apps/sim/app/api/workflows/[id]/execute/route.ts b/apps/sim/app/api/workflows/[id]/execute/route.ts index dabd563ca..e9a902316 100644 --- a/apps/sim/app/api/workflows/[id]/execute/route.ts +++ b/apps/sim/app/api/workflows/[id]/execute/route.ts @@ -2,8 +2,8 @@ import { type NextRequest, NextResponse } from 'next/server' import { validate as uuidValidate, v4 as uuidv4 } from 'uuid' import { z } from 'zod' import { checkHybridAuth } from '@/lib/auth/hybrid' -import { checkServerSideUsageLimits } from '@/lib/billing' import { processInputFileFields } from '@/lib/execution/files' +import { preprocessExecution } from '@/lib/execution/preprocessing' import { createLogger } from '@/lib/logs/console/logger' import { LoggingSession } from '@/lib/logs/execution/logging-session' import { generateRequestId, SSE_HEADERS } from '@/lib/utils' @@ -16,7 +16,6 @@ import { type ExecutionEvent, encodeSSEEvent } from '@/lib/workflows/executor/ex import { PauseResumeManager } from '@/lib/workflows/executor/human-in-the-loop-manager' import { createStreamingResponse } from '@/lib/workflows/streaming' import { createHttpResponseFromBlock, workflowHasResponseBlock } from '@/lib/workflows/utils' -import { validateWorkflowAccess } from '@/app/api/workflows/middleware' import { type ExecutionMetadata, ExecutionSnapshot } from '@/executor/execution/snapshot' import type { StreamingExecution } from '@/executor/types' import { Serializer } from '@/serializer' @@ -30,7 +29,6 @@ const ExecuteWorkflowSchema = z.object({ stream: 
z.boolean().optional(), useDraftState: z.boolean().optional(), input: z.any().optional(), - startBlockId: z.string().optional(), // Optional workflow state override (for executing diff workflows) workflowStateOverride: z .object({ @@ -45,30 +43,21 @@ const ExecuteWorkflowSchema = z.object({ export const runtime = 'nodejs' export const dynamic = 'force-dynamic' -class UsageLimitError extends Error { - statusCode: number - constructor(message: string, statusCode = 402) { - super(message) - this.statusCode = statusCode - } -} - /** * Execute workflow with streaming support - used by chat and other streaming endpoints * + * This function assumes preprocessing has already been completed. + * Callers must run preprocessExecution() first to validate workflow, check usage limits, + * and resolve actor before calling this function. + * * This is a wrapper function that: - * - Checks usage limits before execution (protects chat and streaming paths) - * - Logs usage limit errors to the database for user visibility * - Supports streaming callbacks (onStream, onBlockComplete) * - Returns ExecutionResult instead of NextResponse + * - Handles pause/resume logic * * Used by: * - Chat execution (/api/chat/[identifier]/route.ts) * - Streaming responses (lib/workflows/streaming.ts) - * - * Note: The POST handler in this file calls executeWorkflowCore() directly and has - * its own usage check. This wrapper provides convenience and built-in protection - * for callers that need streaming support. */ export async function executeWorkflow( workflow: any, @@ -92,38 +81,6 @@ export async function executeWorkflow( const loggingSession = new LoggingSession(workflowId, executionId, triggerType, requestId) try { - const usageCheck = await checkServerSideUsageLimits(actorUserId) - if (usageCheck.isExceeded) { - logger.warn( - `[${requestId}] User ${actorUserId} has exceeded usage limits. Blocking workflow execution.`, - { - currentUsage: usageCheck.currentUsage, - limit: usageCheck.limit, - workflowId, - triggerType, - } - ) - - await loggingSession.safeStart({ - userId: actorUserId, - workspaceId: workflow.workspaceId || '', - variables: {}, - }) - - await loggingSession.safeCompleteWithError({ - error: { - message: - usageCheck.message || 'Usage limit exceeded. Please upgrade your plan to continue.', - stackTrace: undefined, - }, - traceSpans: [], - }) - - throw new UsageLimitError( - usageCheck.message || 'Usage limit exceeded. Please upgrade your plan to continue.' - ) - } - const metadata: ExecutionMetadata = { requestId, executionId, @@ -270,23 +227,12 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id: const { id: workflowId } = await params try { - // Authenticate user (API key, session, or internal JWT) const auth = await checkHybridAuth(req, { requireWorkflowId: false }) if (!auth.success || !auth.userId) { return NextResponse.json({ error: auth.error || 'Unauthorized' }, { status: 401 }) } const userId = auth.userId - // Validate workflow access (don't require deployment for manual client runs) - const workflowValidation = await validateWorkflowAccess(req, workflowId, false) - if (workflowValidation.error) { - return NextResponse.json( - { error: workflowValidation.error.message }, - { status: workflowValidation.error.status } - ) - } - const workflow = workflowValidation.workflow! 
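// executeWorkflow() no longer runs its own usage check or throws UsageLimitError; the
// equivalent signal now comes back from preprocessExecution() as a structured error
// carrying an HTTP status code. A condensed sketch of the caller-side contract (assumed
// from the handlers and the preprocessing module in this diff, not copied from any
// single route):
const pre = await preprocessExecution({
  workflowId,
  userId,
  triggerType: 'manual',
  executionId,
  requestId,
})
if (!pre.success) {
  // Status codes observed in this diff: 402 usage limit exceeded (the old
  // UsageLimitError case), 403 workflow not deployed, 404 workflow not found,
  // 429 rate limited, 500 internal preprocessing failure.
  return NextResponse.json({ error: pre.error!.message }, { status: pre.error!.statusCode })
}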
- let body: any = {} try { const text = await req.text() @@ -375,43 +321,33 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id: requestId ) - // Check usage limits for this POST handler execution path - // Architecture note: This handler calls executeWorkflowCore() directly (both SSE and non-SSE paths). - // The executeWorkflow() wrapper function (used by chat) has its own check (line 54). - const usageCheck = await checkServerSideUsageLimits(userId) - if (usageCheck.isExceeded) { - logger.warn(`[${requestId}] User ${userId} has exceeded usage limits. Blocking execution.`, { - currentUsage: usageCheck.currentUsage, - limit: usageCheck.limit, - workflowId, - triggerType, - }) - - await loggingSession.safeStart({ - userId, - workspaceId: workflow.workspaceId || '', - variables: {}, - }) - - await loggingSession.safeCompleteWithError({ - error: { - message: - usageCheck.message || 'Usage limit exceeded. Please upgrade your plan to continue.', - stackTrace: undefined, - }, - traceSpans: [], - }) + const preprocessResult = await preprocessExecution({ + workflowId, + userId, + triggerType: loggingTriggerType, + executionId, + requestId, + checkRateLimit: false, // Manual executions bypass rate limits + checkDeployment: !shouldUseDraftState, // Check deployment unless using draft + loggingSession, + }) + if (!preprocessResult.success) { return NextResponse.json( - { - error: - usageCheck.message || 'Usage limit exceeded. Please upgrade your plan to continue.', - }, - { status: 402 } + { error: preprocessResult.error!.message }, + { status: preprocessResult.error!.statusCode } ) } - // Process file fields in workflow input (base64/URL to UserFile conversion) + const actorUserId = preprocessResult.actorUserId! + const workflow = preprocessResult.workflowRecord! + + logger.info(`[${requestId}] Preprocessing passed`, { + workflowId, + actorUserId, + workspaceId: workflow.workspaceId, + }) + let processedInput = input try { const workflowData = shouldUseDraftState @@ -438,14 +374,14 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id: serializedWorkflow.blocks, executionContext, requestId, - userId + actorUserId ) } } catch (fileError) { logger.error(`[${requestId}] Failed to process input file fields:`, fileError) await loggingSession.safeStart({ - userId, + userId: actorUserId, workspaceId: workflow.workspaceId || '', variables: {}, }) @@ -473,8 +409,8 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id: requestId, executionId, workflowId, - workspaceId: workflow.workspaceId, - userId, + workspaceId: workflow.workspaceId ?? 
undefined, + userId: actorUserId, triggerType, useDraftState: shouldUseDraftState, startTime: new Date().toISOString(), @@ -516,8 +452,6 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id: return NextResponse.json(filteredResult) } catch (error: any) { - // Block errors are already logged with full details by BlockExecutor - // Only log the error message here to avoid duplicate logging const errorMessage = error.message || 'Unknown error' logger.error(`[${requestId}] Non-SSE execution failed: ${errorMessage}`) @@ -549,9 +483,15 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id: const resolvedSelectedOutputs = resolveOutputIds(selectedOutputs, deployedData?.blocks || {}) const stream = await createStreamingResponse({ requestId, - workflow, + workflow: { + id: workflow.id, + userId: actorUserId, + workspaceId: workflow.workspaceId, + isDeployed: workflow.isDeployed, + variables: (workflow as any).variables, + }, input: processedInput, - executingUserId: userId, + executingUserId: actorUserId, streamConfig: { selectedOutputs: resolvedSelectedOutputs, isSecureMode: false, @@ -732,8 +672,8 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id: requestId, executionId, workflowId, - workspaceId: workflow.workspaceId, - userId, + workspaceId: workflow.workspaceId ?? undefined, + userId: actorUserId, triggerType, useDraftState: shouldUseDraftState, startTime: new Date().toISOString(), @@ -808,8 +748,6 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id: }, }) } catch (error: any) { - // Block errors are already logged with full details by BlockExecutor - // Only log the error message here to avoid duplicate logging const errorMessage = error.message || 'Unknown error' logger.error(`[${requestId}] SSE execution failed: ${errorMessage}`) diff --git a/apps/sim/app/workspace/[workspaceId]/w/components/sidebar/components-new/search-modal/search-modal.tsx b/apps/sim/app/workspace/[workspaceId]/w/components/sidebar/components-new/search-modal/search-modal.tsx index 47a360e8a..44da500ba 100644 --- a/apps/sim/app/workspace/[workspaceId]/w/components/sidebar/components-new/search-modal/search-modal.tsx +++ b/apps/sim/app/workspace/[workspaceId]/w/components/sidebar/components-new/search-modal/search-modal.tsx @@ -3,7 +3,6 @@ import { useCallback, useEffect, useMemo, useState } from 'react' import * as DialogPrimitive from '@radix-ui/react-dialog' import * as VisuallyHidden from '@radix-ui/react-visually-hidden' -import fuzzysort from 'fuzzysort' import { BookOpen, Layout, RepeatIcon, ScrollText, Search, SplitIcon } from 'lucide-react' import { useParams, useRouter } from 'next/navigation' import { Dialog, DialogPortal, DialogTitle } from '@/components/ui/dialog' @@ -11,6 +10,7 @@ import { useBrandConfig } from '@/lib/branding/branding' import { cn } from '@/lib/utils' import { getTriggersForSidebar, hasTriggerCapability } from '@/lib/workflows/trigger-utils' import { getAllBlocks } from '@/blocks' +import { searchItems } from './search-utils' interface SearchModalProps { open: boolean @@ -340,32 +340,21 @@ export function SearchModal({ }) } - const results = fuzzysort.go(searchQuery, allItems, { - keys: ['name', 'description'], - limit: 100, - threshold: -1000, - all: true, - scoreFn: (a) => { - const nameScore = a[0] ? a[0].score : Number.NEGATIVE_INFINITY - const descScore = a[1] ? 
a[1].score : Number.NEGATIVE_INFINITY + const searchResults = searchItems(searchQuery, allItems) - return Math.max(nameScore * 2, descScore) - }, - }) - - return results - .map((result) => ({ - item: result.obj, - score: result.score, - })) + return searchResults .sort((a, b) => { - if (Math.abs(a.score - b.score) > 100) { + if (a.score !== b.score) { return b.score - a.score } const aOrder = orderMap[a.item.type] ?? Number.MAX_SAFE_INTEGER const bOrder = orderMap[b.item.type] ?? Number.MAX_SAFE_INTEGER - return aOrder - bOrder + if (aOrder !== bOrder) { + return aOrder - bOrder + } + + return a.item.name.localeCompare(b.item.name) }) .map((result) => result.item) }, [allItems, searchQuery, sectionOrder]) diff --git a/apps/sim/app/workspace/[workspaceId]/w/components/sidebar/components-new/search-modal/search-utils.ts b/apps/sim/app/workspace/[workspaceId]/w/components/sidebar/components-new/search-modal/search-utils.ts new file mode 100644 index 000000000..bbe62e754 --- /dev/null +++ b/apps/sim/app/workspace/[workspaceId]/w/components/sidebar/components-new/search-modal/search-utils.ts @@ -0,0 +1,137 @@ +/** + * Search utility functions for tiered matching algorithm + * Provides predictable search results prioritizing exact matches over fuzzy matches + */ + +export interface SearchableItem { + id: string + name: string + description?: string + type: string + [key: string]: any +} + +export interface SearchResult { + item: T + score: number + matchType: 'exact' | 'prefix' | 'word-boundary' | 'substring' | 'description' +} + +const SCORE_EXACT_MATCH = 10000 +const SCORE_PREFIX_MATCH = 5000 +const SCORE_WORD_BOUNDARY = 1000 +const SCORE_SUBSTRING_MATCH = 100 +const DESCRIPTION_WEIGHT = 0.3 + +/** + * Calculate match score for a single field + * Returns 0 if no match found + */ +function calculateFieldScore( + query: string, + field: string +): { + score: number + matchType: 'exact' | 'prefix' | 'word-boundary' | 'substring' | null +} { + const normalizedQuery = query.toLowerCase().trim() + const normalizedField = field.toLowerCase().trim() + + if (!normalizedQuery || !normalizedField) { + return { score: 0, matchType: null } + } + + // Tier 1: Exact match + if (normalizedField === normalizedQuery) { + return { score: SCORE_EXACT_MATCH, matchType: 'exact' } + } + + // Tier 2: Prefix match (starts with query) + if (normalizedField.startsWith(normalizedQuery)) { + return { score: SCORE_PREFIX_MATCH, matchType: 'prefix' } + } + + // Tier 3: Word boundary match (query matches start of a word) + const words = normalizedField.split(/[\s-_/]+/) + const hasWordBoundaryMatch = words.some((word) => word.startsWith(normalizedQuery)) + if (hasWordBoundaryMatch) { + return { score: SCORE_WORD_BOUNDARY, matchType: 'word-boundary' } + } + + // Tier 4: Substring match (query appears anywhere) + if (normalizedField.includes(normalizedQuery)) { + return { score: SCORE_SUBSTRING_MATCH, matchType: 'substring' } + } + + // No match + return { score: 0, matchType: null } +} + +/** + * Search items using tiered matching algorithm + * Returns items sorted by relevance (highest score first) + */ +export function searchItems( + query: string, + items: T[] +): SearchResult[] { + const normalizedQuery = query.trim() + + if (!normalizedQuery) { + return [] + } + + const results: SearchResult[] = [] + + for (const item of items) { + const nameMatch = calculateFieldScore(normalizedQuery, item.name) + + const descMatch = item.description + ? 
calculateFieldScore(normalizedQuery, item.description) + : { score: 0, matchType: null } + + const nameScore = nameMatch.score + const descScore = descMatch.score * DESCRIPTION_WEIGHT + + const bestScore = Math.max(nameScore, descScore) + + if (bestScore > 0) { + let matchType: SearchResult['matchType'] = 'substring' + if (nameScore >= descScore) { + matchType = nameMatch.matchType || 'substring' + } else { + matchType = 'description' + } + + results.push({ + item, + score: bestScore, + matchType, + }) + } + } + + results.sort((a, b) => b.score - a.score) + + return results +} + +/** + * Get a human-readable match type label + */ +export function getMatchTypeLabel(matchType: SearchResult['matchType']): string { + switch (matchType) { + case 'exact': + return 'Exact match' + case 'prefix': + return 'Starts with' + case 'word-boundary': + return 'Word match' + case 'substring': + return 'Contains' + case 'description': + return 'In description' + default: + return 'Match' + } +} diff --git a/apps/sim/background/schedule-execution.ts b/apps/sim/background/schedule-execution.ts index 01c31af68..30652e475 100644 --- a/apps/sim/background/schedule-execution.ts +++ b/apps/sim/background/schedule-execution.ts @@ -4,9 +4,8 @@ import { Cron } from 'croner' import { eq } from 'drizzle-orm' import { v4 as uuidv4 } from 'uuid' import type { ZodRecord, ZodString } from 'zod' -import { checkServerSideUsageLimits } from '@/lib/billing' -import { getHighestPrioritySubscription } from '@/lib/billing/core/subscription' import { getPersonalAndWorkspaceEnv } from '@/lib/environment/utils' +import { preprocessExecution } from '@/lib/execution/preprocessing' import { createLogger } from '@/lib/logs/console/logger' import { LoggingSession } from '@/lib/logs/execution/logging-session' import { @@ -19,9 +18,7 @@ import { decryptSecret } from '@/lib/utils' import { blockExistsInDeployment, loadDeployedWorkflowState } from '@/lib/workflows/db-helpers' import { executeWorkflowCore } from '@/lib/workflows/executor/execution-core' import { PauseResumeManager } from '@/lib/workflows/executor/human-in-the-loop-manager' -import { getWorkspaceBilledAccountUserId } from '@/lib/workspaces/utils' import { type ExecutionMetadata, ExecutionSnapshot } from '@/executor/execution/snapshot' -import { RateLimiter } from '@/services/queue' import { mergeSubblockState } from '@/stores/workflows/server-utils' const logger = createLogger('TriggerScheduleExecution') @@ -74,145 +71,6 @@ async function releaseScheduleLock( await applyScheduleUpdate(scheduleId, updates, requestId, context) } -async function resolveActorUserId(workflowRecord: WorkflowRecord) { - if (workflowRecord.workspaceId) { - const actor = await getWorkspaceBilledAccountUserId(workflowRecord.workspaceId) - if (actor) { - return actor - } - } - - return workflowRecord.userId ?? null -} - -async function handleWorkflowNotFound( - payload: ScheduleExecutionPayload, - executionId: string, - requestId: string, - now: Date -) { - const loggingSession = new LoggingSession(payload.workflowId, executionId, 'schedule', requestId) - - await loggingSession.safeStart({ - userId: 'unknown', - workspaceId: '', - variables: {}, - }) - - await loggingSession.safeCompleteWithError({ - error: { - message: - 'Workflow not found. 
The scheduled workflow may have been deleted or is no longer accessible.', - stackTrace: undefined, - }, - traceSpans: [], - }) - - await applyScheduleUpdate( - payload.scheduleId, - { - updatedAt: now, - lastQueuedAt: null, - status: 'disabled', - }, - requestId, - `Failed to disable schedule ${payload.scheduleId} after missing workflow`, - `Disabled schedule ${payload.scheduleId} because the workflow no longer exists` - ) -} - -async function handleMissingActor( - payload: ScheduleExecutionPayload, - workflowRecord: WorkflowRecord, - executionId: string, - requestId: string, - now: Date -) { - const loggingSession = new LoggingSession(payload.workflowId, executionId, 'schedule', requestId) - - await loggingSession.safeStart({ - userId: workflowRecord.userId ?? 'unknown', - workspaceId: workflowRecord.workspaceId || '', - variables: {}, - }) - - await loggingSession.safeCompleteWithError({ - error: { - message: - 'Unable to resolve billing account. This workflow cannot execute scheduled runs without a valid billing account.', - stackTrace: undefined, - }, - traceSpans: [], - }) - - await releaseScheduleLock( - payload.scheduleId, - requestId, - now, - `Failed to release schedule ${payload.scheduleId} after billing account lookup` - ) -} - -async function ensureRateLimit( - actorUserId: string, - userSubscription: Awaited>, - rateLimiter: RateLimiter, - loggingSession: LoggingSession, - payload: ScheduleExecutionPayload, - workflowRecord: WorkflowRecord, - requestId: string, - now: Date -) { - const rateLimitCheck = await rateLimiter.checkRateLimitWithSubscription( - actorUserId, - userSubscription, - 'schedule', - false - ) - - if (rateLimitCheck.allowed) { - return true - } - - logger.warn(`[${requestId}] Rate limit exceeded for scheduled workflow ${payload.workflowId}`, { - userId: workflowRecord.userId, - remaining: rateLimitCheck.remaining, - resetAt: rateLimitCheck.resetAt, - }) - - await loggingSession.safeStart({ - userId: actorUserId, - workspaceId: workflowRecord.workspaceId || '', - variables: {}, - }) - - await loggingSession.safeCompleteWithError({ - error: { - message: `Rate limit exceeded. ${rateLimitCheck.remaining || 0} requests remaining. Resets at ${ - rateLimitCheck.resetAt ? new Date(rateLimitCheck.resetAt).toISOString() : 'unknown' - }. Schedule will retry in 5 minutes.`, - stackTrace: undefined, - }, - traceSpans: [], - }) - - const retryDelay = 5 * 60 * 1000 - const nextRetryAt = new Date(now.getTime() + retryDelay) - - await applyScheduleUpdate( - payload.scheduleId, - { - updatedAt: now, - nextRunAt: nextRetryAt, - }, - requestId, - `Error updating schedule ${payload.scheduleId} for rate limit`, - `Updated next retry time for schedule ${payload.scheduleId} due to rate limit` - ) - - return false -} - async function calculateNextRunFromDeployment( payload: ScheduleExecutionPayload, requestId: string @@ -229,61 +87,6 @@ async function calculateNextRunFromDeployment( } } -async function ensureUsageLimits( - actorUserId: string, - payload: ScheduleExecutionPayload, - workflowRecord: WorkflowRecord, - loggingSession: LoggingSession, - requestId: string, - now: Date -) { - const usageCheck = await checkServerSideUsageLimits(actorUserId) - if (!usageCheck.isExceeded) { - return true - } - - logger.warn( - `[${requestId}] User ${workflowRecord.userId} has exceeded usage limits. 
Skipping scheduled execution.`, - { - currentUsage: usageCheck.currentUsage, - limit: usageCheck.limit, - workflowId: payload.workflowId, - } - ) - - await loggingSession.safeStart({ - userId: actorUserId, - workspaceId: workflowRecord.workspaceId || '', - variables: {}, - }) - - await loggingSession.safeCompleteWithError({ - error: { - message: - usageCheck.message || - 'Usage limit exceeded. Please upgrade your plan to continue using scheduled workflows.', - stackTrace: undefined, - }, - traceSpans: [], - }) - - const nextRunAt = await calculateNextRunFromDeployment(payload, requestId) - if (nextRunAt) { - await applyScheduleUpdate( - payload.scheduleId, - { - updatedAt: now, - nextRunAt, - }, - requestId, - `Error updating schedule ${payload.scheduleId} after usage limit check`, - `Scheduled next run for ${payload.scheduleId} after usage limit` - ) - } - - return false -} - async function determineNextRunAfterError( payload: ScheduleExecutionPayload, now: Date, @@ -385,9 +188,6 @@ async function runWorkflowExecution({ const deployedData = await loadDeployedWorkflowState(payload.workflowId) const blocks = deployedData.blocks - const edges = deployedData.edges - const loops = deployedData.loops - const parallels = deployedData.parallels logger.info(`[${requestId}] Loaded deployed workflow ${payload.workflowId}`) if (payload.blockId) { @@ -560,29 +360,6 @@ export async function executeScheduleJob(payload: ScheduleExecutionPayload) { const EnvVarsSchema = zod.z.record(zod.z.string()) try { - const [workflowRecord] = await db - .select() - .from(workflow) - .where(eq(workflow.id, payload.workflowId)) - .limit(1) - - if (!workflowRecord) { - logger.warn(`[${requestId}] Workflow ${payload.workflowId} not found`) - await handleWorkflowNotFound(payload, executionId, requestId, now) - return - } - - const actorUserId = await resolveActorUserId(workflowRecord) - if (!actorUserId) { - logger.warn( - `[${requestId}] Skipping schedule ${payload.scheduleId}: unable to resolve billed account.` - ) - await handleMissingActor(payload, workflowRecord, executionId, requestId, now) - return - } - - const userSubscription = await getHighestPrioritySubscription(actorUserId) - const loggingSession = new LoggingSession( payload.workflowId, executionId, @@ -590,31 +367,144 @@ export async function executeScheduleJob(payload: ScheduleExecutionPayload) { requestId ) - const rateLimiter = new RateLimiter() - const withinRateLimit = await ensureRateLimit( - actorUserId, - userSubscription, - rateLimiter, - loggingSession, - payload, - workflowRecord, + const preprocessResult = await preprocessExecution({ + workflowId: payload.workflowId, + userId: 'unknown', // Will be resolved from workflow record + triggerType: 'schedule', + executionId, requestId, - now - ) + checkRateLimit: true, + checkDeployment: true, + loggingSession, + }) - if (!withinRateLimit) { - return + if (!preprocessResult.success) { + const statusCode = preprocessResult.error?.statusCode || 500 + + switch (statusCode) { + case 401: { + logger.warn( + `[${requestId}] Authentication error during preprocessing, disabling schedule` + ) + await applyScheduleUpdate( + payload.scheduleId, + { + updatedAt: now, + lastQueuedAt: null, + lastFailedAt: now, + status: 'disabled', + }, + requestId, + `Failed to disable schedule ${payload.scheduleId} after authentication error`, + `Disabled schedule ${payload.scheduleId} due to authentication failure (401)` + ) + return + } + + case 403: { + logger.warn( + `[${requestId}] Authorization error during preprocessing, 
disabling schedule: ${preprocessResult.error?.message}` + ) + await applyScheduleUpdate( + payload.scheduleId, + { + updatedAt: now, + lastQueuedAt: null, + lastFailedAt: now, + status: 'disabled', + }, + requestId, + `Failed to disable schedule ${payload.scheduleId} after authorization error`, + `Disabled schedule ${payload.scheduleId} due to authorization failure (403)` + ) + return + } + + case 404: { + logger.warn(`[${requestId}] Workflow not found, disabling schedule`) + await applyScheduleUpdate( + payload.scheduleId, + { + updatedAt: now, + lastQueuedAt: null, + status: 'disabled', + }, + requestId, + `Failed to disable schedule ${payload.scheduleId} after missing workflow`, + `Disabled schedule ${payload.scheduleId} because the workflow no longer exists` + ) + return + } + + case 429: { + logger.warn(`[${requestId}] Rate limit exceeded, scheduling retry`) + const retryDelay = 5 * 60 * 1000 + const nextRetryAt = new Date(now.getTime() + retryDelay) + + await applyScheduleUpdate( + payload.scheduleId, + { + updatedAt: now, + nextRunAt: nextRetryAt, + }, + requestId, + `Error updating schedule ${payload.scheduleId} for rate limit`, + `Updated next retry time for schedule ${payload.scheduleId} due to rate limit` + ) + return + } + + case 402: { + logger.warn(`[${requestId}] Usage limit exceeded, scheduling next run`) + const nextRunAt = await calculateNextRunFromDeployment(payload, requestId) + if (nextRunAt) { + await applyScheduleUpdate( + payload.scheduleId, + { + updatedAt: now, + nextRunAt, + }, + requestId, + `Error updating schedule ${payload.scheduleId} after usage limit check`, + `Scheduled next run for ${payload.scheduleId} after usage limit` + ) + } + return + } + + default: { + logger.error(`[${requestId}] Preprocessing failed: ${preprocessResult.error?.message}`) + const nextRunAt = await determineNextRunAfterError(payload, now, requestId) + const newFailedCount = (payload.failedCount || 0) + 1 + const shouldDisable = newFailedCount >= MAX_CONSECUTIVE_FAILURES + + if (shouldDisable) { + logger.warn( + `[${requestId}] Disabling schedule for workflow ${payload.workflowId} after ${MAX_CONSECUTIVE_FAILURES} consecutive failures` + ) + } + + await applyScheduleUpdate( + payload.scheduleId, + { + updatedAt: now, + nextRunAt, + failedCount: newFailedCount, + lastFailedAt: now, + status: shouldDisable ? 
'disabled' : 'active', + }, + requestId, + `Error updating schedule ${payload.scheduleId} after preprocessing failure`, + `Updated schedule ${payload.scheduleId} after preprocessing failure` + ) + return + } + } } - const withinUsageLimits = await ensureUsageLimits( - actorUserId, - payload, - workflowRecord, - loggingSession, - requestId, - now - ) - if (!withinUsageLimits) { + const { actorUserId, workflowRecord } = preprocessResult + if (!actorUserId || !workflowRecord) { + logger.error(`[${requestId}] Missing required preprocessing data`) return } diff --git a/apps/sim/background/workflow-execution.ts b/apps/sim/background/workflow-execution.ts index 70084687b..4789ad2e7 100644 --- a/apps/sim/background/workflow-execution.ts +++ b/apps/sim/background/workflow-execution.ts @@ -1,9 +1,6 @@ -import { db } from '@sim/db' -import { workflow as workflowTable } from '@sim/db/schema' import { task } from '@trigger.dev/sdk' -import { eq } from 'drizzle-orm' import { v4 as uuidv4 } from 'uuid' -import { checkServerSideUsageLimits } from '@/lib/billing' +import { preprocessExecution } from '@/lib/execution/preprocessing' import { createLogger } from '@/lib/logs/console/logger' import { LoggingSession } from '@/lib/logs/execution/logging-session' import { executeWorkflowCore } from '@/lib/workflows/executor/execution-core' @@ -21,6 +18,11 @@ export type WorkflowExecutionPayload = { metadata?: Record } +/** + * Background workflow execution job + * @see preprocessExecution For detailed information on preprocessing checks + * @see executeWorkflowCore For the core workflow execution logic + */ export async function executeWorkflowJob(payload: WorkflowExecutionPayload) { const workflowId = payload.workflowId const executionId = uuidv4() @@ -36,62 +38,34 @@ export async function executeWorkflowJob(payload: WorkflowExecutionPayload) { const loggingSession = new LoggingSession(workflowId, executionId, triggerType, requestId) try { - const wfRows = await db - .select({ workspaceId: workflowTable.workspaceId }) - .from(workflowTable) - .where(eq(workflowTable.id, workflowId)) - .limit(1) - const workspaceId = wfRows[0]?.workspaceId || undefined + const preprocessResult = await preprocessExecution({ + workflowId: payload.workflowId, + userId: payload.userId, + triggerType: triggerType, + executionId: executionId, + requestId: requestId, + checkRateLimit: true, + checkDeployment: true, + loggingSession: loggingSession, + }) - const usageCheck = await checkServerSideUsageLimits(payload.userId) - if (usageCheck.isExceeded) { - logger.warn( - `[${requestId}] User ${payload.userId} has exceeded usage limits. Skipping workflow execution.`, - { - currentUsage: usageCheck.currentUsage, - limit: usageCheck.limit, - workflowId, - } - ) - - await loggingSession.safeStart({ - userId: payload.userId, - workspaceId: workspaceId || '', - variables: {}, + if (!preprocessResult.success) { + logger.error(`[${requestId}] Preprocessing failed: ${preprocessResult.error?.message}`, { + workflowId, + statusCode: preprocessResult.error?.statusCode, }) - await loggingSession.safeCompleteWithError({ - error: { - message: - usageCheck.message || 'Usage limit exceeded. Please upgrade your plan to continue.', - stackTrace: undefined, - }, - traceSpans: [], - }) - - throw new Error( - usageCheck.message || 'Usage limit exceeded. Please upgrade your plan to continue.' - ) + throw new Error(preprocessResult.error?.message || 'Preprocessing failed') } + const actorUserId = preprocessResult.actorUserId! 
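// Every call site in this diff narrows the preprocessing result with non-null assertions
// (preprocessResult.actorUserId!, preprocessResult.workflowRecord!) after checking
// success. A small assertion helper — hypothetical, not part of this change set — would
// let the compiler do that narrowing instead of the `!` operator:
import type { PreprocessExecutionResult } from '@/lib/execution/preprocessing'

function assertPreprocessSuccess(
  result: PreprocessExecutionResult
): asserts result is PreprocessExecutionResult & { actorUserId: string } {
  if (!result.success || !result.actorUserId) {
    throw new Error(result.error?.message || 'Preprocessing failed')
  }
}
// Usage: assertPreprocessSuccess(preprocessResult) — afterwards
// preprocessResult.actorUserId is typed as string and the assertions above are unneeded.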
+ const workspaceId = preprocessResult.workflowRecord?.workspaceId || undefined + + logger.info(`[${requestId}] Preprocessing passed. Using actor: ${actorUserId}`) + const workflow = await getWorkflowById(workflowId) if (!workflow) { - await loggingSession.safeStart({ - userId: payload.userId, - workspaceId: workspaceId || '', - variables: {}, - }) - - await loggingSession.safeCompleteWithError({ - error: { - message: - 'Workflow not found. The workflow may have been deleted or is no longer accessible.', - stackTrace: undefined, - }, - traceSpans: [], - }) - - throw new Error(`Workflow ${workflowId} not found`) + throw new Error(`Workflow ${workflowId} not found after preprocessing`) } const metadata: ExecutionMetadata = { @@ -99,7 +73,7 @@ export async function executeWorkflowJob(payload: WorkflowExecutionPayload) { executionId, workflowId, workspaceId, - userId: payload.userId, + userId: actorUserId, triggerType: payload.triggerType || 'api', useDraftState: false, startTime: new Date().toISOString(), diff --git a/apps/sim/lib/execution/preprocessing.ts b/apps/sim/lib/execution/preprocessing.ts new file mode 100644 index 000000000..7cdd2ae7b --- /dev/null +++ b/apps/sim/lib/execution/preprocessing.ts @@ -0,0 +1,557 @@ +import { db } from '@sim/db' +import { workflow } from '@sim/db/schema' +import { eq } from 'drizzle-orm' +import { checkServerSideUsageLimits } from '@/lib/billing/calculations/usage-monitor' +import { getHighestPrioritySubscription } from '@/lib/billing/core/subscription' +import { createLogger } from '@/lib/logs/console/logger' +import { LoggingSession } from '@/lib/logs/execution/logging-session' +import { getWorkspaceBilledAccountUserId } from '@/lib/workspaces/utils' +import { RateLimiter } from '@/services/queue/RateLimiter' + +const logger = createLogger('ExecutionPreprocessing') + +const BILLING_ERROR_MESSAGES = { + BILLING_REQUIRED: + 'Unable to resolve billing account. This workflow cannot execute without a valid billing account.', + BILLING_ERROR_GENERIC: 'Error resolving billing account', +} as const + +/** + * Attempts to resolve billing actor with fallback for resume contexts. + * Returns the resolved actor user ID or null if resolution fails and should block execution. + * + * For resume contexts, this function allows fallback to the workflow owner if workspace + * billing cannot be resolved, ensuring users can complete their paused workflows even + * if billing configuration changes mid-execution. + * + * @returns Object containing actorUserId (null if should block) and shouldBlock flag + */ +async function resolveBillingActorWithFallback(params: { + requestId: string + workflowId: string + workspaceId: string + executionId: string + triggerType: string + workflowRecord: WorkflowRecord + userId: string + isResumeContext: boolean + baseActorUserId: string | null + failureReason: 'null' | 'error' + error?: unknown + loggingSession?: LoggingSession +}): Promise< + { actorUserId: string; shouldBlock: false } | { actorUserId: null; shouldBlock: true } +> { + const { + requestId, + workflowId, + workspaceId, + executionId, + triggerType, + workflowRecord, + userId, + isResumeContext, + baseActorUserId, + failureReason, + error, + loggingSession, + } = params + + if (baseActorUserId) { + return { actorUserId: baseActorUserId, shouldBlock: false } + } + + const workflowOwner = workflowRecord.userId?.trim() + if (isResumeContext && workflowOwner) { + const logMessage = + failureReason === 'null' + ? '[BILLING_FALLBACK] Workspace billing account is null. 
Using workflow owner for billing.'
+        : '[BILLING_FALLBACK] Exception during workspace billing resolution. Using workflow owner for billing.'
+
+    logger.warn(`[${requestId}] ${logMessage}`, {
+      workflowId,
+      workspaceId,
+      fallbackUserId: workflowOwner,
+      ...(error ? { error } : {}),
+    })
+
+    return { actorUserId: workflowOwner, shouldBlock: false }
+  }
+
+  const fallbackUserId = workflowRecord.userId || userId || 'unknown'
+  const errorMessage =
+    failureReason === 'null'
+      ? BILLING_ERROR_MESSAGES.BILLING_REQUIRED
+      : BILLING_ERROR_MESSAGES.BILLING_ERROR_GENERIC
+
+  logger.warn(`[${requestId}] ${errorMessage}`, {
+    workflowId,
+    workspaceId,
+    ...(error ? { error } : {}),
+  })
+
+  await logPreprocessingError({
+    workflowId,
+    executionId,
+    triggerType,
+    requestId,
+    userId: fallbackUserId,
+    workspaceId,
+    errorMessage,
+    loggingSession,
+  })
+
+  return { actorUserId: null, shouldBlock: true }
+}
+
+export interface PreprocessExecutionOptions {
+  // Required fields
+  workflowId: string
+  userId: string // The authenticated user ID
+  triggerType: 'manual' | 'api' | 'webhook' | 'schedule' | 'chat'
+  executionId: string
+  requestId: string
+
+  // Optional checks configuration
+  checkRateLimit?: boolean // Default: false for manual/chat, true for others
+  checkDeployment?: boolean // Default: true for non-manual triggers
+  skipUsageLimits?: boolean // Default: false (only use for test mode)
+
+  // Context information
+  workspaceId?: string // If known, used for billing resolution
+  loggingSession?: LoggingSession // If provided, will be used for error logging
+  isResumeContext?: boolean // If true, allows fallback billing on resolution failure (for paused workflow resumes)
+}
+
+/**
+ * Result of preprocessing checks
+ */
+export interface PreprocessExecutionResult {
+  success: boolean
+  error?: {
+    message: string
+    statusCode: number // HTTP status code (401, 402, 403, 404, 429, 500)
+    logCreated: boolean // Whether error was logged to execution_logs
+  }
+  actorUserId?: string // The user ID that will be billed
+  workflowRecord?: WorkflowRecord
+  userSubscription?: SubscriptionInfo | null
+  rateLimitInfo?: {
+    allowed: boolean
+    remaining: number
+    resetAt: Date
+  }
+}
+
+type WorkflowRecord = typeof workflow.$inferSelect
+type SubscriptionInfo = Awaited<ReturnType<typeof getHighestPrioritySubscription>>
+
+export async function preprocessExecution(
+  options: PreprocessExecutionOptions
+): Promise<PreprocessExecutionResult> {
+  const {
+    workflowId,
+    userId,
+    triggerType,
+    executionId,
+    requestId,
+    checkRateLimit = triggerType !== 'manual' && triggerType !== 'chat',
+    checkDeployment = triggerType !== 'manual',
+    skipUsageLimits = false,
+    workspaceId: providedWorkspaceId,
+    loggingSession: providedLoggingSession,
+    isResumeContext = false,
+  } = options
+
+  logger.info(`[${requestId}] Starting execution preprocessing`, {
+    workflowId,
+    userId,
+    triggerType,
+    executionId,
+  })
+
+  // ========== STEP 1: Validate Workflow Exists ==========
+  let workflowRecord: WorkflowRecord | null = null
+  try {
+    const records = await db.select().from(workflow).where(eq(workflow.id, workflowId)).limit(1)
+
+    if (records.length === 0) {
+      logger.warn(`[${requestId}] Workflow not found: ${workflowId}`)
+
+      await logPreprocessingError({
+        workflowId,
+        executionId,
+        triggerType,
+        requestId,
+        userId: 'unknown',
+        workspaceId: '',
+        errorMessage:
+          'Workflow not found.
The workflow may have been deleted or is no longer accessible.', + loggingSession: providedLoggingSession, + }) + + return { + success: false, + error: { + message: 'Workflow not found', + statusCode: 404, + logCreated: true, + }, + } + } + + workflowRecord = records[0] + } catch (error) { + logger.error(`[${requestId}] Error fetching workflow`, { error, workflowId }) + + await logPreprocessingError({ + workflowId, + executionId, + triggerType, + requestId, + userId: userId || 'unknown', + workspaceId: providedWorkspaceId || '', + errorMessage: 'Internal error while fetching workflow', + loggingSession: providedLoggingSession, + }) + + return { + success: false, + error: { + message: 'Internal error while fetching workflow', + statusCode: 500, + logCreated: true, + }, + } + } + + const workspaceId = workflowRecord.workspaceId || providedWorkspaceId || '' + + // ========== STEP 2: Check Deployment Status ========== + if (checkDeployment && !workflowRecord.isDeployed) { + logger.warn(`[${requestId}] Workflow not deployed: ${workflowId}`) + + await logPreprocessingError({ + workflowId, + executionId, + triggerType, + requestId, + userId: workflowRecord.userId || userId, + workspaceId, + errorMessage: 'Workflow is not deployed. Please deploy the workflow before triggering it.', + loggingSession: providedLoggingSession, + }) + + return { + success: false, + error: { + message: 'Workflow is not deployed', + statusCode: 403, + logCreated: true, + }, + } + } + + // ========== STEP 3: Resolve Billing Actor ========== + let actorUserId: string | null = null + + try { + if (workspaceId) { + actorUserId = await getWorkspaceBilledAccountUserId(workspaceId) + if (actorUserId) { + logger.info(`[${requestId}] Using workspace billed account: ${actorUserId}`) + } + } + + if (!actorUserId) { + actorUserId = workflowRecord.userId || userId + logger.info(`[${requestId}] Using workflow owner as actor: ${actorUserId}`) + } + + if (!actorUserId) { + const result = await resolveBillingActorWithFallback({ + requestId, + workflowId, + workspaceId, + executionId, + triggerType, + workflowRecord, + userId, + isResumeContext, + baseActorUserId: actorUserId, + failureReason: 'null', + loggingSession: providedLoggingSession, + }) + + if (result.shouldBlock) { + return { + success: false, + error: { + message: 'Unable to resolve billing account', + statusCode: 500, + logCreated: true, + }, + } + } + + actorUserId = result.actorUserId + } + } catch (error) { + logger.error(`[${requestId}] Error resolving billing actor`, { error, workflowId }) + + const result = await resolveBillingActorWithFallback({ + requestId, + workflowId, + workspaceId, + executionId, + triggerType, + workflowRecord, + userId, + isResumeContext, + baseActorUserId: null, + failureReason: 'error', + error, + loggingSession: providedLoggingSession, + }) + + if (result.shouldBlock) { + return { + success: false, + error: { + message: 'Error resolving billing account', + statusCode: 500, + logCreated: true, + }, + } + } + + actorUserId = result.actorUserId + } + + // ========== STEP 4: Get User Subscription ========== + let userSubscription: SubscriptionInfo = null + try { + userSubscription = await getHighestPrioritySubscription(actorUserId) + logger.debug(`[${requestId}] User subscription retrieved`, { + actorUserId, + hasSub: !!userSubscription, + plan: userSubscription?.plan, + }) + } catch (error) { + logger.error(`[${requestId}] Error fetching subscription`, { error, actorUserId }) + } + + // ========== STEP 5: Check Rate Limits ========== + let 
rateLimitInfo: { allowed: boolean; remaining: number; resetAt: Date } | undefined + + if (checkRateLimit) { + try { + const rateLimiter = new RateLimiter() + rateLimitInfo = await rateLimiter.checkRateLimitWithSubscription( + actorUserId, + userSubscription, + triggerType, + false // not async + ) + + if (!rateLimitInfo.allowed) { + logger.warn(`[${requestId}] Rate limit exceeded for user ${actorUserId}`, { + triggerType, + remaining: rateLimitInfo.remaining, + resetAt: rateLimitInfo.resetAt, + }) + + await logPreprocessingError({ + workflowId, + executionId, + triggerType, + requestId, + userId: actorUserId, + workspaceId, + errorMessage: `Rate limit exceeded. ${rateLimitInfo.remaining} requests remaining. Resets at ${rateLimitInfo.resetAt.toISOString()}.`, + loggingSession: providedLoggingSession, + }) + + return { + success: false, + error: { + message: `Rate limit exceeded. Please try again later.`, + statusCode: 429, + logCreated: true, + }, + } + } + + logger.debug(`[${requestId}] Rate limit check passed`, { + remaining: rateLimitInfo.remaining, + }) + } catch (error) { + logger.error(`[${requestId}] Error checking rate limits`, { error, actorUserId }) + + await logPreprocessingError({ + workflowId, + executionId, + triggerType, + requestId, + userId: actorUserId, + workspaceId, + errorMessage: 'Error checking rate limits. Execution blocked for safety.', + loggingSession: providedLoggingSession, + }) + + return { + success: false, + error: { + message: 'Error checking rate limits', + statusCode: 500, + logCreated: true, + }, + } + } + } + + // ========== STEP 6: Check Usage Limits (CRITICAL) ========== + if (!skipUsageLimits) { + try { + const usageCheck = await checkServerSideUsageLimits(actorUserId) + + if (usageCheck.isExceeded) { + logger.warn( + `[${requestId}] User ${actorUserId} has exceeded usage limits. Blocking execution.`, + { + currentUsage: usageCheck.currentUsage, + limit: usageCheck.limit, + workflowId, + triggerType, + } + ) + + await logPreprocessingError({ + workflowId, + executionId, + triggerType, + requestId, + userId: actorUserId, + workspaceId, + errorMessage: + usageCheck.message || + `Usage limit exceeded: $${usageCheck.currentUsage?.toFixed(2)} used of $${usageCheck.limit?.toFixed(2)} limit. Please upgrade your plan to continue.`, + loggingSession: providedLoggingSession, + }) + + return { + success: false, + error: { + message: + usageCheck.message || 'Usage limit exceeded. Please upgrade your plan to continue.', + statusCode: 402, + logCreated: true, + }, + } + } + + logger.debug(`[${requestId}] Usage limit check passed`, { + currentUsage: usageCheck.currentUsage, + limit: usageCheck.limit, + }) + } catch (error) { + logger.error(`[${requestId}] Error checking usage limits`, { error, actorUserId }) + + await logPreprocessingError({ + workflowId, + executionId, + triggerType, + requestId, + userId: actorUserId, + workspaceId, + errorMessage: + 'Unable to determine usage limits. Execution blocked for security. Please contact support.', + loggingSession: providedLoggingSession, + }) + + return { + success: false, + error: { + message: 'Unable to determine usage limits. 
Execution blocked for security.', + statusCode: 500, + logCreated: true, + }, + } + } + } else { + logger.debug(`[${requestId}] Skipping usage limits check (test mode)`) + } + + // ========== SUCCESS: All Checks Passed ========== + logger.info(`[${requestId}] All preprocessing checks passed`, { + workflowId, + actorUserId, + triggerType, + }) + + return { + success: true, + actorUserId, + workflowRecord, + userSubscription, + rateLimitInfo, + } +} + +/** + * Helper function to log preprocessing errors to the database + * + * This ensures users can see why their workflow execution was blocked. + */ +async function logPreprocessingError(params: { + workflowId: string + executionId: string + triggerType: string + requestId: string + userId: string + workspaceId: string + errorMessage: string + loggingSession?: LoggingSession +}): Promise { + const { + workflowId, + executionId, + triggerType, + requestId, + userId, + workspaceId, + errorMessage, + loggingSession, + } = params + + try { + const session = + loggingSession || new LoggingSession(workflowId, executionId, triggerType as any, requestId) + + await session.safeStart({ + userId, + workspaceId, + variables: {}, + }) + + await session.safeCompleteWithError({ + error: { + message: errorMessage, + stackTrace: undefined, + }, + traceSpans: [], + }) + + logger.debug(`[${requestId}] Logged preprocessing error to database`, { + workflowId, + executionId, + }) + } catch (error) { + logger.error(`[${requestId}] Failed to log preprocessing error`, { + error, + workflowId, + executionId, + }) + // Don't throw - error logging should not block the error response + } +} diff --git a/apps/sim/lib/webhooks/processor.ts b/apps/sim/lib/webhooks/processor.ts index 5d58b36d0..f204f867c 100644 --- a/apps/sim/lib/webhooks/processor.ts +++ b/apps/sim/lib/webhooks/processor.ts @@ -3,11 +3,9 @@ import { tasks } from '@trigger.dev/sdk' import { and, eq } from 'drizzle-orm' import { type NextRequest, NextResponse } from 'next/server' import { v4 as uuidv4 } from 'uuid' -import { checkServerSideUsageLimits } from '@/lib/billing' -import { getHighestPrioritySubscription } from '@/lib/billing/core/subscription' import { env, isTruthy } from '@/lib/env' +import { preprocessExecution } from '@/lib/execution/preprocessing' import { createLogger } from '@/lib/logs/console/logger' -import { LoggingSession } from '@/lib/logs/execution/logging-session' import { convertSquareBracketsToTwiML } from '@/lib/webhooks/utils' import { handleSlackChallenge, @@ -15,9 +13,7 @@ import { validateMicrosoftTeamsSignature, verifyProviderWebhook, } from '@/lib/webhooks/utils.server' -import { getWorkspaceBilledAccountUserId } from '@/lib/workspaces/utils' import { executeWebhookJob } from '@/background/webhook-execution' -import { RateLimiter } from '@/services/queue' const logger = createLogger('WebhookProcessor') @@ -42,20 +38,6 @@ function getExternalUrl(request: NextRequest): string { return request.url } -async function resolveWorkflowActorUserId(foundWorkflow: { - workspaceId?: string | null - userId?: string | null -}): Promise { - if (foundWorkflow?.workspaceId) { - const billedAccount = await getWorkspaceBilledAccountUserId(foundWorkflow.workspaceId) - if (billedAccount) { - return billedAccount - } - } - - return foundWorkflow?.userId ?? 
null -} - export async function parseWebhookBody( request: NextRequest, requestId: string @@ -509,157 +491,72 @@ export async function verifyProviderAuth( return null } -export async function checkRateLimits( - foundWorkflow: any, - foundWebhook: any, - requestId: string -): Promise { - try { - const actorUserId = await resolveWorkflowActorUserId(foundWorkflow) - - if (!actorUserId) { - logger.warn(`[${requestId}] Webhook requires a workspace billing account to attribute usage`) - return NextResponse.json({ error: 'Workspace billing account required' }, { status: 402 }) - } - - const userSubscription = await getHighestPrioritySubscription(actorUserId) - - const rateLimiter = new RateLimiter() - const rateLimitCheck = await rateLimiter.checkRateLimitWithSubscription( - actorUserId, - userSubscription, - 'webhook', - true - ) - - if (!rateLimitCheck.allowed) { - logger.warn(`[${requestId}] Rate limit exceeded for webhook user ${actorUserId}`, { - provider: foundWebhook.provider, - remaining: rateLimitCheck.remaining, - resetAt: rateLimitCheck.resetAt, - }) - - const executionId = uuidv4() - const loggingSession = new LoggingSession(foundWorkflow.id, executionId, 'webhook', requestId) - - await loggingSession.safeStart({ - userId: actorUserId, - workspaceId: foundWorkflow.workspaceId || '', - variables: {}, - }) - - await loggingSession.safeCompleteWithError({ - error: { - message: `Rate limit exceeded. ${rateLimitCheck.remaining || 0} requests remaining. Resets at ${rateLimitCheck.resetAt ? new Date(rateLimitCheck.resetAt).toISOString() : 'unknown'}. Please try again later.`, - stackTrace: undefined, - }, - traceSpans: [], - }) - - if (foundWebhook.provider === 'microsoft-teams') { - return NextResponse.json( - { - type: 'message', - text: 'Rate limit exceeded. Please try again later.', - }, - { status: 429 } - ) - } - - return NextResponse.json( - { error: 'Rate limit exceeded. Please try again later.' 
}, - { status: 429 } - ) - } - - logger.debug(`[${requestId}] Rate limit check passed for webhook`, { - provider: foundWebhook.provider, - remaining: rateLimitCheck.remaining, - resetAt: rateLimitCheck.resetAt, - }) - } catch (rateLimitError) { - logger.error(`[${requestId}] Error checking webhook rate limits:`, rateLimitError) - } - - return null -} - -export async function checkUsageLimits( +/** + * Run preprocessing checks for webhook execution + * This replaces the old checkRateLimits and checkUsageLimits functions + */ +export async function checkWebhookPreprocessing( foundWorkflow: any, foundWebhook: any, requestId: string, testMode: boolean ): Promise { - if (testMode) { - logger.debug(`[${requestId}] Skipping usage limit check for test webhook`) - return null - } - try { - const actorUserId = await resolveWorkflowActorUserId(foundWorkflow) + const executionId = uuidv4() - if (!actorUserId) { - logger.warn(`[${requestId}] Webhook requires a workspace billing account to attribute usage`) - return NextResponse.json({ error: 'Workspace billing account required' }, { status: 402 }) - } + const preprocessResult = await preprocessExecution({ + workflowId: foundWorkflow.id, + userId: foundWorkflow.userId, + triggerType: 'webhook', + executionId, + requestId, + checkRateLimit: true, // Webhooks need rate limiting + checkDeployment: true, // Webhooks require deployed workflows + skipUsageLimits: testMode, // Skip usage limits for test webhooks + workspaceId: foundWorkflow.workspaceId, + }) - const usageCheck = await checkServerSideUsageLimits(actorUserId) - if (usageCheck.isExceeded) { - logger.warn( - `[${requestId}] User ${actorUserId} has exceeded usage limits. Skipping webhook execution.`, - { - currentUsage: usageCheck.currentUsage, - limit: usageCheck.limit, - workflowId: foundWorkflow.id, - provider: foundWebhook.provider, - } - ) - - const executionId = uuidv4() - const loggingSession = new LoggingSession(foundWorkflow.id, executionId, 'webhook', requestId) - - await loggingSession.safeStart({ - userId: actorUserId, - workspaceId: foundWorkflow.workspaceId || '', - variables: {}, - }) - - await loggingSession.safeCompleteWithError({ - error: { - message: - usageCheck.message || - 'Usage limit exceeded. Please upgrade your plan to continue using webhooks.', - stackTrace: undefined, - }, - traceSpans: [], + if (!preprocessResult.success) { + const error = preprocessResult.error! + logger.warn(`[${requestId}] Webhook preprocessing failed`, { + provider: foundWebhook.provider, + error: error.message, + statusCode: error.statusCode, }) if (foundWebhook.provider === 'microsoft-teams') { return NextResponse.json( { type: 'message', - text: 'Usage limit exceeded. 
Please upgrade your plan to continue.', + text: error.message, }, - { status: 402 } + { status: error.statusCode } ) } + return NextResponse.json({ error: error.message }, { status: error.statusCode }) + } + + logger.debug(`[${requestId}] Webhook preprocessing passed`, { + provider: foundWebhook.provider, + }) + + return null + } catch (preprocessError) { + logger.error(`[${requestId}] Error during webhook preprocessing:`, preprocessError) + + if (foundWebhook.provider === 'microsoft-teams') { return NextResponse.json( - { error: usageCheck.message || 'Usage limit exceeded' }, - { status: 402 } + { + type: 'message', + text: 'Internal error during preprocessing', + }, + { status: 500 } ) } - logger.debug(`[${requestId}] Usage limit check passed for webhook`, { - provider: foundWebhook.provider, - currentUsage: usageCheck.currentUsage, - limit: usageCheck.limit, - }) - } catch (usageError) { - logger.error(`[${requestId}] Error checking webhook usage limits:`, usageError) + return NextResponse.json({ error: 'Internal error during preprocessing' }, { status: 500 }) } - - return null } export async function queueWebhookExecution( @@ -670,14 +567,6 @@ export async function queueWebhookExecution( options: WebhookProcessorOptions ): Promise { try { - const actorUserId = await resolveWorkflowActorUserId(foundWorkflow) - if (!actorUserId) { - logger.warn( - `[${options.requestId}] Webhook requires a workspace billing account to attribute usage` - ) - return NextResponse.json({ error: 'Workspace billing account required' }, { status: 402 }) - } - // GitHub event filtering for event-specific triggers if (foundWebhook.provider === 'github') { const providerConfig = (foundWebhook.providerConfig as Record) || {} @@ -804,7 +693,7 @@ export async function queueWebhookExecution( const payload = { webhookId: foundWebhook.id, workflowId: foundWorkflow.id, - userId: actorUserId, + userId: foundWorkflow.userId, provider: foundWebhook.provider, body, headers, diff --git a/apps/sim/lib/workflows/executor/human-in-the-loop-manager.ts b/apps/sim/lib/workflows/executor/human-in-the-loop-manager.ts index d4b16ac68..08c0910de 100644 --- a/apps/sim/lib/workflows/executor/human-in-the-loop-manager.ts +++ b/apps/sim/lib/workflows/executor/human-in-the-loop-manager.ts @@ -1,8 +1,9 @@ +import { randomUUID } from 'crypto' import { db } from '@sim/db' import { pausedExecutions, resumeQueue } from '@sim/db/schema' import { and, asc, desc, eq, inArray, lt, sql } from 'drizzle-orm' import type { Edge } from 'reactflow' -import { v4 as uuidv4 } from 'uuid' +import { preprocessExecution } from '@/lib/execution/preprocessing' import { createLogger } from '@/lib/logs/console/logger' import { LoggingSession } from '@/lib/logs/execution/logging-session' import { executeWorkflowCore } from '@/lib/workflows/executor/execution-core' @@ -121,7 +122,7 @@ export class PauseResumeManager { await db .insert(pausedExecutions) .values({ - id: uuidv4(), + id: randomUUID(), workflowId, executionId, executionSnapshot: snapshotSeed, @@ -197,7 +198,6 @@ export class PauseResumeManager { .limit(1) .then((rows) => rows[0]) - // Use the original execution ID for resume to maintain continuity in logs const resumeExecutionId = executionId const now = new Date() @@ -205,7 +205,7 @@ export class PauseResumeManager { const [entry] = await tx .insert(resumeQueue) .values({ - id: uuidv4(), + id: randomUUID(), pausedExecutionId: pausedExecution.id, parentExecutionId: executionId, newExecutionId: resumeExecutionId, @@ -243,7 +243,7 @@ export class 
PauseResumeManager {
       }
     }
 
-    const resumeEntryId = uuidv4()
+    const resumeEntryId = randomUUID()
     await tx.insert(resumeQueue).values({
       id: resumeEntryId,
       pausedExecutionId: pausedExecution.id,
@@ -315,7 +315,6 @@ export class PauseResumeManager {
         pausedExecutionId: pausedExecution.id,
         contextId,
         pauseBlockId: pauseBlockId,
-        result,
       })
     }
 
@@ -671,14 +670,56 @@ export class PauseResumeManager {
       metadata.requestId
     )
 
+    logger.info('Running preprocessing checks for resume', {
+      resumeExecutionId,
+      workflowId: pausedExecution.workflowId,
+      userId,
+    })
+
+    const preprocessingResult = await preprocessExecution({
+      workflowId: pausedExecution.workflowId,
+      userId,
+      triggerType: 'manual', // Resume is manual
+      executionId: resumeExecutionId,
+      requestId: metadata.requestId,
+      checkRateLimit: false, // Manual actions bypass rate limits
+      checkDeployment: false, // Resuming existing execution
+      skipUsageLimits: true, // Resume is continuation of authorized execution - don't recheck limits
+      workspaceId: baseSnapshot.metadata.workspaceId,
+      loggingSession,
+      isResumeContext: true, // Enable billing fallback for paused workflow resumes
+    })
+
+    if (!preprocessingResult.success) {
+      const errorMessage =
+        preprocessingResult.error?.message || 'Preprocessing check failed for resume execution'
+      logger.error('Resume preprocessing failed', {
+        resumeExecutionId,
+        workflowId: pausedExecution.workflowId,
+        userId,
+        error: errorMessage,
+      })
+
+      throw new Error(errorMessage)
+    }
+
+    logger.info('Preprocessing checks passed for resume', {
+      resumeExecutionId,
+      actorUserId: preprocessingResult.actorUserId,
+    })
+
+    if (preprocessingResult.actorUserId) {
+      metadata.userId = preprocessingResult.actorUserId
+    }
+
     logger.info('Invoking executeWorkflowCore for resume', {
       resumeExecutionId,
       triggerType,
       useDraftState: metadata.useDraftState,
       resumeFromSnapshot: metadata.resumeFromSnapshot,
+      actorUserId: metadata.userId,
     })
 
-    // For resume executions, pass a flag to skip creating a new log entry
     return await executeWorkflowCore({
       snapshot: resumeSnapshot,
       callbacks: {},
@@ -753,11 +794,9 @@ export class PauseResumeManager {
     pausedExecutionId: string
     contextId: string
     pauseBlockId: string
-    result: ExecutionResult
   }): Promise {
-    const { pausedExecutionId, contextId, pauseBlockId, result } = args
+    const { pausedExecutionId, contextId, pauseBlockId } = args
 
-    // Load the current snapshot from the database
     const pausedExecution = await db
       .select()
       .from(pausedExecutions)
diff --git a/apps/sim/package.json b/apps/sim/package.json
index 97d4733af..73cf01c72 100644
--- a/apps/sim/package.json
+++ b/apps/sim/package.json
@@ -75,7 +75,6 @@
     "entities": "6.0.1",
     "framer-motion": "^12.5.0",
     "fuse.js": "7.1.0",
-    "fuzzysort": "3.1.0",
     "gray-matter": "^4.0.3",
     "groq-sdk": "^0.15.0",
     "html-to-text": "^9.0.5",
diff --git a/bun.lock b/bun.lock
index 6a6053836..7beee8de4 100644
--- a/bun.lock
+++ b/bun.lock
@@ -118,7 +118,6 @@
       "entities": "6.0.1",
       "framer-motion": "^12.5.0",
       "fuse.js": "7.1.0",
-      "fuzzysort": "3.1.0",
       "gray-matter": "^4.0.3",
       "groq-sdk": "^0.15.0",
       "html-to-text": "^9.0.5",
@@ -1976,8 +1975,6 @@
     "fuse.js": ["fuse.js@7.1.0", "", {}, "sha512-trLf4SzuuUxfusZADLINj+dE8clK1frKdmqiJNb1Es75fmI5oY6X2mxLVUciLLjxqw/xr72Dhy+lER6dGd02FQ=="],
 
-    "fuzzysort": ["fuzzysort@3.1.0", "", {}, "sha512-sR9BNCjBg6LNgwvxlBd0sBABvQitkLzoVY9MYYROQVX/FvfJ4Mai9LsGhDgd8qYdds0bY77VzYd5iuB+v5rwQQ=="],
-
     "gaxios": ["gaxios@6.7.1", "", { "dependencies": { "extend": "^3.0.2", "https-proxy-agent": "^7.0.1", "is-stream": "^2.0.0", "node-fetch": "^2.6.9", "uuid": "^9.0.1" } }, "sha512-LDODD4TMYx7XXdpwxAVRAIAuB0bzv0s+ywFonY46k126qzQHT9ygyoa9tncmOiQmmDrik65UYsEkv3lbfqQ3yQ=="],
 
     "gcp-metadata": ["gcp-metadata@6.1.1", "", { "dependencies": { "gaxios": "^6.1.1", "google-logging-utils": "^0.0.2", "json-bigint": "^1.0.0" } }, "sha512-a4tiq7E0/5fTjxPAaH4jpjkSv/uCaU2p5KC6HVGrvl0cDjA8iBZv4vv1gyzlmK0ZUKqwpOyQMKzZQe3lTit77A=="],