diff --git a/apps/sim/app/api/v1/admin/types.ts b/apps/sim/app/api/v1/admin/types.ts index 0f2dfd814..615e02d78 100644 --- a/apps/sim/app/api/v1/admin/types.ts +++ b/apps/sim/app/api/v1/admin/types.ts @@ -640,6 +640,7 @@ export interface AdminDeployResult { isDeployed: boolean version: number deployedAt: string + warnings?: string[] } export interface AdminUndeployResult { diff --git a/apps/sim/app/api/v1/admin/workflows/[id]/deploy/route.ts b/apps/sim/app/api/v1/admin/workflows/[id]/deploy/route.ts index 4f9f517ae..3eab0374d 100644 --- a/apps/sim/app/api/v1/admin/workflows/[id]/deploy/route.ts +++ b/apps/sim/app/api/v1/admin/workflows/[id]/deploy/route.ts @@ -1,14 +1,23 @@ -import { db, workflow } from '@sim/db' +import { db, workflow, workflowDeploymentVersion } from '@sim/db' import { createLogger } from '@sim/logger' -import { eq } from 'drizzle-orm' +import { and, eq } from 'drizzle-orm' import { generateRequestId } from '@/lib/core/utils/request' -import { cleanupWebhooksForWorkflow } from '@/lib/webhooks/deploy' +import { removeMcpToolsForWorkflow, syncMcpToolsForWorkflow } from '@/lib/mcp/workflow-mcp-sync' +import { + cleanupWebhooksForWorkflow, + restorePreviousVersionWebhooks, + saveTriggerWebhooksForDeploy, +} from '@/lib/webhooks/deploy' import { deployWorkflow, loadWorkflowFromNormalizedTables, undeployWorkflow, } from '@/lib/workflows/persistence/utils' -import { createSchedulesForDeploy, validateWorkflowSchedules } from '@/lib/workflows/schedules' +import { + cleanupDeploymentVersion, + createSchedulesForDeploy, + validateWorkflowSchedules, +} from '@/lib/workflows/schedules' import { withAdminAuthParams } from '@/app/api/v1/admin/middleware' import { badRequestResponse, @@ -28,10 +37,11 @@ interface RouteParams { export const POST = withAdminAuthParams(async (request, context) => { const { id: workflowId } = await context.params + const requestId = generateRequestId() try { const [workflowRecord] = await db - .select({ id: workflow.id, name: workflow.name }) + .select() .from(workflow) .where(eq(workflow.id, workflowId)) .limit(1) @@ -50,6 +60,18 @@ export const POST = withAdminAuthParams(async (request, context) => return badRequestResponse(`Invalid schedule configuration: ${scheduleValidation.error}`) } + const [currentActiveVersion] = await db + .select({ id: workflowDeploymentVersion.id }) + .from(workflowDeploymentVersion) + .where( + and( + eq(workflowDeploymentVersion.workflowId, workflowId), + eq(workflowDeploymentVersion.isActive, true) + ) + ) + .limit(1) + const previousVersionId = currentActiveVersion?.id + const deployResult = await deployWorkflow({ workflowId, deployedBy: ADMIN_ACTOR_ID, @@ -65,6 +87,32 @@ export const POST = withAdminAuthParams(async (request, context) => return internalErrorResponse('Failed to resolve deployment version') } + const workflowData = workflowRecord as Record + + const triggerSaveResult = await saveTriggerWebhooksForDeploy({ + request, + workflowId, + workflow: workflowData, + userId: workflowRecord.userId, + blocks: normalizedData.blocks, + requestId, + deploymentVersionId: deployResult.deploymentVersionId, + previousVersionId, + }) + + if (!triggerSaveResult.success) { + await cleanupDeploymentVersion({ + workflowId, + workflow: workflowData, + requestId, + deploymentVersionId: deployResult.deploymentVersionId, + }) + await undeployWorkflow({ workflowId }) + return internalErrorResponse( + triggerSaveResult.error?.message || 'Failed to sync trigger configuration' + ) + } + const scheduleResult = await createSchedulesForDeploy( workflowId, normalizedData.blocks, @@ -72,15 +120,58 @@ export const POST = withAdminAuthParams(async (request, context) => deployResult.deploymentVersionId ) if (!scheduleResult.success) { - logger.warn(`Schedule creation failed for workflow ${workflowId}: ${scheduleResult.error}`) + logger.error( + `[${requestId}] Admin API: Schedule creation failed for workflow ${workflowId}: ${scheduleResult.error}` + ) + await cleanupDeploymentVersion({ + workflowId, + workflow: workflowData, + requestId, + deploymentVersionId: deployResult.deploymentVersionId, + }) + if (previousVersionId) { + await restorePreviousVersionWebhooks({ + request, + workflow: workflowData, + userId: workflowRecord.userId, + previousVersionId, + requestId, + }) + } + await undeployWorkflow({ workflowId }) + return internalErrorResponse(scheduleResult.error || 'Failed to create schedule') } - logger.info(`Admin API: Deployed workflow ${workflowId} as v${deployResult.version}`) + if (previousVersionId && previousVersionId !== deployResult.deploymentVersionId) { + try { + logger.info(`[${requestId}] Admin API: Cleaning up previous version ${previousVersionId}`) + await cleanupDeploymentVersion({ + workflowId, + workflow: workflowData, + requestId, + deploymentVersionId: previousVersionId, + skipExternalCleanup: true, + }) + } catch (cleanupError) { + logger.error( + `[${requestId}] Admin API: Failed to clean up previous version ${previousVersionId}`, + cleanupError + ) + } + } + + logger.info( + `[${requestId}] Admin API: Deployed workflow ${workflowId} as v${deployResult.version}` + ) + + // Sync MCP tools with the latest parameter schema + await syncMcpToolsForWorkflow({ workflowId, requestId, context: 'deploy' }) const response: AdminDeployResult = { isDeployed: true, version: deployResult.version!, deployedAt: deployResult.deployedAt!.toISOString(), + warnings: triggerSaveResult.warnings, } return singleResponse(response) @@ -105,7 +196,6 @@ export const DELETE = withAdminAuthParams(async (request, context) return notFoundResponse('Workflow') } - // Clean up external webhook subscriptions before undeploying await cleanupWebhooksForWorkflow( workflowId, workflowRecord as Record, @@ -117,6 +207,8 @@ export const DELETE = withAdminAuthParams(async (request, context) return internalErrorResponse(result.error || 'Failed to undeploy workflow') } + await removeMcpToolsForWorkflow(workflowId, requestId) + logger.info(`Admin API: Undeployed workflow ${workflowId}`) const response: AdminUndeployResult = { diff --git a/apps/sim/app/api/v1/admin/workflows/[id]/versions/[versionId]/activate/route.ts b/apps/sim/app/api/v1/admin/workflows/[id]/versions/[versionId]/activate/route.ts index 9b5ac90f1..a1406ca83 100644 --- a/apps/sim/app/api/v1/admin/workflows/[id]/versions/[versionId]/activate/route.ts +++ b/apps/sim/app/api/v1/admin/workflows/[id]/versions/[versionId]/activate/route.ts @@ -1,7 +1,15 @@ -import { db, workflow } from '@sim/db' +import { db, workflow, workflowDeploymentVersion } from '@sim/db' import { createLogger } from '@sim/logger' -import { eq } from 'drizzle-orm' +import { and, eq } from 'drizzle-orm' +import { generateRequestId } from '@/lib/core/utils/request' +import { syncMcpToolsForWorkflow } from '@/lib/mcp/workflow-mcp-sync' +import { restorePreviousVersionWebhooks, saveTriggerWebhooksForDeploy } from '@/lib/webhooks/deploy' import { activateWorkflowVersion } from '@/lib/workflows/persistence/utils' +import { + cleanupDeploymentVersion, + createSchedulesForDeploy, + validateWorkflowSchedules, +} from '@/lib/workflows/schedules' import { withAdminAuthParams } from '@/app/api/v1/admin/middleware' import { badRequestResponse, @@ -9,6 +17,7 @@ import { notFoundResponse, singleResponse, } from '@/app/api/v1/admin/responses' +import type { BlockState } from '@/stores/workflows/workflow/types' const logger = createLogger('AdminWorkflowActivateVersionAPI') @@ -18,11 +27,12 @@ interface RouteParams { } export const POST = withAdminAuthParams(async (request, context) => { + const requestId = generateRequestId() const { id: workflowId, versionId } = await context.params try { const [workflowRecord] = await db - .select({ id: workflow.id }) + .select() .from(workflow) .where(eq(workflow.id, workflowId)) .limit(1) @@ -36,23 +46,161 @@ export const POST = withAdminAuthParams(async (request, context) => return badRequestResponse('Invalid version number') } + const [versionRow] = await db + .select({ + id: workflowDeploymentVersion.id, + state: workflowDeploymentVersion.state, + }) + .from(workflowDeploymentVersion) + .where( + and( + eq(workflowDeploymentVersion.workflowId, workflowId), + eq(workflowDeploymentVersion.version, versionNum) + ) + ) + .limit(1) + + if (!versionRow?.state) { + return notFoundResponse('Deployment version') + } + + const [currentActiveVersion] = await db + .select({ id: workflowDeploymentVersion.id }) + .from(workflowDeploymentVersion) + .where( + and( + eq(workflowDeploymentVersion.workflowId, workflowId), + eq(workflowDeploymentVersion.isActive, true) + ) + ) + .limit(1) + + const previousVersionId = currentActiveVersion?.id + + const deployedState = versionRow.state as { blocks?: Record } + const blocks = deployedState.blocks + if (!blocks || typeof blocks !== 'object') { + return internalErrorResponse('Invalid deployed state structure') + } + + const workflowData = workflowRecord as Record + + const scheduleValidation = validateWorkflowSchedules(blocks) + if (!scheduleValidation.isValid) { + return badRequestResponse(`Invalid schedule configuration: ${scheduleValidation.error}`) + } + + const triggerSaveResult = await saveTriggerWebhooksForDeploy({ + request, + workflowId, + workflow: workflowData, + userId: workflowRecord.userId, + blocks, + requestId, + deploymentVersionId: versionRow.id, + previousVersionId, + forceRecreateSubscriptions: true, + }) + + if (!triggerSaveResult.success) { + logger.error( + `[${requestId}] Admin API: Failed to sync triggers for workflow ${workflowId}`, + triggerSaveResult.error + ) + return internalErrorResponse( + triggerSaveResult.error?.message || 'Failed to sync trigger configuration' + ) + } + + const scheduleResult = await createSchedulesForDeploy(workflowId, blocks, db, versionRow.id) + + if (!scheduleResult.success) { + await cleanupDeploymentVersion({ + workflowId, + workflow: workflowData, + requestId, + deploymentVersionId: versionRow.id, + }) + if (previousVersionId) { + await restorePreviousVersionWebhooks({ + request, + workflow: workflowData, + userId: workflowRecord.userId, + previousVersionId, + requestId, + }) + } + return internalErrorResponse(scheduleResult.error || 'Failed to sync schedules') + } + const result = await activateWorkflowVersion({ workflowId, version: versionNum }) if (!result.success) { + await cleanupDeploymentVersion({ + workflowId, + workflow: workflowData, + requestId, + deploymentVersionId: versionRow.id, + }) + if (previousVersionId) { + await restorePreviousVersionWebhooks({ + request, + workflow: workflowData, + userId: workflowRecord.userId, + previousVersionId, + requestId, + }) + } if (result.error === 'Deployment version not found') { return notFoundResponse('Deployment version') } return internalErrorResponse(result.error || 'Failed to activate version') } - logger.info(`Admin API: Activated version ${versionNum} for workflow ${workflowId}`) + if (previousVersionId && previousVersionId !== versionRow.id) { + try { + logger.info( + `[${requestId}] Admin API: Cleaning up previous version ${previousVersionId} webhooks/schedules` + ) + await cleanupDeploymentVersion({ + workflowId, + workflow: workflowData, + requestId, + deploymentVersionId: previousVersionId, + skipExternalCleanup: true, + }) + logger.info(`[${requestId}] Admin API: Previous version cleanup completed`) + } catch (cleanupError) { + logger.error( + `[${requestId}] Admin API: Failed to clean up previous version ${previousVersionId}`, + cleanupError + ) + } + } + + await syncMcpToolsForWorkflow({ + workflowId, + requestId, + state: versionRow.state, + context: 'activate', + }) + + logger.info( + `[${requestId}] Admin API: Activated version ${versionNum} for workflow ${workflowId}` + ) return singleResponse({ success: true, version: versionNum, deployedAt: result.deployedAt!.toISOString(), + warnings: triggerSaveResult.warnings, }) } catch (error) { - logger.error(`Admin API: Failed to activate version for workflow ${workflowId}`, { error }) + logger.error( + `[${requestId}] Admin API: Failed to activate version for workflow ${workflowId}`, + { + error, + } + ) return internalErrorResponse('Failed to activate deployment version') } }) diff --git a/apps/sim/app/api/webhooks/[id]/route.ts b/apps/sim/app/api/webhooks/[id]/route.ts index ddc588e98..e4c381cad 100644 --- a/apps/sim/app/api/webhooks/[id]/route.ts +++ b/apps/sim/app/api/webhooks/[id]/route.ts @@ -7,12 +7,7 @@ import { getSession } from '@/lib/auth' import { validateInteger } from '@/lib/core/security/input-validation' import { PlatformEvents } from '@/lib/core/telemetry' import { generateRequestId } from '@/lib/core/utils/request' -import { resolveEnvVarsInObject } from '@/lib/webhooks/env-resolver' -import { - cleanupExternalWebhook, - createExternalWebhookSubscription, - shouldRecreateExternalWebhookSubscription, -} from '@/lib/webhooks/provider-subscriptions' +import { cleanupExternalWebhook } from '@/lib/webhooks/provider-subscriptions' import { getUserEntityPermissions } from '@/lib/workspaces/permissions/utils' const logger = createLogger('WebhookAPI') @@ -88,7 +83,6 @@ export async function GET(request: NextRequest, { params }: { params: Promise<{ } } -// Update a webhook export async function PATCH(request: NextRequest, { params }: { params: Promise<{ id: string }> }) { const requestId = generateRequestId() @@ -103,7 +97,7 @@ export async function PATCH(request: NextRequest, { params }: { params: Promise< } const body = await request.json() - const { path, provider, providerConfig, isActive, failedCount } = body + const { isActive, failedCount } = body if (failedCount !== undefined) { const validation = validateInteger(failedCount, 'failedCount', { min: 0 }) @@ -113,28 +107,6 @@ export async function PATCH(request: NextRequest, { params }: { params: Promise< } } - const originalProviderConfig = providerConfig - let resolvedProviderConfig = providerConfig - if (providerConfig) { - const webhookDataForResolve = await db - .select({ - workspaceId: workflow.workspaceId, - }) - .from(webhook) - .innerJoin(workflow, eq(webhook.workflowId, workflow.id)) - .where(eq(webhook.id, id)) - .limit(1) - - if (webhookDataForResolve.length > 0) { - resolvedProviderConfig = await resolveEnvVarsInObject( - providerConfig, - session.user.id, - webhookDataForResolve[0].workspaceId || undefined - ) - } - } - - // Find the webhook and check permissions const webhooks = await db .select({ webhook: webhook, @@ -155,16 +127,12 @@ export async function PATCH(request: NextRequest, { params }: { params: Promise< } const webhookData = webhooks[0] - - // Check if user has permission to modify this webhook let canModify = false - // Case 1: User owns the workflow if (webhookData.workflow.userId === session.user.id) { canModify = true } - // Case 2: Workflow belongs to a workspace and user has write or admin permission if (!canModify && webhookData.workflow.workspaceId) { const userPermission = await getUserEntityPermissions( session.user.id, @@ -183,80 +151,14 @@ export async function PATCH(request: NextRequest, { params }: { params: Promise< return NextResponse.json({ error: 'Access denied' }, { status: 403 }) } - const existingProviderConfig = - (webhookData.webhook.providerConfig as Record) || {} - let nextProviderConfig = - providerConfig !== undefined && - resolvedProviderConfig && - typeof resolvedProviderConfig === 'object' - ? (resolvedProviderConfig as Record) - : existingProviderConfig - const nextProvider = (provider ?? webhookData.webhook.provider) as string - - if ( - providerConfig !== undefined && - shouldRecreateExternalWebhookSubscription({ - previousProvider: webhookData.webhook.provider as string, - nextProvider, - previousConfig: existingProviderConfig, - nextConfig: nextProviderConfig, - }) - ) { - await cleanupExternalWebhook( - { ...webhookData.webhook, providerConfig: existingProviderConfig }, - webhookData.workflow, - requestId - ) - - const result = await createExternalWebhookSubscription( - request, - { - ...webhookData.webhook, - provider: nextProvider, - providerConfig: nextProviderConfig, - }, - webhookData.workflow, - session.user.id, - requestId - ) - - nextProviderConfig = result.updatedProviderConfig as Record - } - logger.debug(`[${requestId}] Updating webhook properties`, { - hasPathUpdate: path !== undefined, - hasProviderUpdate: provider !== undefined, - hasConfigUpdate: providerConfig !== undefined, hasActiveUpdate: isActive !== undefined, hasFailedCountUpdate: failedCount !== undefined, }) - let finalProviderConfig = webhooks[0].webhook.providerConfig - if (providerConfig !== undefined && originalProviderConfig) { - const existingConfig = existingProviderConfig - finalProviderConfig = { - ...originalProviderConfig, - credentialId: existingConfig.credentialId, - credentialSetId: existingConfig.credentialSetId, - userId: existingConfig.userId, - historyId: existingConfig.historyId, - lastCheckedTimestamp: existingConfig.lastCheckedTimestamp, - setupCompleted: existingConfig.setupCompleted, - externalId: existingConfig.externalId, - } - for (const [key, value] of Object.entries(nextProviderConfig)) { - if (!(key in originalProviderConfig)) { - ;(finalProviderConfig as Record)[key] = value - } - } - } - const updatedWebhook = await db .update(webhook) .set({ - path: path !== undefined ? path : webhooks[0].webhook.path, - provider: provider !== undefined ? provider : webhooks[0].webhook.provider, - providerConfig: finalProviderConfig, isActive: isActive !== undefined ? isActive : webhooks[0].webhook.isActive, failedCount: failedCount !== undefined ? failedCount : webhooks[0].webhook.failedCount, updatedAt: new Date(), @@ -339,11 +241,8 @@ export async function DELETE( } const foundWebhook = webhookData.webhook - const { cleanupExternalWebhook } = await import('@/lib/webhooks/provider-subscriptions') - - const providerConfig = foundWebhook.providerConfig as Record | null - const credentialSetId = providerConfig?.credentialSetId as string | undefined - const blockId = providerConfig?.blockId as string | undefined + const credentialSetId = foundWebhook.credentialSetId as string | undefined + const blockId = foundWebhook.blockId as string | undefined if (credentialSetId && blockId) { const allCredentialSetWebhooks = await db @@ -351,10 +250,9 @@ export async function DELETE( .from(webhook) .where(and(eq(webhook.workflowId, webhookData.workflow.id), eq(webhook.blockId, blockId))) - const webhooksToDelete = allCredentialSetWebhooks.filter((w) => { - const config = w.providerConfig as Record | null - return config?.credentialSetId === credentialSetId - }) + const webhooksToDelete = allCredentialSetWebhooks.filter( + (w) => w.credentialSetId === credentialSetId + ) for (const w of webhooksToDelete) { await cleanupExternalWebhook(w, webhookData.workflow, requestId) diff --git a/apps/sim/app/api/webhooks/route.ts b/apps/sim/app/api/webhooks/route.ts index 5f5d9eace..6cc9876f7 100644 --- a/apps/sim/app/api/webhooks/route.ts +++ b/apps/sim/app/api/webhooks/route.ts @@ -7,9 +7,21 @@ import { type NextRequest, NextResponse } from 'next/server' import { getSession } from '@/lib/auth' import { PlatformEvents } from '@/lib/core/telemetry' import { generateRequestId } from '@/lib/core/utils/request' +import { getProviderIdFromServiceId } from '@/lib/oauth' import { resolveEnvVarsInObject } from '@/lib/webhooks/env-resolver' -import { createExternalWebhookSubscription } from '@/lib/webhooks/provider-subscriptions' +import { + cleanupExternalWebhook, + createExternalWebhookSubscription, +} from '@/lib/webhooks/provider-subscriptions' +import { mergeNonUserFields } from '@/lib/webhooks/utils' +import { + configureGmailPolling, + configureOutlookPolling, + configureRssPolling, + syncWebhooksForCredentialSet, +} from '@/lib/webhooks/utils.server' import { getUserEntityPermissions } from '@/lib/workspaces/permissions/utils' +import { extractCredentialSetId, isCredentialSetValue } from '@/executor/constants' const logger = createLogger('WebhooksAPI') @@ -316,8 +328,6 @@ export async function POST(request: NextRequest) { const directCredentialSetId = resolvedProviderConfig?.credentialSetId as string | undefined if (directCredentialSetId || rawCredentialId) { - const { isCredentialSetValue, extractCredentialSetId } = await import('@/executor/constants') - const credentialSetId = directCredentialSetId || (rawCredentialId && isCredentialSetValue(rawCredentialId) @@ -329,11 +339,6 @@ export async function POST(request: NextRequest) { `[${requestId}] Credential set detected for ${provider} trigger. Syncing webhooks for set ${credentialSetId}` ) - const { getProviderIdFromServiceId } = await import('@/lib/oauth') - const { syncWebhooksForCredentialSet, configureGmailPolling, configureOutlookPolling } = - await import('@/lib/webhooks/utils.server') - - // Map provider to OAuth provider ID const oauthProviderId = getProviderIdFromServiceId(provider) const { @@ -466,7 +471,8 @@ export async function POST(request: NextRequest) { providerConfig: providerConfigOverride, }) - const configToSave = { ...originalProviderConfig } + const userProvided = originalProviderConfig as Record + const configToSave: Record = { ...userProvided } try { const result = await createExternalWebhookSubscription( @@ -477,11 +483,7 @@ export async function POST(request: NextRequest) { requestId ) const updatedConfig = result.updatedProviderConfig as Record - for (const [key, value] of Object.entries(updatedConfig)) { - if (!(key in originalProviderConfig)) { - configToSave[key] = value - } - } + mergeNonUserFields(configToSave, updatedConfig, userProvided) resolvedProviderConfig = updatedConfig externalSubscriptionCreated = result.externalSubscriptionCreated } catch (err) { @@ -547,7 +549,6 @@ export async function POST(request: NextRequest) { if (externalSubscriptionCreated) { logger.error(`[${requestId}] DB save failed, cleaning up external subscription`, dbError) try { - const { cleanupExternalWebhook } = await import('@/lib/webhooks/provider-subscriptions') await cleanupExternalWebhook( createTempWebhookData(configToSave), workflowRecord, @@ -567,7 +568,6 @@ export async function POST(request: NextRequest) { if (savedWebhook && provider === 'gmail') { logger.info(`[${requestId}] Gmail provider detected. Setting up Gmail webhook configuration.`) try { - const { configureGmailPolling } = await import('@/lib/webhooks/utils.server') const success = await configureGmailPolling(savedWebhook, requestId) if (!success) { @@ -606,7 +606,6 @@ export async function POST(request: NextRequest) { `[${requestId}] Outlook provider detected. Setting up Outlook webhook configuration.` ) try { - const { configureOutlookPolling } = await import('@/lib/webhooks/utils.server') const success = await configureOutlookPolling(savedWebhook, requestId) if (!success) { @@ -643,7 +642,6 @@ export async function POST(request: NextRequest) { if (savedWebhook && provider === 'rss') { logger.info(`[${requestId}] RSS provider detected. Setting up RSS webhook configuration.`) try { - const { configureRssPolling } = await import('@/lib/webhooks/utils.server') const success = await configureRssPolling(savedWebhook, requestId) if (!success) { diff --git a/apps/sim/app/api/workflows/[id]/deploy/route.ts b/apps/sim/app/api/workflows/[id]/deploy/route.ts index 6e1172c04..9fd15eb60 100644 --- a/apps/sim/app/api/workflows/[id]/deploy/route.ts +++ b/apps/sim/app/api/workflows/[id]/deploy/route.ts @@ -4,7 +4,11 @@ import { and, desc, eq } from 'drizzle-orm' import type { NextRequest } from 'next/server' import { generateRequestId } from '@/lib/core/utils/request' import { removeMcpToolsForWorkflow, syncMcpToolsForWorkflow } from '@/lib/mcp/workflow-mcp-sync' -import { cleanupWebhooksForWorkflow, saveTriggerWebhooksForDeploy } from '@/lib/webhooks/deploy' +import { + cleanupWebhooksForWorkflow, + restorePreviousVersionWebhooks, + saveTriggerWebhooksForDeploy, +} from '@/lib/webhooks/deploy' import { deployWorkflow, loadWorkflowFromNormalizedTables, @@ -135,6 +139,18 @@ export async function POST(request: NextRequest, { params }: { params: Promise<{ return createErrorResponse(`Invalid schedule configuration: ${scheduleValidation.error}`, 400) } + const [currentActiveVersion] = await db + .select({ id: workflowDeploymentVersion.id }) + .from(workflowDeploymentVersion) + .where( + and( + eq(workflowDeploymentVersion.workflowId, id), + eq(workflowDeploymentVersion.isActive, true) + ) + ) + .limit(1) + const previousVersionId = currentActiveVersion?.id + const deployResult = await deployWorkflow({ workflowId: id, deployedBy: actorUserId, @@ -161,6 +177,7 @@ export async function POST(request: NextRequest, { params }: { params: Promise<{ blocks: normalizedData.blocks, requestId, deploymentVersionId, + previousVersionId, }) if (!triggerSaveResult.success) { @@ -194,6 +211,15 @@ export async function POST(request: NextRequest, { params }: { params: Promise<{ requestId, deploymentVersionId, }) + if (previousVersionId) { + await restorePreviousVersionWebhooks({ + request, + workflow: workflowData as Record, + userId: actorUserId, + previousVersionId, + requestId, + }) + } await undeployWorkflow({ workflowId: id }) return createErrorResponse(scheduleResult.error || 'Failed to create schedule', 500) } @@ -208,6 +234,25 @@ export async function POST(request: NextRequest, { params }: { params: Promise<{ ) } + if (previousVersionId && previousVersionId !== deploymentVersionId) { + try { + logger.info(`[${requestId}] Cleaning up previous version ${previousVersionId} DB records`) + await cleanupDeploymentVersion({ + workflowId: id, + workflow: workflowData as Record, + requestId, + deploymentVersionId: previousVersionId, + skipExternalCleanup: true, + }) + } catch (cleanupError) { + logger.error( + `[${requestId}] Failed to clean up previous version ${previousVersionId}`, + cleanupError + ) + // Non-fatal - continue with success response + } + } + logger.info(`[${requestId}] Workflow deployed successfully: ${id}`) // Sync MCP tools with the latest parameter schema @@ -228,6 +273,7 @@ export async function POST(request: NextRequest, { params }: { params: Promise<{ nextRunAt: scheduleInfo.nextRunAt, } : undefined, + warnings: triggerSaveResult.warnings, }) } catch (error: any) { logger.error(`[${requestId}] Error deploying workflow: ${id}`, { diff --git a/apps/sim/app/api/workflows/[id]/deployments/[version]/activate/route.ts b/apps/sim/app/api/workflows/[id]/deployments/[version]/activate/route.ts index d3e5abb55..782b65e33 100644 --- a/apps/sim/app/api/workflows/[id]/deployments/[version]/activate/route.ts +++ b/apps/sim/app/api/workflows/[id]/deployments/[version]/activate/route.ts @@ -4,7 +4,7 @@ import { and, eq } from 'drizzle-orm' import type { NextRequest } from 'next/server' import { generateRequestId } from '@/lib/core/utils/request' import { syncMcpToolsForWorkflow } from '@/lib/mcp/workflow-mcp-sync' -import { saveTriggerWebhooksForDeploy } from '@/lib/webhooks/deploy' +import { restorePreviousVersionWebhooks, saveTriggerWebhooksForDeploy } from '@/lib/webhooks/deploy' import { activateWorkflowVersion } from '@/lib/workflows/persistence/utils' import { cleanupDeploymentVersion, @@ -85,6 +85,11 @@ export async function POST( return createErrorResponse('Invalid deployed state structure', 500) } + const scheduleValidation = validateWorkflowSchedules(blocks) + if (!scheduleValidation.isValid) { + return createErrorResponse(`Invalid schedule configuration: ${scheduleValidation.error}`, 400) + } + const triggerSaveResult = await saveTriggerWebhooksForDeploy({ request, workflowId: id, @@ -93,6 +98,8 @@ export async function POST( blocks, requestId, deploymentVersionId: versionRow.id, + previousVersionId, + forceRecreateSubscriptions: true, }) if (!triggerSaveResult.success) { @@ -102,11 +109,6 @@ export async function POST( ) } - const scheduleValidation = validateWorkflowSchedules(blocks) - if (!scheduleValidation.isValid) { - return createErrorResponse(`Invalid schedule configuration: ${scheduleValidation.error}`, 400) - } - const scheduleResult = await createSchedulesForDeploy(id, blocks, db, versionRow.id) if (!scheduleResult.success) { @@ -116,6 +118,15 @@ export async function POST( requestId, deploymentVersionId: versionRow.id, }) + if (previousVersionId) { + await restorePreviousVersionWebhooks({ + request, + workflow: workflowData as Record, + userId: actorUserId, + previousVersionId, + requestId, + }) + } return createErrorResponse(scheduleResult.error || 'Failed to sync schedules', 500) } @@ -127,6 +138,15 @@ export async function POST( requestId, deploymentVersionId: versionRow.id, }) + if (previousVersionId) { + await restorePreviousVersionWebhooks({ + request, + workflow: workflowData as Record, + userId: actorUserId, + previousVersionId, + requestId, + }) + } return createErrorResponse(result.error || 'Failed to activate deployment', 400) } @@ -140,6 +160,7 @@ export async function POST( workflow: workflowData as Record, requestId, deploymentVersionId: previousVersionId, + skipExternalCleanup: true, }) logger.info(`[${requestId}] Previous version cleanup completed`) } catch (cleanupError) { @@ -157,7 +178,11 @@ export async function POST( context: 'activate', }) - return createSuccessResponse({ success: true, deployedAt: result.deployedAt }) + return createSuccessResponse({ + success: true, + deployedAt: result.deployedAt, + warnings: triggerSaveResult.warnings, + }) } catch (error: any) { logger.error(`[${requestId}] Error activating deployment for workflow: ${id}`, error) return createErrorResponse(error.message || 'Failed to activate deployment', 500) diff --git a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/panel/components/deploy/components/deploy-modal/deploy-modal.tsx b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/panel/components/deploy/components/deploy-modal/deploy-modal.tsx index 09138be70..e90fbee6b 100644 --- a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/panel/components/deploy/components/deploy-modal/deploy-modal.tsx +++ b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/panel/components/deploy/components/deploy-modal/deploy-modal.tsx @@ -95,6 +95,7 @@ export function DeployModal({ const [activeTab, setActiveTab] = useState('general') const [chatSubmitting, setChatSubmitting] = useState(false) const [apiDeployError, setApiDeployError] = useState(null) + const [apiDeployWarnings, setApiDeployWarnings] = useState([]) const [isChatFormValid, setIsChatFormValid] = useState(false) const [selectedStreamingOutputs, setSelectedStreamingOutputs] = useState([]) @@ -227,6 +228,7 @@ export function DeployModal({ if (open && workflowId) { setActiveTab('general') setApiDeployError(null) + setApiDeployWarnings([]) } }, [open, workflowId]) @@ -282,9 +284,13 @@ export function DeployModal({ if (!workflowId) return setApiDeployError(null) + setApiDeployWarnings([]) try { - await deployMutation.mutateAsync({ workflowId, deployChatEnabled: false }) + const result = await deployMutation.mutateAsync({ workflowId, deployChatEnabled: false }) + if (result.warnings && result.warnings.length > 0) { + setApiDeployWarnings(result.warnings) + } await refetchDeployedState() } catch (error: unknown) { logger.error('Error deploying workflow:', { error }) @@ -297,8 +303,13 @@ export function DeployModal({ async (version: number) => { if (!workflowId) return + setApiDeployWarnings([]) + try { - await activateVersionMutation.mutateAsync({ workflowId, version }) + const result = await activateVersionMutation.mutateAsync({ workflowId, version }) + if (result.warnings && result.warnings.length > 0) { + setApiDeployWarnings(result.warnings) + } await refetchDeployedState() } catch (error) { logger.error('Error promoting version:', { error }) @@ -324,9 +335,13 @@ export function DeployModal({ if (!workflowId) return setApiDeployError(null) + setApiDeployWarnings([]) try { - await deployMutation.mutateAsync({ workflowId, deployChatEnabled: false }) + const result = await deployMutation.mutateAsync({ workflowId, deployChatEnabled: false }) + if (result.warnings && result.warnings.length > 0) { + setApiDeployWarnings(result.warnings) + } await refetchDeployedState() } catch (error: unknown) { logger.error('Error redeploying workflow:', { error }) @@ -338,6 +353,7 @@ export function DeployModal({ const handleCloseModal = useCallback(() => { setChatSubmitting(false) setApiDeployError(null) + setApiDeployWarnings([]) onOpenChange(false) }, [onOpenChange]) @@ -479,6 +495,14 @@ export function DeployModal({
{apiDeployError}
)} + {apiDeployWarnings.length > 0 && ( +
+
Deployment Warning
+ {apiDeployWarnings.map((warning, index) => ( +
{warning}
+ ))} +
+ )} { @@ -360,6 +362,7 @@ interface ActivateVersionVariables { interface ActivateVersionResult { deployedAt?: string apiKey?: string + warnings?: string[] } /** diff --git a/apps/sim/hooks/use-webhook-management.ts b/apps/sim/hooks/use-webhook-management.ts index 3df45eee0..3c8ef5a4d 100644 --- a/apps/sim/hooks/use-webhook-management.ts +++ b/apps/sim/hooks/use-webhook-management.ts @@ -1,4 +1,4 @@ -import { useCallback, useEffect, useMemo, useState } from 'react' +import { useCallback, useEffect, useMemo } from 'react' import { createLogger } from '@sim/logger' import { useParams } from 'next/navigation' import { getBaseUrl } from '@/lib/core/utils/urls' @@ -6,12 +6,10 @@ import { getBlock } from '@/blocks' import { populateTriggerFieldsFromConfig } from '@/hooks/use-trigger-config-aggregation' import { useSubBlockStore } from '@/stores/workflows/subblock/store' import { useWorkflowStore } from '@/stores/workflows/workflow/store' -import { getTrigger, isTriggerValid } from '@/triggers' +import { isTriggerValid } from '@/triggers' const logger = createLogger('useWebhookManagement') -const CREDENTIAL_SET_PREFIX = 'credentialSet:' - interface UseWebhookManagementProps { blockId: string triggerId?: string @@ -24,9 +22,6 @@ interface WebhookManagementState { webhookPath: string webhookId: string | null isLoading: boolean - isSaving: boolean - saveConfig: () => Promise - deleteConfig: () => Promise } /** @@ -83,11 +78,9 @@ function resolveEffectiveTriggerId( } /** - * Hook to manage webhook lifecycle for trigger blocks - * Handles: - * - Pre-generating webhook URLs based on blockId (without creating webhook) - * - Loading existing webhooks from the API - * - Saving and deleting webhook configurations + * Hook to load webhook info for trigger blocks. + * Used for displaying webhook URLs in the UI. + * Webhook creation/updates are handled by the deploy flow. */ export function useWebhookManagement({ blockId, @@ -98,8 +91,6 @@ export function useWebhookManagement({ const params = useParams() const workflowId = params.workflowId as string - const triggerDef = triggerId && isTriggerValid(triggerId) ? getTrigger(triggerId) : null - const webhookId = useSubBlockStore( useCallback((state) => state.getValue(blockId, 'webhookId') as string | null, [blockId]) ) @@ -107,7 +98,6 @@ export function useWebhookManagement({ useCallback((state) => state.getValue(blockId, 'triggerPath') as string | null, [blockId]) ) const isLoading = useSubBlockStore((state) => state.loadingWebhooks.has(blockId)) - const isChecked = useSubBlockStore((state) => state.checkedWebhooks.has(blockId)) const webhookUrl = useMemo(() => { if (!webhookPath) { @@ -118,8 +108,6 @@ export function useWebhookManagement({ return `${baseUrl}/api/webhooks/trigger/${webhookPath}` }, [webhookPath, blockId]) - const [isSaving, setIsSaving] = useState(false) - useEffect(() => { if (triggerId && !isPreview) { const storedTriggerId = useSubBlockStore.getState().getValue(blockId, 'triggerId') @@ -143,7 +131,7 @@ export function useWebhookManagement({ return } - const loadWebhookOrGenerateUrl = async () => { + const loadWebhookInfo = async () => { useSubBlockStore.setState((state) => ({ loadingWebhooks: new Set([...state.loadingWebhooks, blockId]), })) @@ -171,8 +159,6 @@ export function useWebhookManagement({ if (webhook.providerConfig) { const effectiveTriggerId = resolveEffectiveTriggerId(blockId, triggerId, webhook) - // Filter out runtime/system fields from providerConfig before storing as triggerConfig - // These fields are managed by the system and should not be included in change detection const { credentialId: _credId, credentialSetId: _credSetId, @@ -224,202 +210,14 @@ export function useWebhookManagement({ } } if (useWebhookUrl) { - loadWebhookOrGenerateUrl() + loadWebhookInfo() } }, [isPreview, triggerId, workflowId, blockId, useWebhookUrl]) - const createWebhook = async ( - effectiveTriggerId: string | undefined, - selectedCredentialId: string | null - ): Promise => { - if (!triggerDef || !effectiveTriggerId) { - return false - } - - const triggerConfig = useSubBlockStore.getState().getValue(blockId, 'triggerConfig') - - const isCredentialSet = selectedCredentialId?.startsWith(CREDENTIAL_SET_PREFIX) - const credentialSetId = isCredentialSet - ? selectedCredentialId!.slice(CREDENTIAL_SET_PREFIX.length) - : undefined - const credentialId = isCredentialSet ? undefined : selectedCredentialId - - const webhookConfig = { - ...(triggerConfig || {}), - ...(credentialId ? { credentialId } : {}), - ...(credentialSetId ? { credentialSetId } : {}), - triggerId: effectiveTriggerId, - } - - const path = blockId - - const response = await fetch('/api/webhooks', { - method: 'POST', - headers: { 'Content-Type': 'application/json' }, - body: JSON.stringify({ - workflowId, - blockId, - path, - provider: triggerDef.provider, - providerConfig: webhookConfig, - }), - }) - - if (!response.ok) { - let errorMessage = 'Failed to create webhook' - try { - const errorData = await response.json() - errorMessage = errorData.details || errorData.error || errorMessage - } catch { - // If response is not JSON, use default message - } - logger.error('Failed to create webhook', { errorMessage }) - throw new Error(errorMessage) - } - - const data = await response.json() - const savedWebhookId = data.webhook.id - - useSubBlockStore.getState().setValue(blockId, 'triggerPath', path) - useSubBlockStore.getState().setValue(blockId, 'triggerId', effectiveTriggerId) - useSubBlockStore.getState().setValue(blockId, 'webhookId', savedWebhookId) - useSubBlockStore.setState((state) => ({ - checkedWebhooks: new Set([...state.checkedWebhooks, blockId]), - })) - - logger.info('Trigger webhook created successfully', { - webhookId: savedWebhookId, - triggerId: effectiveTriggerId, - provider: triggerDef.provider, - blockId, - }) - - return true - } - - const updateWebhook = async ( - webhookIdToUpdate: string, - effectiveTriggerId: string | undefined, - selectedCredentialId: string | null - ): Promise => { - const triggerConfigRaw = useSubBlockStore.getState().getValue(blockId, 'triggerConfig') - const triggerConfig = - typeof triggerConfigRaw === 'object' && triggerConfigRaw !== null - ? (triggerConfigRaw as Record) - : {} - - const isCredentialSet = selectedCredentialId?.startsWith(CREDENTIAL_SET_PREFIX) - const credentialSetId = isCredentialSet - ? selectedCredentialId!.slice(CREDENTIAL_SET_PREFIX.length) - : undefined - const credentialId = isCredentialSet ? undefined : selectedCredentialId - - const response = await fetch(`/api/webhooks/${webhookIdToUpdate}`, { - method: 'PATCH', - headers: { 'Content-Type': 'application/json' }, - body: JSON.stringify({ - providerConfig: { - ...triggerConfig, - ...(credentialId ? { credentialId } : {}), - ...(credentialSetId ? { credentialSetId } : {}), - triggerId: effectiveTriggerId, - }, - }), - }) - - if (response.status === 404) { - logger.warn('Webhook not found while updating, recreating', { - blockId, - lostWebhookId: webhookIdToUpdate, - }) - useSubBlockStore.getState().setValue(blockId, 'webhookId', null) - return createWebhook(effectiveTriggerId, selectedCredentialId) - } - - if (!response.ok) { - let errorMessage = 'Failed to save trigger configuration' - try { - const errorData = await response.json() - errorMessage = errorData.details || errorData.error || errorMessage - } catch { - // If response is not JSON, use default message - } - logger.error('Failed to save trigger config', { errorMessage }) - throw new Error(errorMessage) - } - - logger.info('Trigger config saved successfully', { blockId, webhookId: webhookIdToUpdate }) - return true - } - - const saveConfig = async (): Promise => { - if (isPreview || !triggerDef) { - return false - } - - const effectiveTriggerId = resolveEffectiveTriggerId(blockId, triggerId) - - try { - setIsSaving(true) - - const triggerCredentials = useSubBlockStore.getState().getValue(blockId, 'triggerCredentials') - const selectedCredentialId = (triggerCredentials as string | null) || null - - if (!webhookId) { - return createWebhook(effectiveTriggerId, selectedCredentialId) - } - - return updateWebhook(webhookId, effectiveTriggerId, selectedCredentialId) - } catch (error) { - logger.error('Error saving trigger config:', error) - throw error - } finally { - setIsSaving(false) - } - } - - const deleteConfig = async (): Promise => { - if (isPreview || !webhookId) { - return false - } - - try { - setIsSaving(true) - - const response = await fetch(`/api/webhooks/${webhookId}`, { - method: 'DELETE', - }) - - if (!response.ok) { - logger.error('Failed to delete webhook') - return false - } - - useSubBlockStore.getState().setValue(blockId, 'triggerPath', '') - useSubBlockStore.getState().setValue(blockId, 'webhookId', null) - useSubBlockStore.setState((state) => { - const newSet = new Set(state.checkedWebhooks) - newSet.delete(blockId) - return { checkedWebhooks: newSet } - }) - - logger.info('Webhook deleted successfully') - return true - } catch (error) { - logger.error('Error deleting webhook:', error) - return false - } finally { - setIsSaving(false) - } - } - return { webhookUrl, webhookPath: webhookPath || blockId, webhookId, isLoading, - isSaving, - saveConfig, - deleteConfig, } } diff --git a/apps/sim/lib/webhooks/deploy.ts b/apps/sim/lib/webhooks/deploy.ts index 4b7208362..d991894c1 100644 --- a/apps/sim/lib/webhooks/deploy.ts +++ b/apps/sim/lib/webhooks/deploy.ts @@ -32,6 +32,12 @@ interface TriggerSaveError { interface TriggerSaveResult { success: boolean error?: TriggerSaveError + warnings?: string[] +} + +interface CredentialSetSyncResult { + error: TriggerSaveError | null + warnings: string[] } interface SaveTriggerWebhooksInput { @@ -42,6 +48,16 @@ interface SaveTriggerWebhooksInput { blocks: Record requestId: string deploymentVersionId?: string + /** + * The previous active version's ID. Only this version's external subscriptions + * will be cleaned up (along with draft webhooks). If not provided, skips cleanup. + */ + previousVersionId?: string + /** + * When true, forces recreation of external subscriptions even if webhook config is unchanged. + * Used when activating a previous deployment version whose subscriptions were cleaned up. + */ + forceRecreateSubscriptions?: boolean } function getSubBlockValue(block: BlockState, subBlockId: string): unknown { @@ -248,7 +264,7 @@ async function syncCredentialSetWebhooks(params: { providerConfig: Record requestId: string deploymentVersionId?: string -}): Promise { +}): Promise { const { workflowId, blockId, @@ -261,7 +277,7 @@ async function syncCredentialSetWebhooks(params: { const credentialSetId = providerConfig.credentialSetId as string | undefined if (!credentialSetId) { - return null + return { error: null, warnings: [] } } const oauthProviderId = getProviderIdFromServiceId(provider) @@ -280,10 +296,23 @@ async function syncCredentialSetWebhooks(params: { deploymentVersionId, }) + const warnings: string[] = [] + + if (syncResult.failed.length > 0) { + const failedCount = syncResult.failed.length + const totalCount = syncResult.webhooks.length + failedCount + warnings.push( + `${failedCount} of ${totalCount} credentials in the set failed to sync for ${provider}. Some team members may not receive triggers.` + ) + } + if (syncResult.webhooks.length === 0) { return { - message: `No valid credentials found in credential set for ${provider}. Please connect accounts and try again.`, - status: 400, + error: { + message: `No valid credentials found in credential set for ${provider}. Please connect accounts and try again.`, + status: 400, + }, + warnings, } } @@ -297,8 +326,11 @@ async function syncCredentialSetWebhooks(params: { if (!success) { await db.delete(webhook).where(eq(webhook.id, wh.id)) return { - message: `Failed to configure ${provider} polling. Please check account permissions.`, - status: 500, + error: { + message: `Failed to configure ${provider} polling. Please check account permissions.`, + status: 500, + }, + warnings, } } } @@ -306,84 +338,7 @@ async function syncCredentialSetWebhooks(params: { } } - return null -} - -async function createWebhookForBlock(params: { - request: NextRequest - workflowId: string - workflow: Record - userId: string - block: BlockState - provider: string - providerConfig: Record - triggerPath: string - requestId: string - deploymentVersionId?: string -}): Promise { - const { - request, - workflowId, - workflow, - userId, - block, - provider, - providerConfig, - triggerPath, - requestId, - deploymentVersionId, - } = params - - const webhookId = nanoid() - const createPayload = { - id: webhookId, - path: triggerPath, - provider, - providerConfig, - } - - const result = await createExternalWebhookSubscription( - request, - createPayload, - workflow, - userId, - requestId - ) - - const updatedProviderConfig = result.updatedProviderConfig as Record - let savedWebhook: any - - try { - const createdRows = await db - .insert(webhook) - .values({ - id: webhookId, - workflowId, - deploymentVersionId: deploymentVersionId || null, - blockId: block.id, - path: triggerPath, - provider, - providerConfig: updatedProviderConfig, - credentialSetId: (updatedProviderConfig.credentialSetId as string | undefined) || null, - isActive: true, - createdAt: new Date(), - updatedAt: new Date(), - }) - .returning() - savedWebhook = createdRows[0] - } catch (error) { - if (result.externalSubscriptionCreated) { - await cleanupExternalWebhook(createPayload, workflow, requestId) - } - throw error - } - - const pollingError = await configurePollingIfNeeded(provider, savedWebhook, requestId) - if (pollingError) { - return pollingError - } - - return null + return { error: null, warnings } } /** @@ -398,23 +353,57 @@ export async function saveTriggerWebhooksForDeploy({ blocks, requestId, deploymentVersionId, + previousVersionId, + forceRecreateSubscriptions = false, }: SaveTriggerWebhooksInput): Promise { - const triggerBlocks = Object.values(blocks || {}).filter(Boolean) + const triggerBlocks = Object.values(blocks || {}).filter((b) => b && b.enabled !== false) const currentBlockIds = new Set(triggerBlocks.map((b) => b.id)) - // 1. Get all existing webhooks for this workflow - const existingWebhooks = await db + // 1. Get ALL webhooks for this workflow (all versions including draft) + const allWorkflowWebhooks = await db .select() .from(webhook) - .where( - deploymentVersionId - ? and( - eq(webhook.workflowId, workflowId), - eq(webhook.deploymentVersionId, deploymentVersionId) - ) - : eq(webhook.workflowId, workflowId) + .where(eq(webhook.workflowId, workflowId)) + + // Separate webhooks by version: current deployment vs others + const existingWebhooks: typeof allWorkflowWebhooks = [] + + for (const wh of allWorkflowWebhooks) { + if (deploymentVersionId && wh.deploymentVersionId === deploymentVersionId) { + existingWebhooks.push(wh) + } + } + + if (previousVersionId) { + const webhooksToCleanup = allWorkflowWebhooks.filter( + (wh) => wh.deploymentVersionId === previousVersionId ) + if (webhooksToCleanup.length > 0) { + logger.info( + `[${requestId}] Cleaning up ${webhooksToCleanup.length} external subscription(s) from previous version` + ) + for (const wh of webhooksToCleanup) { + try { + await cleanupExternalWebhook(wh, workflow, requestId) + } catch (cleanupError) { + logger.warn(`[${requestId}] Failed to cleanup external webhook ${wh.id}`, cleanupError) + } + } + } + } + + const restorePreviousSubscriptions = async () => { + if (!previousVersionId) return + await restorePreviousVersionWebhooks({ + request, + workflow, + userId, + previousVersionId, + requestId, + }) + } + const webhooksByBlockId = new Map() for (const wh of existingWebhooks) { if (!wh.blockId) continue @@ -429,7 +418,14 @@ export async function saveTriggerWebhooksForDeploy({ existingWebhookBlockIds: Array.from(webhooksByBlockId.keys()), }) - // 2. Determine which webhooks to delete (orphaned or config changed) + type WebhookConfig = { + provider: string + providerConfig: Record + triggerPath: string + triggerDef: ReturnType + } + const webhookConfigs = new Map() + const webhooksToDelete: typeof existingWebhooks = [] const blocksNeedingWebhook: BlockState[] = [] const blocksNeedingCredentialSetSync: BlockState[] = [] @@ -447,6 +443,7 @@ export async function saveTriggerWebhooksForDeploy({ ) if (missingFields.length > 0) { + await restorePreviousSubscriptions() return { success: false, error: { @@ -455,9 +452,8 @@ export async function saveTriggerWebhooksForDeploy({ }, } } - // Store config for later use - ;(block as any)._webhookConfig = { provider, providerConfig, triggerPath, triggerDef } + webhookConfigs.set(block.id, { provider, providerConfig, triggerPath, triggerDef }) if (providerConfig.credentialSetId) { blocksNeedingCredentialSetSync.push(block) @@ -477,22 +473,29 @@ export async function saveTriggerWebhooksForDeploy({ ) } - // Check if config changed + // Check if config changed or if we're forcing recreation (e.g., activating old version) const existingConfig = (existingWh.providerConfig as Record) || {} - if ( + const needsRecreation = + forceRecreateSubscriptions || shouldRecreateExternalWebhookSubscription({ previousProvider: existingWh.provider as string, nextProvider: provider, previousConfig: existingConfig, nextConfig: providerConfig, }) - ) { - // Config changed - delete and recreate + + if (needsRecreation) { webhooksToDelete.push(existingWh) blocksNeedingWebhook.push(block) - logger.info(`[${requestId}] Webhook config changed for block ${block.id}, will recreate`) + if (forceRecreateSubscriptions) { + logger.info( + `[${requestId}] Forcing webhook recreation for block ${block.id} (reactivating version)` + ) + } else { + logger.info(`[${requestId}] Webhook config changed for block ${block.id}, will recreate`) + } } - // else: config unchanged, keep existing webhook + // else: config unchanged and not forcing recreation, keep existing webhook } } @@ -522,15 +525,16 @@ export async function saveTriggerWebhooksForDeploy({ await db.delete(webhook).where(inArray(webhook.id, idsToDelete)) } - // 4. Sync credential set webhooks + const collectedWarnings: string[] = [] + for (const block of blocksNeedingCredentialSetSync) { - const config = (block as any)._webhookConfig + const config = webhookConfigs.get(block.id) if (!config) continue const { provider, providerConfig, triggerPath } = config try { - const credentialSetError = await syncCredentialSetWebhooks({ + const syncResult = await syncCredentialSetWebhooks({ workflowId, blockId: block.id, provider, @@ -540,74 +544,214 @@ export async function saveTriggerWebhooksForDeploy({ deploymentVersionId, }) - if (credentialSetError) { - return { success: false, error: credentialSetError } + if (syncResult.warnings.length > 0) { + collectedWarnings.push(...syncResult.warnings) + } + + if (syncResult.error) { + await restorePreviousSubscriptions() + return { success: false, error: syncResult.error, warnings: collectedWarnings } } } catch (error: any) { logger.error(`[${requestId}] Failed to create webhook for ${block.id}`, error) + await restorePreviousSubscriptions() return { success: false, error: { message: error?.message || 'Failed to save trigger configuration', status: 500, }, + warnings: collectedWarnings, } } } - // 5. Create webhooks for blocks that need them + // 5. Create webhooks for blocks that need them (two-phase approach for atomicity) + const createdSubscriptions: Array<{ + webhookId: string + block: BlockState + provider: string + triggerPath: string + updatedProviderConfig: Record + externalSubscriptionCreated: boolean + }> = [] + for (const block of blocksNeedingWebhook) { - const config = (block as any)._webhookConfig + const config = webhookConfigs.get(block.id) if (!config) continue const { provider, providerConfig, triggerPath } = config + const webhookId = nanoid() + const createPayload = { + id: webhookId, + path: triggerPath, + provider, + providerConfig, + } try { - const createError = await createWebhookForBlock({ + const result = await createExternalWebhookSubscription( request, - workflowId, + createPayload, workflow, userId, + requestId + ) + + createdSubscriptions.push({ + webhookId, block, provider, - providerConfig, triggerPath, - requestId, - deploymentVersionId, + updatedProviderConfig: result.updatedProviderConfig as Record, + externalSubscriptionCreated: result.externalSubscriptionCreated, }) - - if (createError) { - return { success: false, error: createError } - } } catch (error: any) { - logger.error(`[${requestId}] Failed to create webhook for ${block.id}`, error) + logger.error(`[${requestId}] Failed to create external subscription for ${block.id}`, error) + for (const sub of createdSubscriptions) { + if (sub.externalSubscriptionCreated) { + try { + await cleanupExternalWebhook( + { + id: sub.webhookId, + path: sub.triggerPath, + provider: sub.provider, + providerConfig: sub.updatedProviderConfig, + }, + workflow, + requestId + ) + } catch (cleanupError) { + logger.warn( + `[${requestId}] Failed to cleanup external subscription for ${sub.block.id}`, + cleanupError + ) + } + } + } + await restorePreviousSubscriptions() return { success: false, error: { - message: error?.message || 'Failed to save trigger configuration', + message: error?.message || 'Failed to create external subscription', status: 500, }, } } } - // Clean up temp config - for (const block of triggerBlocks) { - ;(block as any)._webhookConfig = undefined + // Phase 2: Insert all DB records in a transaction + try { + await db.transaction(async (tx) => { + for (const sub of createdSubscriptions) { + await tx.insert(webhook).values({ + id: sub.webhookId, + workflowId, + deploymentVersionId: deploymentVersionId || null, + blockId: sub.block.id, + path: sub.triggerPath, + provider: sub.provider, + providerConfig: sub.updatedProviderConfig, + credentialSetId: + (sub.updatedProviderConfig.credentialSetId as string | undefined) || null, + isActive: true, + createdAt: new Date(), + updatedAt: new Date(), + }) + } + }) + + for (const sub of createdSubscriptions) { + const pollingError = await configurePollingIfNeeded( + sub.provider, + { id: sub.webhookId, path: sub.triggerPath, providerConfig: sub.updatedProviderConfig }, + requestId + ) + if (pollingError) { + logger.error( + `[${requestId}] Polling configuration failed for ${sub.block.id}`, + pollingError + ) + for (const otherSub of createdSubscriptions) { + if (otherSub.webhookId === sub.webhookId) continue + if (otherSub.externalSubscriptionCreated) { + try { + await cleanupExternalWebhook( + { + id: otherSub.webhookId, + path: otherSub.triggerPath, + provider: otherSub.provider, + providerConfig: otherSub.updatedProviderConfig, + }, + workflow, + requestId + ) + } catch (cleanupError) { + logger.warn( + `[${requestId}] Failed to cleanup external subscription for ${otherSub.block.id}`, + cleanupError + ) + } + } + } + const otherWebhookIds = createdSubscriptions + .filter((s) => s.webhookId !== sub.webhookId) + .map((s) => s.webhookId) + if (otherWebhookIds.length > 0) { + await db.delete(webhook).where(inArray(webhook.id, otherWebhookIds)) + } + await restorePreviousSubscriptions() + return { success: false, error: pollingError } + } + } + } catch (error: any) { + logger.error(`[${requestId}] Failed to insert webhook records`, error) + for (const sub of createdSubscriptions) { + if (sub.externalSubscriptionCreated) { + try { + await cleanupExternalWebhook( + { + id: sub.webhookId, + path: sub.triggerPath, + provider: sub.provider, + providerConfig: sub.updatedProviderConfig, + }, + workflow, + requestId + ) + } catch (cleanupError) { + logger.warn( + `[${requestId}] Failed to cleanup external subscription for ${sub.block.id}`, + cleanupError + ) + } + } + } + await restorePreviousSubscriptions() + return { + success: false, + error: { + message: error?.message || 'Failed to save webhook records', + status: 500, + }, + } } - return { success: true } + return { success: true, warnings: collectedWarnings.length > 0 ? collectedWarnings : undefined } } /** * Clean up all webhooks for a workflow during undeploy. * Removes external subscriptions and deletes webhook records from the database. + * + * @param skipExternalCleanup - If true, skip external subscription cleanup (already done elsewhere) */ export async function cleanupWebhooksForWorkflow( workflowId: string, workflow: Record, requestId: string, - deploymentVersionId?: string + deploymentVersionId?: string, + skipExternalCleanup = false ): Promise { const existingWebhooks = await db .select() @@ -626,23 +770,26 @@ export async function cleanupWebhooksForWorkflow( return } - logger.info(`[${requestId}] Cleaning up ${existingWebhooks.length} webhook(s) for undeploy`, { - workflowId, - deploymentVersionId, - webhookIds: existingWebhooks.map((wh) => wh.id), - }) + logger.info( + `[${requestId}] Cleaning up ${existingWebhooks.length} webhook(s) for ${skipExternalCleanup ? 'DB records only' : 'undeploy'}`, + { + workflowId, + deploymentVersionId, + webhookIds: existingWebhooks.map((wh) => wh.id), + } + ) - // Clean up external subscriptions - for (const wh of existingWebhooks) { - try { - await cleanupExternalWebhook(wh, workflow, requestId) - } catch (cleanupError) { - logger.warn(`[${requestId}] Failed to cleanup external webhook ${wh.id}`, cleanupError) - // Continue with other webhooks even if one fails + if (!skipExternalCleanup) { + for (const wh of existingWebhooks) { + try { + await cleanupExternalWebhook(wh, workflow, requestId) + } catch (cleanupError) { + logger.warn(`[${requestId}] Failed to cleanup external webhook ${wh.id}`, cleanupError) + // Continue with other webhooks even if one fails + } } } - // Delete all webhook records await db .delete(webhook) .where( @@ -660,3 +807,55 @@ export async function cleanupWebhooksForWorkflow( : `[${requestId}] Cleaned up all webhooks for workflow ${workflowId}` ) } + +/** + * Restore external subscriptions for a previous deployment version. + * Used when activation/deployment fails after webhooks were created, + * to restore the previous version's external subscriptions. + */ +export async function restorePreviousVersionWebhooks(params: { + request: NextRequest + workflow: Record + userId: string + previousVersionId: string + requestId: string +}): Promise { + const { request, workflow, userId, previousVersionId, requestId } = params + + const previousWebhooks = await db + .select() + .from(webhook) + .where(eq(webhook.deploymentVersionId, previousVersionId)) + + if (previousWebhooks.length === 0) { + logger.debug(`[${requestId}] No previous webhooks to restore for version ${previousVersionId}`) + return + } + + logger.info( + `[${requestId}] Restoring ${previousWebhooks.length} external subscription(s) for previous version ${previousVersionId}` + ) + + for (const wh of previousWebhooks) { + try { + await createExternalWebhookSubscription( + request, + { + id: wh.id, + path: wh.path, + provider: wh.provider, + providerConfig: (wh.providerConfig as Record) || {}, + }, + workflow, + userId, + requestId + ) + logger.info(`[${requestId}] Restored external subscription for webhook ${wh.id}`) + } catch (restoreError) { + logger.error( + `[${requestId}] Failed to restore external subscription for webhook ${wh.id}`, + restoreError + ) + } + } +} diff --git a/apps/sim/lib/webhooks/gmail-polling-service.ts b/apps/sim/lib/webhooks/gmail-polling-service.ts index 0c4ce378e..7e3bcca5d 100644 --- a/apps/sim/lib/webhooks/gmail-polling-service.ts +++ b/apps/sim/lib/webhooks/gmail-polling-service.ts @@ -161,7 +161,7 @@ export async function pollGmailWebhooks() { const metadata = webhookData.providerConfig as any const credentialId: string | undefined = metadata?.credentialId const userId: string | undefined = metadata?.userId - const credentialSetId: string | undefined = metadata?.credentialSetId + const credentialSetId: string | undefined = webhookData.credentialSetId ?? undefined if (!credentialId && !userId) { logger.error(`[${requestId}] Missing credential info for webhook ${webhookId}`) @@ -697,7 +697,6 @@ async function processEmails( method: 'POST', headers: { 'Content-Type': 'application/json', - 'X-Webhook-Secret': webhookData.secret || '', 'User-Agent': 'Sim/1.0', }, body: JSON.stringify(payload), @@ -766,17 +765,21 @@ async function markEmailAsRead(accessToken: string, messageId: string) { } async function updateWebhookLastChecked(webhookId: string, timestamp: string, historyId?: string) { - const result = await db.select().from(webhook).where(eq(webhook.id, webhookId)) - const existingConfig = (result[0]?.providerConfig as Record) || {} - await db - .update(webhook) - .set({ - providerConfig: { - ...existingConfig, - lastCheckedTimestamp: timestamp, - ...(historyId ? { historyId } : {}), - } as any, - updatedAt: new Date(), - }) - .where(eq(webhook.id, webhookId)) + try { + const result = await db.select().from(webhook).where(eq(webhook.id, webhookId)) + const existingConfig = (result[0]?.providerConfig as Record) || {} + await db + .update(webhook) + .set({ + providerConfig: { + ...existingConfig, + lastCheckedTimestamp: timestamp, + ...(historyId ? { historyId } : {}), + } as any, + updatedAt: new Date(), + }) + .where(eq(webhook.id, webhookId)) + } catch (error) { + logger.error(`Error updating webhook ${webhookId} last checked timestamp:`, error) + } } diff --git a/apps/sim/lib/webhooks/imap-polling-service.ts b/apps/sim/lib/webhooks/imap-polling-service.ts index 49185f9d9..9d664531f 100644 --- a/apps/sim/lib/webhooks/imap-polling-service.ts +++ b/apps/sim/lib/webhooks/imap-polling-service.ts @@ -645,7 +645,6 @@ async function processEmails( method: 'POST', headers: { 'Content-Type': 'application/json', - 'X-Webhook-Secret': '', 'User-Agent': 'Sim/1.0', }, body: JSON.stringify(payload), diff --git a/apps/sim/lib/webhooks/outlook-polling-service.ts b/apps/sim/lib/webhooks/outlook-polling-service.ts index 801d22182..1f1b48e0c 100644 --- a/apps/sim/lib/webhooks/outlook-polling-service.ts +++ b/apps/sim/lib/webhooks/outlook-polling-service.ts @@ -210,7 +210,7 @@ export async function pollOutlookWebhooks() { const metadata = webhookData.providerConfig as any const credentialId: string | undefined = metadata?.credentialId const userId: string | undefined = metadata?.userId - const credentialSetId: string | undefined = metadata?.credentialSetId + const credentialSetId: string | undefined = webhookData.credentialSetId ?? undefined if (!credentialId && !userId) { logger.error(`[${requestId}] Missing credentialId and userId for webhook ${webhookId}`) @@ -607,7 +607,6 @@ async function processOutlookEmails( method: 'POST', headers: { 'Content-Type': 'application/json', - 'X-Webhook-Secret': webhookData.secret || '', 'User-Agent': 'Sim/1.0', }, body: JSON.stringify(payload), diff --git a/apps/sim/lib/webhooks/processor.ts b/apps/sim/lib/webhooks/processor.ts index fa4e295bb..709fa05d2 100644 --- a/apps/sim/lib/webhooks/processor.ts +++ b/apps/sim/lib/webhooks/processor.ts @@ -7,16 +7,27 @@ import { type NextRequest, NextResponse } from 'next/server' import { v4 as uuidv4 } from 'uuid' import { checkEnterprisePlan, checkTeamPlan } from '@/lib/billing/subscriptions/utils' import { isProd, isTriggerDevEnabled } from '@/lib/core/config/feature-flags' +import { getEffectiveDecryptedEnv } from '@/lib/environment/utils' import { preprocessExecution } from '@/lib/execution/preprocessing' import { convertSquareBracketsToTwiML } from '@/lib/webhooks/utils' import { handleSlackChallenge, handleWhatsAppVerification, + validateCirclebackSignature, + validateFirefliesSignature, + validateGitHubSignature, + validateJiraSignature, + validateLinearSignature, validateMicrosoftTeamsSignature, + validateTwilioSignature, + validateTypeformSignature, verifyProviderWebhook, } from '@/lib/webhooks/utils.server' import { executeWebhookJob } from '@/background/webhook-execution' import { resolveEnvVarReferences } from '@/executor/utils/reference-validation' +import { isGitHubEventMatch } from '@/triggers/github/utils' +import { isHubSpotContactEventMatch } from '@/triggers/hubspot/utils' +import { isJiraEventMatch } from '@/triggers/jira/utils' const logger = createLogger('WebhookProcessor') @@ -451,7 +462,6 @@ export async function verifyProviderAuth( // Step 1: Fetch and decrypt environment variables for signature verification let decryptedEnvVars: Record = {} try { - const { getEffectiveDecryptedEnv } = await import('@/lib/environment/utils') decryptedEnvVars = await getEffectiveDecryptedEnv( foundWorkflow.userId, foundWorkflow.workspaceId @@ -553,9 +563,6 @@ export async function verifyProviderAuth( } const fullUrl = getExternalUrl(request) - - const { validateTwilioSignature } = await import('@/lib/webhooks/utils.server') - const isValidSignature = await validateTwilioSignature(authToken, signature, fullUrl, params) if (!isValidSignature) { @@ -583,8 +590,6 @@ export async function verifyProviderAuth( return new NextResponse('Unauthorized - Missing Typeform signature', { status: 401 }) } - const { validateTypeformSignature } = await import('@/lib/webhooks/utils.server') - const isValidSignature = validateTypeformSignature(secret, signature, rawBody) if (!isValidSignature) { @@ -610,8 +615,6 @@ export async function verifyProviderAuth( return new NextResponse('Unauthorized - Missing Linear signature', { status: 401 }) } - const { validateLinearSignature } = await import('@/lib/webhooks/utils.server') - const isValidSignature = validateLinearSignature(secret, signature, rawBody) if (!isValidSignature) { @@ -637,8 +640,6 @@ export async function verifyProviderAuth( return new NextResponse('Unauthorized - Missing Circleback signature', { status: 401 }) } - const { validateCirclebackSignature } = await import('@/lib/webhooks/utils.server') - const isValidSignature = validateCirclebackSignature(secret, signature, rawBody) if (!isValidSignature) { @@ -664,8 +665,6 @@ export async function verifyProviderAuth( return new NextResponse('Unauthorized - Missing Jira signature', { status: 401 }) } - const { validateJiraSignature } = await import('@/lib/webhooks/utils.server') - const isValidSignature = validateJiraSignature(secret, signature, rawBody) if (!isValidSignature) { @@ -694,8 +693,6 @@ export async function verifyProviderAuth( return new NextResponse('Unauthorized - Missing GitHub signature', { status: 401 }) } - const { validateGitHubSignature } = await import('@/lib/webhooks/utils.server') - const isValidSignature = validateGitHubSignature(secret, signature, rawBody) if (!isValidSignature) { @@ -724,8 +721,6 @@ export async function verifyProviderAuth( return new NextResponse('Unauthorized - Missing Fireflies signature', { status: 401 }) } - const { validateFirefliesSignature } = await import('@/lib/webhooks/utils.server') - const isValidSignature = validateFirefliesSignature(secret, signature, rawBody) if (!isValidSignature) { @@ -860,8 +855,6 @@ export async function queueWebhookExecution( const eventType = request.headers.get('x-github-event') const action = body.action - const { isGitHubEventMatch } = await import('@/triggers/github/utils') - if (!isGitHubEventMatch(triggerId, eventType || '', action, body)) { logger.debug( `[${options.requestId}] GitHub event mismatch for trigger ${triggerId}. Event: ${eventType}, Action: ${action}. Skipping execution.`, @@ -890,8 +883,6 @@ export async function queueWebhookExecution( if (triggerId && triggerId !== 'jira_webhook') { const webhookEvent = body.webhookEvent as string | undefined - const { isJiraEventMatch } = await import('@/triggers/jira/utils') - if (!isJiraEventMatch(triggerId, webhookEvent || '', body)) { logger.debug( `[${options.requestId}] Jira event mismatch for trigger ${triggerId}. Event: ${webhookEvent}. Skipping execution.`, @@ -921,8 +912,6 @@ export async function queueWebhookExecution( const subscriptionType = firstEvent?.subscriptionType as string | undefined - const { isHubSpotContactEventMatch } = await import('@/triggers/hubspot/utils') - if (!isHubSpotContactEventMatch(triggerId, subscriptionType || '')) { logger.debug( `[${options.requestId}] HubSpot event mismatch for trigger ${triggerId}. Event: ${subscriptionType}. Skipping execution.`, @@ -974,7 +963,8 @@ export async function queueWebhookExecution( // Note: Each webhook now has its own credentialId (credential sets are fanned out at save time) const providerConfig = (foundWebhook.providerConfig as Record) || {} const credentialId = providerConfig.credentialId as string | undefined - const credentialSetId = providerConfig.credentialSetId as string | undefined + // credentialSetId is a direct field on webhook table, not in providerConfig + const credentialSetId = foundWebhook.credentialSetId as string | undefined // Verify billing for credential sets if (credentialSetId) { diff --git a/apps/sim/lib/webhooks/provider-subscriptions.ts b/apps/sim/lib/webhooks/provider-subscriptions.ts index 6461eb1e2..a3e049f76 100644 --- a/apps/sim/lib/webhooks/provider-subscriptions.ts +++ b/apps/sim/lib/webhooks/provider-subscriptions.ts @@ -30,11 +30,11 @@ export async function createTeamsSubscription( webhook: any, workflow: any, requestId: string -): Promise { +): Promise { const config = getProviderConfig(webhook) if (config.triggerId !== 'microsoftteams_chat_subscription') { - return + return undefined } const credentialId = config.credentialId as string | undefined @@ -77,7 +77,7 @@ export async function createTeamsSubscription( teamsLogger.info( `[${requestId}] Teams subscription ${existingSubscriptionId} already exists for webhook ${webhook.id}` ) - return + return existingSubscriptionId } } catch { teamsLogger.debug(`[${requestId}] Existing subscription check failed, will create new one`) @@ -140,6 +140,7 @@ export async function createTeamsSubscription( teamsLogger.info( `[${requestId}] Successfully created Teams subscription ${payload.id} for webhook ${webhook.id}` ) + return payload.id as string } catch (error: any) { if ( error instanceof Error && @@ -1600,9 +1601,11 @@ export async function createExternalWebhookSubscription( externalSubscriptionCreated = true } } else if (provider === 'microsoft-teams') { - await createTeamsSubscription(request, webhookData, workflow, requestId) - externalSubscriptionCreated = - (providerConfig.triggerId as string | undefined) === 'microsoftteams_chat_subscription' + const subscriptionId = await createTeamsSubscription(request, webhookData, workflow, requestId) + if (subscriptionId) { + updatedProviderConfig = { ...updatedProviderConfig, externalSubscriptionId: subscriptionId } + externalSubscriptionCreated = true + } } else if (provider === 'telegram') { await createTelegramWebhook(request, webhookData, requestId) externalSubscriptionCreated = true diff --git a/apps/sim/lib/webhooks/rss-polling-service.ts b/apps/sim/lib/webhooks/rss-polling-service.ts index 1b4e56993..ce282ef0d 100644 --- a/apps/sim/lib/webhooks/rss-polling-service.ts +++ b/apps/sim/lib/webhooks/rss-polling-service.ts @@ -379,7 +379,6 @@ async function processRssItems( method: 'POST', headers: { 'Content-Type': 'application/json', - 'X-Webhook-Secret': webhookData.secret || '', 'User-Agent': 'Sim/1.0', }, body: JSON.stringify(payload), diff --git a/apps/sim/lib/webhooks/utils.server.ts b/apps/sim/lib/webhooks/utils.server.ts index fd0eb12a0..368b36f9f 100644 --- a/apps/sim/lib/webhooks/utils.server.ts +++ b/apps/sim/lib/webhooks/utils.server.ts @@ -2,6 +2,7 @@ import { db, workflowDeploymentVersion } from '@sim/db' import { account, webhook } from '@sim/db/schema' import { createLogger } from '@sim/logger' import { and, eq, isNull, or } from 'drizzle-orm' +import { nanoid } from 'nanoid' import { type NextRequest, NextResponse } from 'next/server' import { type SecureFetchResponse, @@ -9,7 +10,11 @@ import { validateUrlWithDNS, } from '@/lib/core/security/input-validation' import type { DbOrTx } from '@/lib/db/types' -import { refreshAccessTokenIfNeeded } from '@/app/api/auth/oauth/utils' +import { getProviderIdFromServiceId } from '@/lib/oauth' +import { + getCredentialsForCredentialSet, + refreshAccessTokenIfNeeded, +} from '@/app/api/auth/oauth/utils' const logger = createLogger('WebhookUtils') @@ -1388,21 +1393,6 @@ export function verifyProviderWebhook( case 'stripe': break case 'gmail': - if (providerConfig.secret) { - const secretHeader = request.headers.get('X-Webhook-Secret') - if (!secretHeader || secretHeader.length !== providerConfig.secret.length) { - logger.warn(`[${requestId}] Invalid Gmail webhook secret`) - return new NextResponse('Unauthorized', { status: 401 }) - } - let result = 0 - for (let i = 0; i < secretHeader.length; i++) { - result |= secretHeader.charCodeAt(i) ^ providerConfig.secret.charCodeAt(i) - } - if (result !== 0) { - logger.warn(`[${requestId}] Invalid Gmail webhook secret`) - return new NextResponse('Unauthorized', { status: 401 }) - } - } break case 'telegram': { // Check User-Agent to ensure it's not blocked by middleware @@ -1946,6 +1936,10 @@ export interface CredentialSetWebhookSyncResult { created: number updated: number deleted: number + failed: Array<{ + credentialId: string + error: string + }> } /** @@ -1997,9 +1991,6 @@ export async function syncWebhooksForCredentialSet(params: { `[${requestId}] Syncing webhooks for credential set ${credentialSetId}, provider ${provider}` ) - const { getCredentialsForCredentialSet } = await import('@/app/api/auth/oauth/utils') - const { nanoid } = await import('nanoid') - // Polling providers get unique paths per credential (for independent state) // External webhook providers share the same path (external service sends to one URL) const pollingProviders = ['gmail', 'outlook', 'rss', 'imap'] @@ -2011,7 +2002,7 @@ export async function syncWebhooksForCredentialSet(params: { syncLogger.warn( `[${requestId}] No credentials found in credential set ${credentialSetId} for provider ${oauthProviderId}` ) - return { webhooks: [], created: 0, updated: 0, deleted: 0 } + return { webhooks: [], created: 0, updated: 0, deleted: 0, failed: [] } } syncLogger.info( @@ -2033,10 +2024,9 @@ export async function syncWebhooksForCredentialSet(params: { ) // Filter to only webhooks belonging to this credential set - const credentialSetWebhooks = existingWebhooks.filter((wh) => { - const config = wh.providerConfig as Record - return config?.credentialSetId === credentialSetId - }) + const credentialSetWebhooks = existingWebhooks.filter( + (wh) => wh.credentialSetId === credentialSetId + ) syncLogger.info( `[${requestId}] Found ${credentialSetWebhooks.length} existing webhooks for credential set` @@ -2058,103 +2048,128 @@ export async function syncWebhooksForCredentialSet(params: { created: 0, updated: 0, deleted: 0, + failed: [], } // Process each credential in the set for (const cred of credentials) { - const existingWebhook = existingByCredentialId.get(cred.credentialId) + try { + const existingWebhook = existingByCredentialId.get(cred.credentialId) - if (existingWebhook) { - // Update existing webhook - preserve state fields - const existingConfig = existingWebhook.providerConfig as Record + if (existingWebhook) { + // Update existing webhook - preserve state fields + const existingConfig = existingWebhook.providerConfig as Record - const updatedConfig = { - ...providerConfig, - basePath, // Store basePath for reliable reconstruction during membership sync - credentialId: cred.credentialId, - credentialSetId: credentialSetId, - // Preserve state fields from existing config - historyId: existingConfig.historyId, - lastCheckedTimestamp: existingConfig.lastCheckedTimestamp, - setupCompleted: existingConfig.setupCompleted, - externalId: existingConfig.externalId, - userId: cred.userId, - } + const updatedConfig = { + ...providerConfig, + basePath, // Store basePath for reliable reconstruction during membership sync + credentialId: cred.credentialId, + credentialSetId: credentialSetId, + // Preserve state fields from existing config + historyId: existingConfig?.historyId, + lastCheckedTimestamp: existingConfig?.lastCheckedTimestamp, + setupCompleted: existingConfig?.setupCompleted, + externalId: existingConfig?.externalId, + userId: cred.userId, + } - await dbCtx - .update(webhook) - .set({ - ...(deploymentVersionId ? { deploymentVersionId } : {}), - providerConfig: updatedConfig, + await dbCtx + .update(webhook) + .set({ + ...(deploymentVersionId ? { deploymentVersionId } : {}), + providerConfig: updatedConfig, + isActive: true, + updatedAt: new Date(), + }) + .where(eq(webhook.id, existingWebhook.id)) + + result.webhooks.push({ + id: existingWebhook.id, + credentialId: cred.credentialId, + isNew: false, + }) + result.updated++ + + syncLogger.debug( + `[${requestId}] Updated webhook ${existingWebhook.id} for credential ${cred.credentialId}` + ) + } else { + // Create new webhook for this credential + const webhookId = nanoid() + const webhookPath = useUniquePaths + ? `${basePath}-${cred.credentialId.slice(0, 8)}` + : basePath + + const newConfig = { + ...providerConfig, + basePath, // Store basePath for reliable reconstruction during membership sync + credentialId: cred.credentialId, + credentialSetId: credentialSetId, + userId: cred.userId, + } + + await dbCtx.insert(webhook).values({ + id: webhookId, + workflowId, + blockId, + path: webhookPath, + provider, + providerConfig: newConfig, + credentialSetId, // Indexed column for efficient credential set queries isActive: true, + ...(deploymentVersionId ? { deploymentVersionId } : {}), + createdAt: new Date(), updatedAt: new Date(), }) - .where(eq(webhook.id, existingWebhook.id)) - result.webhooks.push({ - id: existingWebhook.id, - credentialId: cred.credentialId, - isNew: false, - }) - result.updated++ + result.webhooks.push({ + id: webhookId, + credentialId: cred.credentialId, + isNew: true, + }) + result.created++ - syncLogger.debug( - `[${requestId}] Updated webhook ${existingWebhook.id} for credential ${cred.credentialId}` - ) - } else { - // Create new webhook for this credential - const webhookId = nanoid() - const webhookPath = useUniquePaths ? `${basePath}-${cred.credentialId.slice(0, 8)}` : basePath - - const newConfig = { - ...providerConfig, - basePath, // Store basePath for reliable reconstruction during membership sync - credentialId: cred.credentialId, - credentialSetId: credentialSetId, - userId: cred.userId, + syncLogger.debug( + `[${requestId}] Created webhook ${webhookId} for credential ${cred.credentialId}` + ) } - - await dbCtx.insert(webhook).values({ - id: webhookId, - workflowId, - blockId, - path: webhookPath, - provider, - providerConfig: newConfig, - credentialSetId, // Indexed column for efficient credential set queries - isActive: true, - ...(deploymentVersionId ? { deploymentVersionId } : {}), - createdAt: new Date(), - updatedAt: new Date(), - }) - - result.webhooks.push({ - id: webhookId, - credentialId: cred.credentialId, - isNew: true, - }) - result.created++ - - syncLogger.debug( - `[${requestId}] Created webhook ${webhookId} for credential ${cred.credentialId}` + } catch (error) { + const errorMessage = error instanceof Error ? error.message : 'Unknown error' + syncLogger.error( + `[${requestId}] Failed to sync webhook for credential ${cred.credentialId}: ${errorMessage}` ) + result.failed.push({ + credentialId: cred.credentialId, + error: errorMessage, + }) } } // Delete webhooks for credentials no longer in the set for (const [credentialId, existingWebhook] of existingByCredentialId) { if (!credentialIdsInSet.has(credentialId)) { - await dbCtx.delete(webhook).where(eq(webhook.id, existingWebhook.id)) - result.deleted++ + try { + await dbCtx.delete(webhook).where(eq(webhook.id, existingWebhook.id)) + result.deleted++ - syncLogger.debug( - `[${requestId}] Deleted webhook ${existingWebhook.id} for removed credential ${credentialId}` - ) + syncLogger.debug( + `[${requestId}] Deleted webhook ${existingWebhook.id} for removed credential ${credentialId}` + ) + } catch (error) { + const errorMessage = error instanceof Error ? error.message : 'Unknown error' + syncLogger.error( + `[${requestId}] Failed to delete webhook ${existingWebhook.id} for credential ${credentialId}: ${errorMessage}` + ) + result.failed.push({ + credentialId, + error: `Failed to delete: ${errorMessage}`, + }) + } } } syncLogger.info( - `[${requestId}] Credential set webhook sync complete: ${result.created} created, ${result.updated} updated, ${result.deleted} deleted` + `[${requestId}] Credential set webhook sync complete: ${result.created} created, ${result.updated} updated, ${result.deleted} deleted, ${result.failed.length} failed` ) return result @@ -2175,8 +2190,6 @@ export async function syncAllWebhooksForCredentialSet( const syncLogger = createLogger('CredentialSetMembershipSync') syncLogger.info(`[${requestId}] Syncing all webhooks for credential set ${credentialSetId}`) - const { getProviderIdFromServiceId } = await import('@/lib/oauth') - // Find all webhooks that use this credential set using the indexed column const webhooksForSet = await dbCtx .select({ webhook }) diff --git a/apps/sim/lib/webhooks/utils.ts b/apps/sim/lib/webhooks/utils.ts index 916045b34..d411f6b84 100644 --- a/apps/sim/lib/webhooks/utils.ts +++ b/apps/sim/lib/webhooks/utils.ts @@ -1,5 +1,5 @@ /** - * Pure utility functions for TwiML processing + * Pure utility functions for webhook processing * This file has NO server-side dependencies to ensure it can be safely imported in client-side code */ @@ -18,3 +18,19 @@ export function convertSquareBracketsToTwiML(twiml: string | undefined): string // Replace [Tag] with and [/Tag] with return twiml.replace(/\[(\/?[^\]]+)\]/g, '<$1>') } + +/** + * Merges fields from source into target, but only if they don't exist in the base config. + * Used to preserve system-managed fields while respecting user-provided values. + */ +export function mergeNonUserFields( + target: Record, + source: Record, + userProvided: Record +): void { + for (const [key, value] of Object.entries(source)) { + if (!(key in userProvided)) { + target[key] = value + } + } +} diff --git a/apps/sim/lib/workflows/schedules/deploy.test.ts b/apps/sim/lib/workflows/schedules/deploy.test.ts index a18bd5adf..b5039c031 100644 --- a/apps/sim/lib/workflows/schedules/deploy.test.ts +++ b/apps/sim/lib/workflows/schedules/deploy.test.ts @@ -17,6 +17,9 @@ const { mockValidateCronExpression, mockGetScheduleTimeValues, mockRandomUUID, + mockTransaction, + mockSelect, + mockFrom, } = vi.hoisted(() => ({ mockInsert: vi.fn(), mockDelete: vi.fn(), @@ -28,20 +31,27 @@ const { mockValidateCronExpression: vi.fn(), mockGetScheduleTimeValues: vi.fn(), mockRandomUUID: vi.fn(), + mockTransaction: vi.fn(), + mockSelect: vi.fn(), + mockFrom: vi.fn(), })) vi.mock('@sim/db', () => ({ - db: {}, + db: { + transaction: mockTransaction, + }, workflowSchedule: { workflowId: 'workflow_id', blockId: 'block_id', deploymentVersionId: 'deployment_version_id', + id: 'id', }, })) vi.mock('drizzle-orm', () => ({ eq: vi.fn((...args) => ({ type: 'eq', args })), and: vi.fn((...args) => ({ type: 'and', args })), + inArray: vi.fn((...args) => ({ type: 'inArray', args })), sql: vi.fn((strings, ...values) => ({ type: 'sql', strings, values })), })) @@ -100,6 +110,20 @@ describe('Schedule Deploy Utilities', () => { // Setup mock chain for delete mockWhere.mockResolvedValue({}) mockDelete.mockReturnValue({ where: mockWhere }) + + // Setup mock chain for select + mockFrom.mockReturnValue({ where: vi.fn().mockResolvedValue([]) }) + mockSelect.mockReturnValue({ from: mockFrom }) + + // Setup transaction mock to execute callback with mock tx + mockTransaction.mockImplementation(async (callback) => { + const mockTx = { + insert: mockInsert, + delete: mockDelete, + select: mockSelect, + } + return callback(mockTx) + }) }) afterEach(() => { @@ -135,6 +159,19 @@ describe('Schedule Deploy Utilities', () => { const result = findScheduleBlocks({}) expect(result).toHaveLength(0) }) + + it('should exclude disabled schedule blocks', () => { + const blocks: Record = { + 'block-1': { id: 'block-1', type: 'schedule', enabled: true, subBlocks: {} } as BlockState, + 'block-2': { id: 'block-2', type: 'schedule', enabled: false, subBlocks: {} } as BlockState, + 'block-3': { id: 'block-3', type: 'schedule', subBlocks: {} } as BlockState, // enabled undefined = enabled + } + + const result = findScheduleBlocks(blocks) + + expect(result).toHaveLength(2) + expect(result.map((b) => b.id)).toEqual(['block-1', 'block-3']) + }) }) describe('validateScheduleBlock', () => { @@ -671,20 +708,24 @@ describe('Schedule Deploy Utilities', () => { }) describe('createSchedulesForDeploy', () => { + const setupMockTransaction = ( + existingSchedules: Array<{ id: string; blockId: string }> = [] + ) => { + mockFrom.mockReturnValue({ where: vi.fn().mockResolvedValue(existingSchedules) }) + mockSelect.mockReturnValue({ from: mockFrom }) + } + it('should return success with no schedule blocks', async () => { const blocks: Record = { 'block-1': { id: 'block-1', type: 'agent', subBlocks: {} } as BlockState, } - const mockTx = { - insert: mockInsert, - delete: mockDelete, - } + setupMockTransaction() - const result = await createSchedulesForDeploy('workflow-1', blocks, mockTx as any) + const result = await createSchedulesForDeploy('workflow-1', blocks, {} as any) expect(result.success).toBe(true) - expect(mockInsert).not.toHaveBeenCalled() + expect(mockTransaction).not.toHaveBeenCalled() }) it('should create schedule for valid schedule block', async () => { @@ -700,17 +741,15 @@ describe('Schedule Deploy Utilities', () => { } as BlockState, } - const mockTx = { - insert: mockInsert, - delete: mockDelete, - } + setupMockTransaction() - const result = await createSchedulesForDeploy('workflow-1', blocks, mockTx as any) + const result = await createSchedulesForDeploy('workflow-1', blocks, {} as any) expect(result.success).toBe(true) expect(result.scheduleId).toBe('test-uuid') expect(result.cronExpression).toBe('0 9 * * *') expect(result.nextRunAt).toEqual(new Date('2025-04-15T09:00:00Z')) + expect(mockTransaction).toHaveBeenCalled() expect(mockInsert).toHaveBeenCalled() expect(mockOnConflictDoUpdate).toHaveBeenCalled() }) @@ -727,16 +766,13 @@ describe('Schedule Deploy Utilities', () => { } as BlockState, } - const mockTx = { - insert: mockInsert, - delete: mockDelete, - } + setupMockTransaction() - const result = await createSchedulesForDeploy('workflow-1', blocks, mockTx as any) + const result = await createSchedulesForDeploy('workflow-1', blocks, {} as any) expect(result.success).toBe(false) expect(result.error).toBe('Time is required for daily schedules') - expect(mockInsert).not.toHaveBeenCalled() + expect(mockTransaction).not.toHaveBeenCalled() }) it('should use onConflictDoUpdate for existing schedules', async () => { @@ -752,12 +788,9 @@ describe('Schedule Deploy Utilities', () => { } as BlockState, } - const mockTx = { - insert: mockInsert, - delete: mockDelete, - } + setupMockTransaction() - await createSchedulesForDeploy('workflow-1', blocks, mockTx as any) + await createSchedulesForDeploy('workflow-1', blocks, {} as any) expect(mockOnConflictDoUpdate).toHaveBeenCalledWith({ target: expect.any(Array), @@ -769,6 +802,27 @@ describe('Schedule Deploy Utilities', () => { }), }) }) + + it('should rollback on database error', async () => { + const blocks: Record = { + 'block-1': { + id: 'block-1', + type: 'schedule', + subBlocks: { + scheduleType: { value: 'daily' }, + dailyTime: { value: '09:00' }, + timezone: { value: 'UTC' }, + }, + } as BlockState, + } + + mockTransaction.mockRejectedValueOnce(new Error('Database error')) + + const result = await createSchedulesForDeploy('workflow-1', blocks, {} as any) + + expect(result.success).toBe(false) + expect(result.error).toBe('Database error') + }) }) describe('deleteSchedulesForWorkflow', () => { diff --git a/apps/sim/lib/workflows/schedules/deploy.ts b/apps/sim/lib/workflows/schedules/deploy.ts index 659affda9..9344b018e 100644 --- a/apps/sim/lib/workflows/schedules/deploy.ts +++ b/apps/sim/lib/workflows/schedules/deploy.ts @@ -1,6 +1,6 @@ import { db, workflowSchedule } from '@sim/db' import { createLogger } from '@sim/logger' -import { and, eq } from 'drizzle-orm' +import { and, eq, inArray } from 'drizzle-orm' import type { DbOrTx } from '@/lib/db/types' import { cleanupWebhooksForWorkflow } from '@/lib/webhooks/deploy' import type { BlockState } from '@/lib/workflows/schedules/utils' @@ -21,13 +21,13 @@ export interface ScheduleDeployResult { } /** - * Create or update schedule records for a workflow during deployment - * This should be called within a database transaction + * Create or update schedule records for a workflow during deployment. + * Uses a transaction to ensure atomicity - all schedules are created or none are. */ export async function createSchedulesForDeploy( workflowId: string, blocks: Record, - tx: DbOrTx, + _tx: DbOrTx, deploymentVersionId?: string ): Promise { const scheduleBlocks = findScheduleBlocks(blocks) @@ -37,16 +37,16 @@ export async function createSchedulesForDeploy( return { success: true } } - let lastScheduleInfo: { - scheduleId: string - cronExpression?: string - nextRunAt?: Date - timezone?: string - } | null = null + // Phase 1: Validate ALL blocks before making any DB changes + const validatedBlocks: Array<{ + blockId: string + cronExpression: string + nextRunAt: Date + timezone: string + }> = [] for (const block of scheduleBlocks) { const blockId = block.id as string - const validation = validateScheduleBlock(block) if (!validation.isValid) { return { @@ -54,62 +54,112 @@ export async function createSchedulesForDeploy( error: validation.error, } } - - const { cronExpression, nextRunAt, timezone } = validation - - const scheduleId = crypto.randomUUID() - const now = new Date() - - const values = { - id: scheduleId, - workflowId, - deploymentVersionId: deploymentVersionId || null, + validatedBlocks.push({ blockId, - cronExpression: cronExpression!, - triggerType: 'schedule', - createdAt: now, - updatedAt: now, - nextRunAt: nextRunAt!, - timezone: timezone!, - status: 'active', - failedCount: 0, - } - - const setValues = { - blockId, - cronExpression: cronExpression!, - ...(deploymentVersionId ? { deploymentVersionId } : {}), - updatedAt: now, - nextRunAt: nextRunAt!, - timezone: timezone!, - status: 'active', - failedCount: 0, - } - - await tx - .insert(workflowSchedule) - .values(values) - .onConflictDoUpdate({ - target: [ - workflowSchedule.workflowId, - workflowSchedule.blockId, - workflowSchedule.deploymentVersionId, - ], - set: setValues, - }) - - logger.info(`Schedule created/updated for workflow ${workflowId}, block ${blockId}`, { - scheduleId: values.id, - cronExpression, - nextRunAt: nextRunAt?.toISOString(), + cronExpression: validation.cronExpression!, + nextRunAt: validation.nextRunAt!, + timezone: validation.timezone!, }) + } - lastScheduleInfo = { scheduleId: values.id, cronExpression, nextRunAt, timezone } + // Phase 2: All validations passed - now do DB operations in a transaction + let lastScheduleInfo: { + scheduleId: string + cronExpression?: string + nextRunAt?: Date + timezone?: string + } | null = null + + try { + await db.transaction(async (tx) => { + const currentBlockIds = new Set(validatedBlocks.map((b) => b.blockId)) + + const existingSchedules = await tx + .select({ id: workflowSchedule.id, blockId: workflowSchedule.blockId }) + .from(workflowSchedule) + .where( + deploymentVersionId + ? and( + eq(workflowSchedule.workflowId, workflowId), + eq(workflowSchedule.deploymentVersionId, deploymentVersionId) + ) + : eq(workflowSchedule.workflowId, workflowId) + ) + + const orphanedScheduleIds = existingSchedules + .filter((s) => s.blockId && !currentBlockIds.has(s.blockId)) + .map((s) => s.id) + + if (orphanedScheduleIds.length > 0) { + logger.info( + `Deleting ${orphanedScheduleIds.length} orphaned schedule(s) for workflow ${workflowId}` + ) + await tx.delete(workflowSchedule).where(inArray(workflowSchedule.id, orphanedScheduleIds)) + } + + for (const validated of validatedBlocks) { + const { blockId, cronExpression, nextRunAt, timezone } = validated + const scheduleId = crypto.randomUUID() + const now = new Date() + + const values = { + id: scheduleId, + workflowId, + deploymentVersionId: deploymentVersionId || null, + blockId, + cronExpression, + triggerType: 'schedule', + createdAt: now, + updatedAt: now, + nextRunAt, + timezone, + status: 'active', + failedCount: 0, + } + + const setValues = { + blockId, + cronExpression, + ...(deploymentVersionId ? { deploymentVersionId } : {}), + updatedAt: now, + nextRunAt, + timezone, + status: 'active', + failedCount: 0, + } + + await tx + .insert(workflowSchedule) + .values(values) + .onConflictDoUpdate({ + target: [ + workflowSchedule.workflowId, + workflowSchedule.blockId, + workflowSchedule.deploymentVersionId, + ], + set: setValues, + }) + + logger.info(`Schedule created/updated for workflow ${workflowId}, block ${blockId}`, { + scheduleId: values.id, + cronExpression, + nextRunAt: nextRunAt?.toISOString(), + }) + + lastScheduleInfo = { scheduleId: values.id, cronExpression, nextRunAt, timezone } + } + }) + } catch (error) { + logger.error(`Failed to create schedules for workflow ${workflowId}`, error) + return { + success: false, + error: error instanceof Error ? error.message : 'Failed to create schedules', + } } return { success: true, - ...lastScheduleInfo, + ...(lastScheduleInfo ?? {}), } } @@ -145,8 +195,25 @@ export async function cleanupDeploymentVersion(params: { workflow: Record requestId: string deploymentVersionId: string + /** + * If true, skip external subscription cleanup (already done by saveTriggerWebhooksForDeploy). + * Only deletes DB records. + */ + skipExternalCleanup?: boolean }): Promise { - const { workflowId, workflow, requestId, deploymentVersionId } = params - await cleanupWebhooksForWorkflow(workflowId, workflow, requestId, deploymentVersionId) + const { + workflowId, + workflow, + requestId, + deploymentVersionId, + skipExternalCleanup = false, + } = params + await cleanupWebhooksForWorkflow( + workflowId, + workflow, + requestId, + deploymentVersionId, + skipExternalCleanup + ) await deleteSchedulesForWorkflow(workflowId, db, deploymentVersionId) } diff --git a/apps/sim/lib/workflows/schedules/validation.ts b/apps/sim/lib/workflows/schedules/validation.ts index 2ad436983..2c8356c0b 100644 --- a/apps/sim/lib/workflows/schedules/validation.ts +++ b/apps/sim/lib/workflows/schedules/validation.ts @@ -98,9 +98,12 @@ function getMissingConfigError(scheduleType: string): string { /** * Find schedule blocks in a workflow's blocks + * Only returns enabled schedule blocks (disabled blocks are skipped) */ export function findScheduleBlocks(blocks: Record): BlockState[] { - return Object.values(blocks).filter((block) => block.type === 'schedule') + return Object.values(blocks).filter( + (block) => block.type === 'schedule' && block.enabled !== false + ) } /** diff --git a/apps/sim/stores/workflows/workflow/store.ts b/apps/sim/stores/workflows/workflow/store.ts index 9c40fffc4..3fc3438cf 100644 --- a/apps/sim/stores/workflows/workflow/store.ts +++ b/apps/sim/stores/workflows/workflow/store.ts @@ -4,7 +4,6 @@ import { create } from 'zustand' import { devtools } from 'zustand/middleware' import { DEFAULT_DUPLICATE_OFFSET } from '@/lib/workflows/autolayout/constants' import { getBlockOutputs } from '@/lib/workflows/blocks/block-outputs' -import { TriggerUtils } from '@/lib/workflows/triggers/triggers' import { getBlock } from '@/blocks' import type { SubBlockConfig } from '@/blocks/types' import { normalizeName, RESERVED_BLOCK_NAMES } from '@/executor/constants' @@ -1171,93 +1170,6 @@ export const useWorkflowStore = create()( // Note: Socket.IO handles real-time sync automatically }, - toggleBlockTriggerMode: (id: string) => { - const block = get().blocks[id] - if (!block) return - - const newTriggerMode = !block.triggerMode - - // When switching TO trigger mode, check if block is inside a subflow - if (newTriggerMode && TriggerUtils.isBlockInSubflow(id, get().blocks)) { - logger.warn('Cannot enable trigger mode for block inside loop or parallel subflow', { - blockId: id, - blockType: block.type, - }) - return - } - - // When switching TO trigger mode, remove all incoming connections - let filteredEdges = [...get().edges] - if (newTriggerMode) { - // Remove edges where this block is the target - filteredEdges = filteredEdges.filter((edge) => edge.target !== id) - logger.info( - `Removed ${get().edges.length - filteredEdges.length} incoming connections for trigger mode`, - { - blockId: id, - blockType: block.type, - } - ) - } - - const newState = { - blocks: { - ...get().blocks, - [id]: { - ...block, - triggerMode: newTriggerMode, - }, - }, - edges: filteredEdges, - loops: { ...get().loops }, - parallels: { ...get().parallels }, - } - - set(newState) - get().updateLastSaved() - - // Handle webhook enable/disable when toggling trigger mode - const handleWebhookToggle = async () => { - try { - const activeWorkflowId = useWorkflowRegistry.getState().activeWorkflowId - if (!activeWorkflowId) return - - // Check if there's a webhook for this block - const response = await fetch( - `/api/webhooks?workflowId=${activeWorkflowId}&blockId=${id}` - ) - if (response.ok) { - const data = await response.json() - if (data.webhooks && data.webhooks.length > 0) { - const webhook = data.webhooks[0].webhook - - // Update webhook's isActive status based on trigger mode - const updateResponse = await fetch(`/api/webhooks/${webhook.id}`, { - method: 'PATCH', - headers: { - 'Content-Type': 'application/json', - }, - body: JSON.stringify({ - isActive: newTriggerMode, - }), - }) - - if (!updateResponse.ok) { - logger.error('Failed to update webhook status') - } - } - } - } catch (error) { - logger.error('Error toggling webhook status:', error) - } - } - - // Handle webhook toggle asynchronously - handleWebhookToggle() - - // Note: Socket.IO handles real-time sync automatically - }, - // Parallel block methods implementation updateParallelCount: (parallelId: string, count: number) => { const block = get().blocks[parallelId] diff --git a/apps/sim/stores/workflows/workflow/types.ts b/apps/sim/stores/workflows/workflow/types.ts index c5d99e17a..e978999a8 100644 --- a/apps/sim/stores/workflows/workflow/types.ts +++ b/apps/sim/stores/workflows/workflow/types.ts @@ -241,7 +241,6 @@ export interface WorkflowActions { setNeedsRedeploymentFlag: (needsRedeployment: boolean) => void revertToDeployedState: (deployedState: WorkflowState) => void toggleBlockAdvancedMode: (id: string) => void - toggleBlockTriggerMode: (id: string) => void setDragStartPosition: (position: DragStartPosition | null) => void getDragStartPosition: () => DragStartPosition | null getWorkflowState: () => WorkflowState