Compare commits

..

5 Commits

Author SHA1 Message Date
Siddharth Ganesan
e4835817ee Reset 2026-01-24 02:16:52 -08:00
Siddharth Ganesan
8a0156a578 Fix lint 2026-01-24 02:15:09 -08:00
Siddharth Ganesan
ad7fb6e575 Fix hitl 2026-01-24 02:14:46 -08:00
Vikhyath Mondreti
12100e6881 improvement(webhooks): remove dead code (#2965)
* fix(webhooks): subscription recreation path

* improvement(webhooks): remove dead code

* fix tests

* address bugbot comments

* fix restoration edge case

* fix more edge cases

* address bugbot comments

* fix gmail polling

* add warnings for UI indication for credential sets
2026-01-23 23:18:20 -08:00
Siddharth Ganesan
23294683e1 fix(copilot): mask credentials fix (#2963)
* Fix copilot masking

* Clean up

* Lint
2026-01-23 19:34:55 -08:00
26 changed files with 1149 additions and 822 deletions

View File

@@ -640,6 +640,7 @@ export interface AdminDeployResult {
isDeployed: boolean
version: number
deployedAt: string
warnings?: string[]
}
export interface AdminUndeployResult {

View File

@@ -1,14 +1,23 @@
import { db, workflow } from '@sim/db'
import { db, workflow, workflowDeploymentVersion } from '@sim/db'
import { createLogger } from '@sim/logger'
import { eq } from 'drizzle-orm'
import { and, eq } from 'drizzle-orm'
import { generateRequestId } from '@/lib/core/utils/request'
import { cleanupWebhooksForWorkflow } from '@/lib/webhooks/deploy'
import { removeMcpToolsForWorkflow, syncMcpToolsForWorkflow } from '@/lib/mcp/workflow-mcp-sync'
import {
cleanupWebhooksForWorkflow,
restorePreviousVersionWebhooks,
saveTriggerWebhooksForDeploy,
} from '@/lib/webhooks/deploy'
import {
deployWorkflow,
loadWorkflowFromNormalizedTables,
undeployWorkflow,
} from '@/lib/workflows/persistence/utils'
import { createSchedulesForDeploy, validateWorkflowSchedules } from '@/lib/workflows/schedules'
import {
cleanupDeploymentVersion,
createSchedulesForDeploy,
validateWorkflowSchedules,
} from '@/lib/workflows/schedules'
import { withAdminAuthParams } from '@/app/api/v1/admin/middleware'
import {
badRequestResponse,
@@ -28,10 +37,11 @@ interface RouteParams {
export const POST = withAdminAuthParams<RouteParams>(async (request, context) => {
const { id: workflowId } = await context.params
const requestId = generateRequestId()
try {
const [workflowRecord] = await db
.select({ id: workflow.id, name: workflow.name })
.select()
.from(workflow)
.where(eq(workflow.id, workflowId))
.limit(1)
@@ -50,6 +60,18 @@ export const POST = withAdminAuthParams<RouteParams>(async (request, context) =>
return badRequestResponse(`Invalid schedule configuration: ${scheduleValidation.error}`)
}
const [currentActiveVersion] = await db
.select({ id: workflowDeploymentVersion.id })
.from(workflowDeploymentVersion)
.where(
and(
eq(workflowDeploymentVersion.workflowId, workflowId),
eq(workflowDeploymentVersion.isActive, true)
)
)
.limit(1)
const previousVersionId = currentActiveVersion?.id
const deployResult = await deployWorkflow({
workflowId,
deployedBy: ADMIN_ACTOR_ID,
@@ -65,6 +87,32 @@ export const POST = withAdminAuthParams<RouteParams>(async (request, context) =>
return internalErrorResponse('Failed to resolve deployment version')
}
const workflowData = workflowRecord as Record<string, unknown>
const triggerSaveResult = await saveTriggerWebhooksForDeploy({
request,
workflowId,
workflow: workflowData,
userId: workflowRecord.userId,
blocks: normalizedData.blocks,
requestId,
deploymentVersionId: deployResult.deploymentVersionId,
previousVersionId,
})
if (!triggerSaveResult.success) {
await cleanupDeploymentVersion({
workflowId,
workflow: workflowData,
requestId,
deploymentVersionId: deployResult.deploymentVersionId,
})
await undeployWorkflow({ workflowId })
return internalErrorResponse(
triggerSaveResult.error?.message || 'Failed to sync trigger configuration'
)
}
const scheduleResult = await createSchedulesForDeploy(
workflowId,
normalizedData.blocks,
@@ -72,15 +120,58 @@ export const POST = withAdminAuthParams<RouteParams>(async (request, context) =>
deployResult.deploymentVersionId
)
if (!scheduleResult.success) {
logger.warn(`Schedule creation failed for workflow ${workflowId}: ${scheduleResult.error}`)
logger.error(
`[${requestId}] Admin API: Schedule creation failed for workflow ${workflowId}: ${scheduleResult.error}`
)
await cleanupDeploymentVersion({
workflowId,
workflow: workflowData,
requestId,
deploymentVersionId: deployResult.deploymentVersionId,
})
if (previousVersionId) {
await restorePreviousVersionWebhooks({
request,
workflow: workflowData,
userId: workflowRecord.userId,
previousVersionId,
requestId,
})
}
await undeployWorkflow({ workflowId })
return internalErrorResponse(scheduleResult.error || 'Failed to create schedule')
}
logger.info(`Admin API: Deployed workflow ${workflowId} as v${deployResult.version}`)
if (previousVersionId && previousVersionId !== deployResult.deploymentVersionId) {
try {
logger.info(`[${requestId}] Admin API: Cleaning up previous version ${previousVersionId}`)
await cleanupDeploymentVersion({
workflowId,
workflow: workflowData,
requestId,
deploymentVersionId: previousVersionId,
skipExternalCleanup: true,
})
} catch (cleanupError) {
logger.error(
`[${requestId}] Admin API: Failed to clean up previous version ${previousVersionId}`,
cleanupError
)
}
}
logger.info(
`[${requestId}] Admin API: Deployed workflow ${workflowId} as v${deployResult.version}`
)
// Sync MCP tools with the latest parameter schema
await syncMcpToolsForWorkflow({ workflowId, requestId, context: 'deploy' })
const response: AdminDeployResult = {
isDeployed: true,
version: deployResult.version!,
deployedAt: deployResult.deployedAt!.toISOString(),
warnings: triggerSaveResult.warnings,
}
return singleResponse(response)
@@ -105,7 +196,6 @@ export const DELETE = withAdminAuthParams<RouteParams>(async (request, context)
return notFoundResponse('Workflow')
}
// Clean up external webhook subscriptions before undeploying
await cleanupWebhooksForWorkflow(
workflowId,
workflowRecord as Record<string, unknown>,
@@ -117,6 +207,8 @@ export const DELETE = withAdminAuthParams<RouteParams>(async (request, context)
return internalErrorResponse(result.error || 'Failed to undeploy workflow')
}
await removeMcpToolsForWorkflow(workflowId, requestId)
logger.info(`Admin API: Undeployed workflow ${workflowId}`)
const response: AdminUndeployResult = {

View File

@@ -1,7 +1,15 @@
import { db, workflow } from '@sim/db'
import { db, workflow, workflowDeploymentVersion } from '@sim/db'
import { createLogger } from '@sim/logger'
import { eq } from 'drizzle-orm'
import { and, eq } from 'drizzle-orm'
import { generateRequestId } from '@/lib/core/utils/request'
import { syncMcpToolsForWorkflow } from '@/lib/mcp/workflow-mcp-sync'
import { restorePreviousVersionWebhooks, saveTriggerWebhooksForDeploy } from '@/lib/webhooks/deploy'
import { activateWorkflowVersion } from '@/lib/workflows/persistence/utils'
import {
cleanupDeploymentVersion,
createSchedulesForDeploy,
validateWorkflowSchedules,
} from '@/lib/workflows/schedules'
import { withAdminAuthParams } from '@/app/api/v1/admin/middleware'
import {
badRequestResponse,
@@ -9,6 +17,7 @@ import {
notFoundResponse,
singleResponse,
} from '@/app/api/v1/admin/responses'
import type { BlockState } from '@/stores/workflows/workflow/types'
const logger = createLogger('AdminWorkflowActivateVersionAPI')
@@ -18,11 +27,12 @@ interface RouteParams {
}
export const POST = withAdminAuthParams<RouteParams>(async (request, context) => {
const requestId = generateRequestId()
const { id: workflowId, versionId } = await context.params
try {
const [workflowRecord] = await db
.select({ id: workflow.id })
.select()
.from(workflow)
.where(eq(workflow.id, workflowId))
.limit(1)
@@ -36,23 +46,161 @@ export const POST = withAdminAuthParams<RouteParams>(async (request, context) =>
return badRequestResponse('Invalid version number')
}
const [versionRow] = await db
.select({
id: workflowDeploymentVersion.id,
state: workflowDeploymentVersion.state,
})
.from(workflowDeploymentVersion)
.where(
and(
eq(workflowDeploymentVersion.workflowId, workflowId),
eq(workflowDeploymentVersion.version, versionNum)
)
)
.limit(1)
if (!versionRow?.state) {
return notFoundResponse('Deployment version')
}
const [currentActiveVersion] = await db
.select({ id: workflowDeploymentVersion.id })
.from(workflowDeploymentVersion)
.where(
and(
eq(workflowDeploymentVersion.workflowId, workflowId),
eq(workflowDeploymentVersion.isActive, true)
)
)
.limit(1)
const previousVersionId = currentActiveVersion?.id
const deployedState = versionRow.state as { blocks?: Record<string, BlockState> }
const blocks = deployedState.blocks
if (!blocks || typeof blocks !== 'object') {
return internalErrorResponse('Invalid deployed state structure')
}
const workflowData = workflowRecord as Record<string, unknown>
const scheduleValidation = validateWorkflowSchedules(blocks)
if (!scheduleValidation.isValid) {
return badRequestResponse(`Invalid schedule configuration: ${scheduleValidation.error}`)
}
const triggerSaveResult = await saveTriggerWebhooksForDeploy({
request,
workflowId,
workflow: workflowData,
userId: workflowRecord.userId,
blocks,
requestId,
deploymentVersionId: versionRow.id,
previousVersionId,
forceRecreateSubscriptions: true,
})
if (!triggerSaveResult.success) {
logger.error(
`[${requestId}] Admin API: Failed to sync triggers for workflow ${workflowId}`,
triggerSaveResult.error
)
return internalErrorResponse(
triggerSaveResult.error?.message || 'Failed to sync trigger configuration'
)
}
const scheduleResult = await createSchedulesForDeploy(workflowId, blocks, db, versionRow.id)
if (!scheduleResult.success) {
await cleanupDeploymentVersion({
workflowId,
workflow: workflowData,
requestId,
deploymentVersionId: versionRow.id,
})
if (previousVersionId) {
await restorePreviousVersionWebhooks({
request,
workflow: workflowData,
userId: workflowRecord.userId,
previousVersionId,
requestId,
})
}
return internalErrorResponse(scheduleResult.error || 'Failed to sync schedules')
}
const result = await activateWorkflowVersion({ workflowId, version: versionNum })
if (!result.success) {
await cleanupDeploymentVersion({
workflowId,
workflow: workflowData,
requestId,
deploymentVersionId: versionRow.id,
})
if (previousVersionId) {
await restorePreviousVersionWebhooks({
request,
workflow: workflowData,
userId: workflowRecord.userId,
previousVersionId,
requestId,
})
}
if (result.error === 'Deployment version not found') {
return notFoundResponse('Deployment version')
}
return internalErrorResponse(result.error || 'Failed to activate version')
}
logger.info(`Admin API: Activated version ${versionNum} for workflow ${workflowId}`)
if (previousVersionId && previousVersionId !== versionRow.id) {
try {
logger.info(
`[${requestId}] Admin API: Cleaning up previous version ${previousVersionId} webhooks/schedules`
)
await cleanupDeploymentVersion({
workflowId,
workflow: workflowData,
requestId,
deploymentVersionId: previousVersionId,
skipExternalCleanup: true,
})
logger.info(`[${requestId}] Admin API: Previous version cleanup completed`)
} catch (cleanupError) {
logger.error(
`[${requestId}] Admin API: Failed to clean up previous version ${previousVersionId}`,
cleanupError
)
}
}
await syncMcpToolsForWorkflow({
workflowId,
requestId,
state: versionRow.state,
context: 'activate',
})
logger.info(
`[${requestId}] Admin API: Activated version ${versionNum} for workflow ${workflowId}`
)
return singleResponse({
success: true,
version: versionNum,
deployedAt: result.deployedAt!.toISOString(),
warnings: triggerSaveResult.warnings,
})
} catch (error) {
logger.error(`Admin API: Failed to activate version for workflow ${workflowId}`, { error })
logger.error(
`[${requestId}] Admin API: Failed to activate version for workflow ${workflowId}`,
{
error,
}
)
return internalErrorResponse('Failed to activate deployment version')
}
})

View File

@@ -7,12 +7,7 @@ import { getSession } from '@/lib/auth'
import { validateInteger } from '@/lib/core/security/input-validation'
import { PlatformEvents } from '@/lib/core/telemetry'
import { generateRequestId } from '@/lib/core/utils/request'
import { resolveEnvVarsInObject } from '@/lib/webhooks/env-resolver'
import {
cleanupExternalWebhook,
createExternalWebhookSubscription,
shouldRecreateExternalWebhookSubscription,
} from '@/lib/webhooks/provider-subscriptions'
import { cleanupExternalWebhook } from '@/lib/webhooks/provider-subscriptions'
import { getUserEntityPermissions } from '@/lib/workspaces/permissions/utils'
const logger = createLogger('WebhookAPI')
@@ -88,7 +83,6 @@ export async function GET(request: NextRequest, { params }: { params: Promise<{
}
}
// Update a webhook
export async function PATCH(request: NextRequest, { params }: { params: Promise<{ id: string }> }) {
const requestId = generateRequestId()
@@ -103,7 +97,7 @@ export async function PATCH(request: NextRequest, { params }: { params: Promise<
}
const body = await request.json()
const { path, provider, providerConfig, isActive, failedCount } = body
const { isActive, failedCount } = body
if (failedCount !== undefined) {
const validation = validateInteger(failedCount, 'failedCount', { min: 0 })
@@ -113,28 +107,6 @@ export async function PATCH(request: NextRequest, { params }: { params: Promise<
}
}
const originalProviderConfig = providerConfig
let resolvedProviderConfig = providerConfig
if (providerConfig) {
const webhookDataForResolve = await db
.select({
workspaceId: workflow.workspaceId,
})
.from(webhook)
.innerJoin(workflow, eq(webhook.workflowId, workflow.id))
.where(eq(webhook.id, id))
.limit(1)
if (webhookDataForResolve.length > 0) {
resolvedProviderConfig = await resolveEnvVarsInObject(
providerConfig,
session.user.id,
webhookDataForResolve[0].workspaceId || undefined
)
}
}
// Find the webhook and check permissions
const webhooks = await db
.select({
webhook: webhook,
@@ -155,16 +127,12 @@ export async function PATCH(request: NextRequest, { params }: { params: Promise<
}
const webhookData = webhooks[0]
// Check if user has permission to modify this webhook
let canModify = false
// Case 1: User owns the workflow
if (webhookData.workflow.userId === session.user.id) {
canModify = true
}
// Case 2: Workflow belongs to a workspace and user has write or admin permission
if (!canModify && webhookData.workflow.workspaceId) {
const userPermission = await getUserEntityPermissions(
session.user.id,
@@ -183,80 +151,14 @@ export async function PATCH(request: NextRequest, { params }: { params: Promise<
return NextResponse.json({ error: 'Access denied' }, { status: 403 })
}
const existingProviderConfig =
(webhookData.webhook.providerConfig as Record<string, unknown>) || {}
let nextProviderConfig =
providerConfig !== undefined &&
resolvedProviderConfig &&
typeof resolvedProviderConfig === 'object'
? (resolvedProviderConfig as Record<string, unknown>)
: existingProviderConfig
const nextProvider = (provider ?? webhookData.webhook.provider) as string
if (
providerConfig !== undefined &&
shouldRecreateExternalWebhookSubscription({
previousProvider: webhookData.webhook.provider as string,
nextProvider,
previousConfig: existingProviderConfig,
nextConfig: nextProviderConfig,
})
) {
await cleanupExternalWebhook(
{ ...webhookData.webhook, providerConfig: existingProviderConfig },
webhookData.workflow,
requestId
)
const result = await createExternalWebhookSubscription(
request,
{
...webhookData.webhook,
provider: nextProvider,
providerConfig: nextProviderConfig,
},
webhookData.workflow,
session.user.id,
requestId
)
nextProviderConfig = result.updatedProviderConfig as Record<string, unknown>
}
logger.debug(`[${requestId}] Updating webhook properties`, {
hasPathUpdate: path !== undefined,
hasProviderUpdate: provider !== undefined,
hasConfigUpdate: providerConfig !== undefined,
hasActiveUpdate: isActive !== undefined,
hasFailedCountUpdate: failedCount !== undefined,
})
let finalProviderConfig = webhooks[0].webhook.providerConfig
if (providerConfig !== undefined && originalProviderConfig) {
const existingConfig = existingProviderConfig
finalProviderConfig = {
...originalProviderConfig,
credentialId: existingConfig.credentialId,
credentialSetId: existingConfig.credentialSetId,
userId: existingConfig.userId,
historyId: existingConfig.historyId,
lastCheckedTimestamp: existingConfig.lastCheckedTimestamp,
setupCompleted: existingConfig.setupCompleted,
externalId: existingConfig.externalId,
}
for (const [key, value] of Object.entries(nextProviderConfig)) {
if (!(key in originalProviderConfig)) {
;(finalProviderConfig as Record<string, unknown>)[key] = value
}
}
}
const updatedWebhook = await db
.update(webhook)
.set({
path: path !== undefined ? path : webhooks[0].webhook.path,
provider: provider !== undefined ? provider : webhooks[0].webhook.provider,
providerConfig: finalProviderConfig,
isActive: isActive !== undefined ? isActive : webhooks[0].webhook.isActive,
failedCount: failedCount !== undefined ? failedCount : webhooks[0].webhook.failedCount,
updatedAt: new Date(),
@@ -339,11 +241,8 @@ export async function DELETE(
}
const foundWebhook = webhookData.webhook
const { cleanupExternalWebhook } = await import('@/lib/webhooks/provider-subscriptions')
const providerConfig = foundWebhook.providerConfig as Record<string, unknown> | null
const credentialSetId = providerConfig?.credentialSetId as string | undefined
const blockId = providerConfig?.blockId as string | undefined
const credentialSetId = foundWebhook.credentialSetId as string | undefined
const blockId = foundWebhook.blockId as string | undefined
if (credentialSetId && blockId) {
const allCredentialSetWebhooks = await db
@@ -351,10 +250,9 @@ export async function DELETE(
.from(webhook)
.where(and(eq(webhook.workflowId, webhookData.workflow.id), eq(webhook.blockId, blockId)))
const webhooksToDelete = allCredentialSetWebhooks.filter((w) => {
const config = w.providerConfig as Record<string, unknown> | null
return config?.credentialSetId === credentialSetId
})
const webhooksToDelete = allCredentialSetWebhooks.filter(
(w) => w.credentialSetId === credentialSetId
)
for (const w of webhooksToDelete) {
await cleanupExternalWebhook(w, webhookData.workflow, requestId)

View File

@@ -7,9 +7,21 @@ import { type NextRequest, NextResponse } from 'next/server'
import { getSession } from '@/lib/auth'
import { PlatformEvents } from '@/lib/core/telemetry'
import { generateRequestId } from '@/lib/core/utils/request'
import { getProviderIdFromServiceId } from '@/lib/oauth'
import { resolveEnvVarsInObject } from '@/lib/webhooks/env-resolver'
import { createExternalWebhookSubscription } from '@/lib/webhooks/provider-subscriptions'
import {
cleanupExternalWebhook,
createExternalWebhookSubscription,
} from '@/lib/webhooks/provider-subscriptions'
import { mergeNonUserFields } from '@/lib/webhooks/utils'
import {
configureGmailPolling,
configureOutlookPolling,
configureRssPolling,
syncWebhooksForCredentialSet,
} from '@/lib/webhooks/utils.server'
import { getUserEntityPermissions } from '@/lib/workspaces/permissions/utils'
import { extractCredentialSetId, isCredentialSetValue } from '@/executor/constants'
const logger = createLogger('WebhooksAPI')
@@ -316,8 +328,6 @@ export async function POST(request: NextRequest) {
const directCredentialSetId = resolvedProviderConfig?.credentialSetId as string | undefined
if (directCredentialSetId || rawCredentialId) {
const { isCredentialSetValue, extractCredentialSetId } = await import('@/executor/constants')
const credentialSetId =
directCredentialSetId ||
(rawCredentialId && isCredentialSetValue(rawCredentialId)
@@ -329,11 +339,6 @@ export async function POST(request: NextRequest) {
`[${requestId}] Credential set detected for ${provider} trigger. Syncing webhooks for set ${credentialSetId}`
)
const { getProviderIdFromServiceId } = await import('@/lib/oauth')
const { syncWebhooksForCredentialSet, configureGmailPolling, configureOutlookPolling } =
await import('@/lib/webhooks/utils.server')
// Map provider to OAuth provider ID
const oauthProviderId = getProviderIdFromServiceId(provider)
const {
@@ -466,7 +471,8 @@ export async function POST(request: NextRequest) {
providerConfig: providerConfigOverride,
})
const configToSave = { ...originalProviderConfig }
const userProvided = originalProviderConfig as Record<string, unknown>
const configToSave: Record<string, unknown> = { ...userProvided }
try {
const result = await createExternalWebhookSubscription(
@@ -477,11 +483,7 @@ export async function POST(request: NextRequest) {
requestId
)
const updatedConfig = result.updatedProviderConfig as Record<string, unknown>
for (const [key, value] of Object.entries(updatedConfig)) {
if (!(key in originalProviderConfig)) {
configToSave[key] = value
}
}
mergeNonUserFields(configToSave, updatedConfig, userProvided)
resolvedProviderConfig = updatedConfig
externalSubscriptionCreated = result.externalSubscriptionCreated
} catch (err) {
@@ -547,7 +549,6 @@ export async function POST(request: NextRequest) {
if (externalSubscriptionCreated) {
logger.error(`[${requestId}] DB save failed, cleaning up external subscription`, dbError)
try {
const { cleanupExternalWebhook } = await import('@/lib/webhooks/provider-subscriptions')
await cleanupExternalWebhook(
createTempWebhookData(configToSave),
workflowRecord,
@@ -567,7 +568,6 @@ export async function POST(request: NextRequest) {
if (savedWebhook && provider === 'gmail') {
logger.info(`[${requestId}] Gmail provider detected. Setting up Gmail webhook configuration.`)
try {
const { configureGmailPolling } = await import('@/lib/webhooks/utils.server')
const success = await configureGmailPolling(savedWebhook, requestId)
if (!success) {
@@ -606,7 +606,6 @@ export async function POST(request: NextRequest) {
`[${requestId}] Outlook provider detected. Setting up Outlook webhook configuration.`
)
try {
const { configureOutlookPolling } = await import('@/lib/webhooks/utils.server')
const success = await configureOutlookPolling(savedWebhook, requestId)
if (!success) {
@@ -643,7 +642,6 @@ export async function POST(request: NextRequest) {
if (savedWebhook && provider === 'rss') {
logger.info(`[${requestId}] RSS provider detected. Setting up RSS webhook configuration.`)
try {
const { configureRssPolling } = await import('@/lib/webhooks/utils.server')
const success = await configureRssPolling(savedWebhook, requestId)
if (!success) {

View File

@@ -4,7 +4,11 @@ import { and, desc, eq } from 'drizzle-orm'
import type { NextRequest } from 'next/server'
import { generateRequestId } from '@/lib/core/utils/request'
import { removeMcpToolsForWorkflow, syncMcpToolsForWorkflow } from '@/lib/mcp/workflow-mcp-sync'
import { cleanupWebhooksForWorkflow, saveTriggerWebhooksForDeploy } from '@/lib/webhooks/deploy'
import {
cleanupWebhooksForWorkflow,
restorePreviousVersionWebhooks,
saveTriggerWebhooksForDeploy,
} from '@/lib/webhooks/deploy'
import {
deployWorkflow,
loadWorkflowFromNormalizedTables,
@@ -135,6 +139,18 @@ export async function POST(request: NextRequest, { params }: { params: Promise<{
return createErrorResponse(`Invalid schedule configuration: ${scheduleValidation.error}`, 400)
}
const [currentActiveVersion] = await db
.select({ id: workflowDeploymentVersion.id })
.from(workflowDeploymentVersion)
.where(
and(
eq(workflowDeploymentVersion.workflowId, id),
eq(workflowDeploymentVersion.isActive, true)
)
)
.limit(1)
const previousVersionId = currentActiveVersion?.id
const deployResult = await deployWorkflow({
workflowId: id,
deployedBy: actorUserId,
@@ -161,6 +177,7 @@ export async function POST(request: NextRequest, { params }: { params: Promise<{
blocks: normalizedData.blocks,
requestId,
deploymentVersionId,
previousVersionId,
})
if (!triggerSaveResult.success) {
@@ -194,6 +211,15 @@ export async function POST(request: NextRequest, { params }: { params: Promise<{
requestId,
deploymentVersionId,
})
if (previousVersionId) {
await restorePreviousVersionWebhooks({
request,
workflow: workflowData as Record<string, unknown>,
userId: actorUserId,
previousVersionId,
requestId,
})
}
await undeployWorkflow({ workflowId: id })
return createErrorResponse(scheduleResult.error || 'Failed to create schedule', 500)
}
@@ -208,6 +234,25 @@ export async function POST(request: NextRequest, { params }: { params: Promise<{
)
}
if (previousVersionId && previousVersionId !== deploymentVersionId) {
try {
logger.info(`[${requestId}] Cleaning up previous version ${previousVersionId} DB records`)
await cleanupDeploymentVersion({
workflowId: id,
workflow: workflowData as Record<string, unknown>,
requestId,
deploymentVersionId: previousVersionId,
skipExternalCleanup: true,
})
} catch (cleanupError) {
logger.error(
`[${requestId}] Failed to clean up previous version ${previousVersionId}`,
cleanupError
)
// Non-fatal - continue with success response
}
}
logger.info(`[${requestId}] Workflow deployed successfully: ${id}`)
// Sync MCP tools with the latest parameter schema
@@ -228,6 +273,7 @@ export async function POST(request: NextRequest, { params }: { params: Promise<{
nextRunAt: scheduleInfo.nextRunAt,
}
: undefined,
warnings: triggerSaveResult.warnings,
})
} catch (error: any) {
logger.error(`[${requestId}] Error deploying workflow: ${id}`, {

View File

@@ -4,7 +4,7 @@ import { and, eq } from 'drizzle-orm'
import type { NextRequest } from 'next/server'
import { generateRequestId } from '@/lib/core/utils/request'
import { syncMcpToolsForWorkflow } from '@/lib/mcp/workflow-mcp-sync'
import { saveTriggerWebhooksForDeploy } from '@/lib/webhooks/deploy'
import { restorePreviousVersionWebhooks, saveTriggerWebhooksForDeploy } from '@/lib/webhooks/deploy'
import { activateWorkflowVersion } from '@/lib/workflows/persistence/utils'
import {
cleanupDeploymentVersion,
@@ -85,6 +85,11 @@ export async function POST(
return createErrorResponse('Invalid deployed state structure', 500)
}
const scheduleValidation = validateWorkflowSchedules(blocks)
if (!scheduleValidation.isValid) {
return createErrorResponse(`Invalid schedule configuration: ${scheduleValidation.error}`, 400)
}
const triggerSaveResult = await saveTriggerWebhooksForDeploy({
request,
workflowId: id,
@@ -93,6 +98,8 @@ export async function POST(
blocks,
requestId,
deploymentVersionId: versionRow.id,
previousVersionId,
forceRecreateSubscriptions: true,
})
if (!triggerSaveResult.success) {
@@ -102,11 +109,6 @@ export async function POST(
)
}
const scheduleValidation = validateWorkflowSchedules(blocks)
if (!scheduleValidation.isValid) {
return createErrorResponse(`Invalid schedule configuration: ${scheduleValidation.error}`, 400)
}
const scheduleResult = await createSchedulesForDeploy(id, blocks, db, versionRow.id)
if (!scheduleResult.success) {
@@ -116,6 +118,15 @@ export async function POST(
requestId,
deploymentVersionId: versionRow.id,
})
if (previousVersionId) {
await restorePreviousVersionWebhooks({
request,
workflow: workflowData as Record<string, unknown>,
userId: actorUserId,
previousVersionId,
requestId,
})
}
return createErrorResponse(scheduleResult.error || 'Failed to sync schedules', 500)
}
@@ -127,6 +138,15 @@ export async function POST(
requestId,
deploymentVersionId: versionRow.id,
})
if (previousVersionId) {
await restorePreviousVersionWebhooks({
request,
workflow: workflowData as Record<string, unknown>,
userId: actorUserId,
previousVersionId,
requestId,
})
}
return createErrorResponse(result.error || 'Failed to activate deployment', 400)
}
@@ -140,6 +160,7 @@ export async function POST(
workflow: workflowData as Record<string, unknown>,
requestId,
deploymentVersionId: previousVersionId,
skipExternalCleanup: true,
})
logger.info(`[${requestId}] Previous version cleanup completed`)
} catch (cleanupError) {
@@ -157,7 +178,11 @@ export async function POST(
context: 'activate',
})
return createSuccessResponse({ success: true, deployedAt: result.deployedAt })
return createSuccessResponse({
success: true,
deployedAt: result.deployedAt,
warnings: triggerSaveResult.warnings,
})
} catch (error: any) {
logger.error(`[${requestId}] Error activating deployment for workflow: ${id}`, error)
return createErrorResponse(error.message || 'Failed to activate deployment', 500)

View File

@@ -95,6 +95,7 @@ export function DeployModal({
const [activeTab, setActiveTab] = useState<TabView>('general')
const [chatSubmitting, setChatSubmitting] = useState(false)
const [apiDeployError, setApiDeployError] = useState<string | null>(null)
const [apiDeployWarnings, setApiDeployWarnings] = useState<string[]>([])
const [isChatFormValid, setIsChatFormValid] = useState(false)
const [selectedStreamingOutputs, setSelectedStreamingOutputs] = useState<string[]>([])
@@ -227,6 +228,7 @@ export function DeployModal({
if (open && workflowId) {
setActiveTab('general')
setApiDeployError(null)
setApiDeployWarnings([])
}
}, [open, workflowId])
@@ -282,9 +284,13 @@ export function DeployModal({
if (!workflowId) return
setApiDeployError(null)
setApiDeployWarnings([])
try {
await deployMutation.mutateAsync({ workflowId, deployChatEnabled: false })
const result = await deployMutation.mutateAsync({ workflowId, deployChatEnabled: false })
if (result.warnings && result.warnings.length > 0) {
setApiDeployWarnings(result.warnings)
}
await refetchDeployedState()
} catch (error: unknown) {
logger.error('Error deploying workflow:', { error })
@@ -297,8 +303,13 @@ export function DeployModal({
async (version: number) => {
if (!workflowId) return
setApiDeployWarnings([])
try {
await activateVersionMutation.mutateAsync({ workflowId, version })
const result = await activateVersionMutation.mutateAsync({ workflowId, version })
if (result.warnings && result.warnings.length > 0) {
setApiDeployWarnings(result.warnings)
}
await refetchDeployedState()
} catch (error) {
logger.error('Error promoting version:', { error })
@@ -324,9 +335,13 @@ export function DeployModal({
if (!workflowId) return
setApiDeployError(null)
setApiDeployWarnings([])
try {
await deployMutation.mutateAsync({ workflowId, deployChatEnabled: false })
const result = await deployMutation.mutateAsync({ workflowId, deployChatEnabled: false })
if (result.warnings && result.warnings.length > 0) {
setApiDeployWarnings(result.warnings)
}
await refetchDeployedState()
} catch (error: unknown) {
logger.error('Error redeploying workflow:', { error })
@@ -338,6 +353,7 @@ export function DeployModal({
const handleCloseModal = useCallback(() => {
setChatSubmitting(false)
setApiDeployError(null)
setApiDeployWarnings([])
onOpenChange(false)
}, [onOpenChange])
@@ -479,6 +495,14 @@ export function DeployModal({
<div>{apiDeployError}</div>
</div>
)}
{apiDeployWarnings.length > 0 && (
<div className='mb-3 rounded-[4px] border border-amber-500/30 bg-amber-500/10 p-3 text-amber-700 dark:text-amber-400 text-sm'>
<div className='font-semibold'>Deployment Warning</div>
{apiDeployWarnings.map((warning, index) => (
<div key={index}>{warning}</div>
))}
</div>
)}
<ModalTabsContent value='general'>
<GeneralDeploy
workflowId={workflowId}

View File

@@ -85,7 +85,11 @@ export class ConditionBlockHandler implements BlockHandler {
const sourceBlockId = ctx.workflow?.connections.find((conn) => conn.target === block.id)?.source
const evalContext = this.buildEvaluationContext(ctx, sourceBlockId)
const sourceOutput = sourceBlockId ? ctx.blockStates.get(sourceBlockId)?.output : null
const rawSourceOutput = sourceBlockId ? ctx.blockStates.get(sourceBlockId)?.output : null
// Filter out _pauseMetadata from source output to prevent the engine from
// thinking this block is pausing (it was already resumed by the HITL block)
const sourceOutput = this.filterPauseMetadata(rawSourceOutput)
const outgoingConnections = ctx.workflow?.connections.filter((conn) => conn.source === block.id)
@@ -125,6 +129,14 @@ export class ConditionBlockHandler implements BlockHandler {
}
}
private filterPauseMetadata(output: any): any {
if (!output || typeof output !== 'object') {
return output
}
const { _pauseMetadata, ...rest } = output
return rest
}
private parseConditions(input: any): Array<{ id: string; title: string; value: string }> {
try {
const conditions = Array.isArray(input) ? input : JSON.parse(input || '[]')

View File

@@ -237,6 +237,7 @@ interface DeployWorkflowResult {
isDeployed: boolean
deployedAt?: string
apiKey?: string
warnings?: string[]
}
/**
@@ -272,6 +273,7 @@ export function useDeployWorkflow() {
isDeployed: data.isDeployed ?? false,
deployedAt: data.deployedAt,
apiKey: data.apiKey,
warnings: data.warnings,
}
},
onSuccess: (data, variables) => {
@@ -360,6 +362,7 @@ interface ActivateVersionVariables {
interface ActivateVersionResult {
deployedAt?: string
apiKey?: string
warnings?: string[]
}
/**

View File

@@ -1,4 +1,4 @@
import { useCallback, useEffect, useMemo, useState } from 'react'
import { useCallback, useEffect, useMemo } from 'react'
import { createLogger } from '@sim/logger'
import { useParams } from 'next/navigation'
import { getBaseUrl } from '@/lib/core/utils/urls'
@@ -6,12 +6,10 @@ import { getBlock } from '@/blocks'
import { populateTriggerFieldsFromConfig } from '@/hooks/use-trigger-config-aggregation'
import { useSubBlockStore } from '@/stores/workflows/subblock/store'
import { useWorkflowStore } from '@/stores/workflows/workflow/store'
import { getTrigger, isTriggerValid } from '@/triggers'
import { isTriggerValid } from '@/triggers'
const logger = createLogger('useWebhookManagement')
const CREDENTIAL_SET_PREFIX = 'credentialSet:'
interface UseWebhookManagementProps {
blockId: string
triggerId?: string
@@ -24,9 +22,6 @@ interface WebhookManagementState {
webhookPath: string
webhookId: string | null
isLoading: boolean
isSaving: boolean
saveConfig: () => Promise<boolean>
deleteConfig: () => Promise<boolean>
}
/**
@@ -83,11 +78,9 @@ function resolveEffectiveTriggerId(
}
/**
* Hook to manage webhook lifecycle for trigger blocks
* Handles:
* - Pre-generating webhook URLs based on blockId (without creating webhook)
* - Loading existing webhooks from the API
* - Saving and deleting webhook configurations
* Hook to load webhook info for trigger blocks.
* Used for displaying webhook URLs in the UI.
* Webhook creation/updates are handled by the deploy flow.
*/
export function useWebhookManagement({
blockId,
@@ -98,8 +91,6 @@ export function useWebhookManagement({
const params = useParams()
const workflowId = params.workflowId as string
const triggerDef = triggerId && isTriggerValid(triggerId) ? getTrigger(triggerId) : null
const webhookId = useSubBlockStore(
useCallback((state) => state.getValue(blockId, 'webhookId') as string | null, [blockId])
)
@@ -107,7 +98,6 @@ export function useWebhookManagement({
useCallback((state) => state.getValue(blockId, 'triggerPath') as string | null, [blockId])
)
const isLoading = useSubBlockStore((state) => state.loadingWebhooks.has(blockId))
const isChecked = useSubBlockStore((state) => state.checkedWebhooks.has(blockId))
const webhookUrl = useMemo(() => {
if (!webhookPath) {
@@ -118,8 +108,6 @@ export function useWebhookManagement({
return `${baseUrl}/api/webhooks/trigger/${webhookPath}`
}, [webhookPath, blockId])
const [isSaving, setIsSaving] = useState(false)
useEffect(() => {
if (triggerId && !isPreview) {
const storedTriggerId = useSubBlockStore.getState().getValue(blockId, 'triggerId')
@@ -143,7 +131,7 @@ export function useWebhookManagement({
return
}
const loadWebhookOrGenerateUrl = async () => {
const loadWebhookInfo = async () => {
useSubBlockStore.setState((state) => ({
loadingWebhooks: new Set([...state.loadingWebhooks, blockId]),
}))
@@ -171,8 +159,6 @@ export function useWebhookManagement({
if (webhook.providerConfig) {
const effectiveTriggerId = resolveEffectiveTriggerId(blockId, triggerId, webhook)
// Filter out runtime/system fields from providerConfig before storing as triggerConfig
// These fields are managed by the system and should not be included in change detection
const {
credentialId: _credId,
credentialSetId: _credSetId,
@@ -224,202 +210,14 @@ export function useWebhookManagement({
}
}
if (useWebhookUrl) {
loadWebhookOrGenerateUrl()
loadWebhookInfo()
}
}, [isPreview, triggerId, workflowId, blockId, useWebhookUrl])
const createWebhook = async (
effectiveTriggerId: string | undefined,
selectedCredentialId: string | null
): Promise<boolean> => {
if (!triggerDef || !effectiveTriggerId) {
return false
}
const triggerConfig = useSubBlockStore.getState().getValue(blockId, 'triggerConfig')
const isCredentialSet = selectedCredentialId?.startsWith(CREDENTIAL_SET_PREFIX)
const credentialSetId = isCredentialSet
? selectedCredentialId!.slice(CREDENTIAL_SET_PREFIX.length)
: undefined
const credentialId = isCredentialSet ? undefined : selectedCredentialId
const webhookConfig = {
...(triggerConfig || {}),
...(credentialId ? { credentialId } : {}),
...(credentialSetId ? { credentialSetId } : {}),
triggerId: effectiveTriggerId,
}
const path = blockId
const response = await fetch('/api/webhooks', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
workflowId,
blockId,
path,
provider: triggerDef.provider,
providerConfig: webhookConfig,
}),
})
if (!response.ok) {
let errorMessage = 'Failed to create webhook'
try {
const errorData = await response.json()
errorMessage = errorData.details || errorData.error || errorMessage
} catch {
// If response is not JSON, use default message
}
logger.error('Failed to create webhook', { errorMessage })
throw new Error(errorMessage)
}
const data = await response.json()
const savedWebhookId = data.webhook.id
useSubBlockStore.getState().setValue(blockId, 'triggerPath', path)
useSubBlockStore.getState().setValue(blockId, 'triggerId', effectiveTriggerId)
useSubBlockStore.getState().setValue(blockId, 'webhookId', savedWebhookId)
useSubBlockStore.setState((state) => ({
checkedWebhooks: new Set([...state.checkedWebhooks, blockId]),
}))
logger.info('Trigger webhook created successfully', {
webhookId: savedWebhookId,
triggerId: effectiveTriggerId,
provider: triggerDef.provider,
blockId,
})
return true
}
const updateWebhook = async (
webhookIdToUpdate: string,
effectiveTriggerId: string | undefined,
selectedCredentialId: string | null
): Promise<boolean> => {
const triggerConfigRaw = useSubBlockStore.getState().getValue(blockId, 'triggerConfig')
const triggerConfig =
typeof triggerConfigRaw === 'object' && triggerConfigRaw !== null
? (triggerConfigRaw as Record<string, unknown>)
: {}
const isCredentialSet = selectedCredentialId?.startsWith(CREDENTIAL_SET_PREFIX)
const credentialSetId = isCredentialSet
? selectedCredentialId!.slice(CREDENTIAL_SET_PREFIX.length)
: undefined
const credentialId = isCredentialSet ? undefined : selectedCredentialId
const response = await fetch(`/api/webhooks/${webhookIdToUpdate}`, {
method: 'PATCH',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
providerConfig: {
...triggerConfig,
...(credentialId ? { credentialId } : {}),
...(credentialSetId ? { credentialSetId } : {}),
triggerId: effectiveTriggerId,
},
}),
})
if (response.status === 404) {
logger.warn('Webhook not found while updating, recreating', {
blockId,
lostWebhookId: webhookIdToUpdate,
})
useSubBlockStore.getState().setValue(blockId, 'webhookId', null)
return createWebhook(effectiveTriggerId, selectedCredentialId)
}
if (!response.ok) {
let errorMessage = 'Failed to save trigger configuration'
try {
const errorData = await response.json()
errorMessage = errorData.details || errorData.error || errorMessage
} catch {
// If response is not JSON, use default message
}
logger.error('Failed to save trigger config', { errorMessage })
throw new Error(errorMessage)
}
logger.info('Trigger config saved successfully', { blockId, webhookId: webhookIdToUpdate })
return true
}
const saveConfig = async (): Promise<boolean> => {
if (isPreview || !triggerDef) {
return false
}
const effectiveTriggerId = resolveEffectiveTriggerId(blockId, triggerId)
try {
setIsSaving(true)
const triggerCredentials = useSubBlockStore.getState().getValue(blockId, 'triggerCredentials')
const selectedCredentialId = (triggerCredentials as string | null) || null
if (!webhookId) {
return createWebhook(effectiveTriggerId, selectedCredentialId)
}
return updateWebhook(webhookId, effectiveTriggerId, selectedCredentialId)
} catch (error) {
logger.error('Error saving trigger config:', error)
throw error
} finally {
setIsSaving(false)
}
}
const deleteConfig = async (): Promise<boolean> => {
if (isPreview || !webhookId) {
return false
}
try {
setIsSaving(true)
const response = await fetch(`/api/webhooks/${webhookId}`, {
method: 'DELETE',
})
if (!response.ok) {
logger.error('Failed to delete webhook')
return false
}
useSubBlockStore.getState().setValue(blockId, 'triggerPath', '')
useSubBlockStore.getState().setValue(blockId, 'webhookId', null)
useSubBlockStore.setState((state) => {
const newSet = new Set(state.checkedWebhooks)
newSet.delete(blockId)
return { checkedWebhooks: newSet }
})
logger.info('Webhook deleted successfully')
return true
} catch (error) {
logger.error('Error deleting webhook:', error)
return false
} finally {
setIsSaving(false)
}
}
return {
webhookUrl,
webhookPath: webhookPath || blockId,
webhookId,
isLoading,
isSaving,
saveConfig,
deleteConfig,
}
}

View File

@@ -32,6 +32,12 @@ interface TriggerSaveError {
interface TriggerSaveResult {
success: boolean
error?: TriggerSaveError
warnings?: string[]
}
interface CredentialSetSyncResult {
error: TriggerSaveError | null
warnings: string[]
}
interface SaveTriggerWebhooksInput {
@@ -42,6 +48,16 @@ interface SaveTriggerWebhooksInput {
blocks: Record<string, BlockState>
requestId: string
deploymentVersionId?: string
/**
* The previous active version's ID. Only this version's external subscriptions
* will be cleaned up (along with draft webhooks). If not provided, skips cleanup.
*/
previousVersionId?: string
/**
* When true, forces recreation of external subscriptions even if webhook config is unchanged.
* Used when activating a previous deployment version whose subscriptions were cleaned up.
*/
forceRecreateSubscriptions?: boolean
}
function getSubBlockValue(block: BlockState, subBlockId: string): unknown {
@@ -248,7 +264,7 @@ async function syncCredentialSetWebhooks(params: {
providerConfig: Record<string, unknown>
requestId: string
deploymentVersionId?: string
}): Promise<TriggerSaveError | null> {
}): Promise<CredentialSetSyncResult> {
const {
workflowId,
blockId,
@@ -261,7 +277,7 @@ async function syncCredentialSetWebhooks(params: {
const credentialSetId = providerConfig.credentialSetId as string | undefined
if (!credentialSetId) {
return null
return { error: null, warnings: [] }
}
const oauthProviderId = getProviderIdFromServiceId(provider)
@@ -280,10 +296,23 @@ async function syncCredentialSetWebhooks(params: {
deploymentVersionId,
})
const warnings: string[] = []
if (syncResult.failed.length > 0) {
const failedCount = syncResult.failed.length
const totalCount = syncResult.webhooks.length + failedCount
warnings.push(
`${failedCount} of ${totalCount} credentials in the set failed to sync for ${provider}. Some team members may not receive triggers.`
)
}
if (syncResult.webhooks.length === 0) {
return {
message: `No valid credentials found in credential set for ${provider}. Please connect accounts and try again.`,
status: 400,
error: {
message: `No valid credentials found in credential set for ${provider}. Please connect accounts and try again.`,
status: 400,
},
warnings,
}
}
@@ -297,8 +326,11 @@ async function syncCredentialSetWebhooks(params: {
if (!success) {
await db.delete(webhook).where(eq(webhook.id, wh.id))
return {
message: `Failed to configure ${provider} polling. Please check account permissions.`,
status: 500,
error: {
message: `Failed to configure ${provider} polling. Please check account permissions.`,
status: 500,
},
warnings,
}
}
}
@@ -306,84 +338,7 @@ async function syncCredentialSetWebhooks(params: {
}
}
return null
}
async function createWebhookForBlock(params: {
request: NextRequest
workflowId: string
workflow: Record<string, unknown>
userId: string
block: BlockState
provider: string
providerConfig: Record<string, unknown>
triggerPath: string
requestId: string
deploymentVersionId?: string
}): Promise<TriggerSaveError | null> {
const {
request,
workflowId,
workflow,
userId,
block,
provider,
providerConfig,
triggerPath,
requestId,
deploymentVersionId,
} = params
const webhookId = nanoid()
const createPayload = {
id: webhookId,
path: triggerPath,
provider,
providerConfig,
}
const result = await createExternalWebhookSubscription(
request,
createPayload,
workflow,
userId,
requestId
)
const updatedProviderConfig = result.updatedProviderConfig as Record<string, unknown>
let savedWebhook: any
try {
const createdRows = await db
.insert(webhook)
.values({
id: webhookId,
workflowId,
deploymentVersionId: deploymentVersionId || null,
blockId: block.id,
path: triggerPath,
provider,
providerConfig: updatedProviderConfig,
credentialSetId: (updatedProviderConfig.credentialSetId as string | undefined) || null,
isActive: true,
createdAt: new Date(),
updatedAt: new Date(),
})
.returning()
savedWebhook = createdRows[0]
} catch (error) {
if (result.externalSubscriptionCreated) {
await cleanupExternalWebhook(createPayload, workflow, requestId)
}
throw error
}
const pollingError = await configurePollingIfNeeded(provider, savedWebhook, requestId)
if (pollingError) {
return pollingError
}
return null
return { error: null, warnings }
}
/**
@@ -398,23 +353,57 @@ export async function saveTriggerWebhooksForDeploy({
blocks,
requestId,
deploymentVersionId,
previousVersionId,
forceRecreateSubscriptions = false,
}: SaveTriggerWebhooksInput): Promise<TriggerSaveResult> {
const triggerBlocks = Object.values(blocks || {}).filter(Boolean)
const triggerBlocks = Object.values(blocks || {}).filter((b) => b && b.enabled !== false)
const currentBlockIds = new Set(triggerBlocks.map((b) => b.id))
// 1. Get all existing webhooks for this workflow
const existingWebhooks = await db
// 1. Get ALL webhooks for this workflow (all versions including draft)
const allWorkflowWebhooks = await db
.select()
.from(webhook)
.where(
deploymentVersionId
? and(
eq(webhook.workflowId, workflowId),
eq(webhook.deploymentVersionId, deploymentVersionId)
)
: eq(webhook.workflowId, workflowId)
.where(eq(webhook.workflowId, workflowId))
// Separate webhooks by version: current deployment vs others
const existingWebhooks: typeof allWorkflowWebhooks = []
for (const wh of allWorkflowWebhooks) {
if (deploymentVersionId && wh.deploymentVersionId === deploymentVersionId) {
existingWebhooks.push(wh)
}
}
if (previousVersionId) {
const webhooksToCleanup = allWorkflowWebhooks.filter(
(wh) => wh.deploymentVersionId === previousVersionId
)
if (webhooksToCleanup.length > 0) {
logger.info(
`[${requestId}] Cleaning up ${webhooksToCleanup.length} external subscription(s) from previous version`
)
for (const wh of webhooksToCleanup) {
try {
await cleanupExternalWebhook(wh, workflow, requestId)
} catch (cleanupError) {
logger.warn(`[${requestId}] Failed to cleanup external webhook ${wh.id}`, cleanupError)
}
}
}
}
const restorePreviousSubscriptions = async () => {
if (!previousVersionId) return
await restorePreviousVersionWebhooks({
request,
workflow,
userId,
previousVersionId,
requestId,
})
}
const webhooksByBlockId = new Map<string, typeof existingWebhooks>()
for (const wh of existingWebhooks) {
if (!wh.blockId) continue
@@ -429,7 +418,14 @@ export async function saveTriggerWebhooksForDeploy({
existingWebhookBlockIds: Array.from(webhooksByBlockId.keys()),
})
// 2. Determine which webhooks to delete (orphaned or config changed)
type WebhookConfig = {
provider: string
providerConfig: Record<string, unknown>
triggerPath: string
triggerDef: ReturnType<typeof getTrigger>
}
const webhookConfigs = new Map<string, WebhookConfig>()
const webhooksToDelete: typeof existingWebhooks = []
const blocksNeedingWebhook: BlockState[] = []
const blocksNeedingCredentialSetSync: BlockState[] = []
@@ -447,6 +443,7 @@ export async function saveTriggerWebhooksForDeploy({
)
if (missingFields.length > 0) {
await restorePreviousSubscriptions()
return {
success: false,
error: {
@@ -455,9 +452,8 @@ export async function saveTriggerWebhooksForDeploy({
},
}
}
// Store config for later use
;(block as any)._webhookConfig = { provider, providerConfig, triggerPath, triggerDef }
webhookConfigs.set(block.id, { provider, providerConfig, triggerPath, triggerDef })
if (providerConfig.credentialSetId) {
blocksNeedingCredentialSetSync.push(block)
@@ -477,22 +473,29 @@ export async function saveTriggerWebhooksForDeploy({
)
}
// Check if config changed
// Check if config changed or if we're forcing recreation (e.g., activating old version)
const existingConfig = (existingWh.providerConfig as Record<string, unknown>) || {}
if (
const needsRecreation =
forceRecreateSubscriptions ||
shouldRecreateExternalWebhookSubscription({
previousProvider: existingWh.provider as string,
nextProvider: provider,
previousConfig: existingConfig,
nextConfig: providerConfig,
})
) {
// Config changed - delete and recreate
if (needsRecreation) {
webhooksToDelete.push(existingWh)
blocksNeedingWebhook.push(block)
logger.info(`[${requestId}] Webhook config changed for block ${block.id}, will recreate`)
if (forceRecreateSubscriptions) {
logger.info(
`[${requestId}] Forcing webhook recreation for block ${block.id} (reactivating version)`
)
} else {
logger.info(`[${requestId}] Webhook config changed for block ${block.id}, will recreate`)
}
}
// else: config unchanged, keep existing webhook
// else: config unchanged and not forcing recreation, keep existing webhook
}
}
@@ -522,15 +525,16 @@ export async function saveTriggerWebhooksForDeploy({
await db.delete(webhook).where(inArray(webhook.id, idsToDelete))
}
// 4. Sync credential set webhooks
const collectedWarnings: string[] = []
for (const block of blocksNeedingCredentialSetSync) {
const config = (block as any)._webhookConfig
const config = webhookConfigs.get(block.id)
if (!config) continue
const { provider, providerConfig, triggerPath } = config
try {
const credentialSetError = await syncCredentialSetWebhooks({
const syncResult = await syncCredentialSetWebhooks({
workflowId,
blockId: block.id,
provider,
@@ -540,74 +544,214 @@ export async function saveTriggerWebhooksForDeploy({
deploymentVersionId,
})
if (credentialSetError) {
return { success: false, error: credentialSetError }
if (syncResult.warnings.length > 0) {
collectedWarnings.push(...syncResult.warnings)
}
if (syncResult.error) {
await restorePreviousSubscriptions()
return { success: false, error: syncResult.error, warnings: collectedWarnings }
}
} catch (error: any) {
logger.error(`[${requestId}] Failed to create webhook for ${block.id}`, error)
await restorePreviousSubscriptions()
return {
success: false,
error: {
message: error?.message || 'Failed to save trigger configuration',
status: 500,
},
warnings: collectedWarnings,
}
}
}
// 5. Create webhooks for blocks that need them
// 5. Create webhooks for blocks that need them (two-phase approach for atomicity)
const createdSubscriptions: Array<{
webhookId: string
block: BlockState
provider: string
triggerPath: string
updatedProviderConfig: Record<string, unknown>
externalSubscriptionCreated: boolean
}> = []
for (const block of blocksNeedingWebhook) {
const config = (block as any)._webhookConfig
const config = webhookConfigs.get(block.id)
if (!config) continue
const { provider, providerConfig, triggerPath } = config
const webhookId = nanoid()
const createPayload = {
id: webhookId,
path: triggerPath,
provider,
providerConfig,
}
try {
const createError = await createWebhookForBlock({
const result = await createExternalWebhookSubscription(
request,
workflowId,
createPayload,
workflow,
userId,
requestId
)
createdSubscriptions.push({
webhookId,
block,
provider,
providerConfig,
triggerPath,
requestId,
deploymentVersionId,
updatedProviderConfig: result.updatedProviderConfig as Record<string, unknown>,
externalSubscriptionCreated: result.externalSubscriptionCreated,
})
if (createError) {
return { success: false, error: createError }
}
} catch (error: any) {
logger.error(`[${requestId}] Failed to create webhook for ${block.id}`, error)
logger.error(`[${requestId}] Failed to create external subscription for ${block.id}`, error)
for (const sub of createdSubscriptions) {
if (sub.externalSubscriptionCreated) {
try {
await cleanupExternalWebhook(
{
id: sub.webhookId,
path: sub.triggerPath,
provider: sub.provider,
providerConfig: sub.updatedProviderConfig,
},
workflow,
requestId
)
} catch (cleanupError) {
logger.warn(
`[${requestId}] Failed to cleanup external subscription for ${sub.block.id}`,
cleanupError
)
}
}
}
await restorePreviousSubscriptions()
return {
success: false,
error: {
message: error?.message || 'Failed to save trigger configuration',
message: error?.message || 'Failed to create external subscription',
status: 500,
},
}
}
}
// Clean up temp config
for (const block of triggerBlocks) {
;(block as any)._webhookConfig = undefined
// Phase 2: Insert all DB records in a transaction
try {
await db.transaction(async (tx) => {
for (const sub of createdSubscriptions) {
await tx.insert(webhook).values({
id: sub.webhookId,
workflowId,
deploymentVersionId: deploymentVersionId || null,
blockId: sub.block.id,
path: sub.triggerPath,
provider: sub.provider,
providerConfig: sub.updatedProviderConfig,
credentialSetId:
(sub.updatedProviderConfig.credentialSetId as string | undefined) || null,
isActive: true,
createdAt: new Date(),
updatedAt: new Date(),
})
}
})
for (const sub of createdSubscriptions) {
const pollingError = await configurePollingIfNeeded(
sub.provider,
{ id: sub.webhookId, path: sub.triggerPath, providerConfig: sub.updatedProviderConfig },
requestId
)
if (pollingError) {
logger.error(
`[${requestId}] Polling configuration failed for ${sub.block.id}`,
pollingError
)
for (const otherSub of createdSubscriptions) {
if (otherSub.webhookId === sub.webhookId) continue
if (otherSub.externalSubscriptionCreated) {
try {
await cleanupExternalWebhook(
{
id: otherSub.webhookId,
path: otherSub.triggerPath,
provider: otherSub.provider,
providerConfig: otherSub.updatedProviderConfig,
},
workflow,
requestId
)
} catch (cleanupError) {
logger.warn(
`[${requestId}] Failed to cleanup external subscription for ${otherSub.block.id}`,
cleanupError
)
}
}
}
const otherWebhookIds = createdSubscriptions
.filter((s) => s.webhookId !== sub.webhookId)
.map((s) => s.webhookId)
if (otherWebhookIds.length > 0) {
await db.delete(webhook).where(inArray(webhook.id, otherWebhookIds))
}
await restorePreviousSubscriptions()
return { success: false, error: pollingError }
}
}
} catch (error: any) {
logger.error(`[${requestId}] Failed to insert webhook records`, error)
for (const sub of createdSubscriptions) {
if (sub.externalSubscriptionCreated) {
try {
await cleanupExternalWebhook(
{
id: sub.webhookId,
path: sub.triggerPath,
provider: sub.provider,
providerConfig: sub.updatedProviderConfig,
},
workflow,
requestId
)
} catch (cleanupError) {
logger.warn(
`[${requestId}] Failed to cleanup external subscription for ${sub.block.id}`,
cleanupError
)
}
}
}
await restorePreviousSubscriptions()
return {
success: false,
error: {
message: error?.message || 'Failed to save webhook records',
status: 500,
},
}
}
return { success: true }
return { success: true, warnings: collectedWarnings.length > 0 ? collectedWarnings : undefined }
}
/**
* Clean up all webhooks for a workflow during undeploy.
* Removes external subscriptions and deletes webhook records from the database.
*
* @param skipExternalCleanup - If true, skip external subscription cleanup (already done elsewhere)
*/
export async function cleanupWebhooksForWorkflow(
workflowId: string,
workflow: Record<string, unknown>,
requestId: string,
deploymentVersionId?: string
deploymentVersionId?: string,
skipExternalCleanup = false
): Promise<void> {
const existingWebhooks = await db
.select()
@@ -626,23 +770,26 @@ export async function cleanupWebhooksForWorkflow(
return
}
logger.info(`[${requestId}] Cleaning up ${existingWebhooks.length} webhook(s) for undeploy`, {
workflowId,
deploymentVersionId,
webhookIds: existingWebhooks.map((wh) => wh.id),
})
logger.info(
`[${requestId}] Cleaning up ${existingWebhooks.length} webhook(s) for ${skipExternalCleanup ? 'DB records only' : 'undeploy'}`,
{
workflowId,
deploymentVersionId,
webhookIds: existingWebhooks.map((wh) => wh.id),
}
)
// Clean up external subscriptions
for (const wh of existingWebhooks) {
try {
await cleanupExternalWebhook(wh, workflow, requestId)
} catch (cleanupError) {
logger.warn(`[${requestId}] Failed to cleanup external webhook ${wh.id}`, cleanupError)
// Continue with other webhooks even if one fails
if (!skipExternalCleanup) {
for (const wh of existingWebhooks) {
try {
await cleanupExternalWebhook(wh, workflow, requestId)
} catch (cleanupError) {
logger.warn(`[${requestId}] Failed to cleanup external webhook ${wh.id}`, cleanupError)
// Continue with other webhooks even if one fails
}
}
}
// Delete all webhook records
await db
.delete(webhook)
.where(
@@ -660,3 +807,55 @@ export async function cleanupWebhooksForWorkflow(
: `[${requestId}] Cleaned up all webhooks for workflow ${workflowId}`
)
}
/**
* Restore external subscriptions for a previous deployment version.
* Used when activation/deployment fails after webhooks were created,
* to restore the previous version's external subscriptions.
*/
export async function restorePreviousVersionWebhooks(params: {
request: NextRequest
workflow: Record<string, unknown>
userId: string
previousVersionId: string
requestId: string
}): Promise<void> {
const { request, workflow, userId, previousVersionId, requestId } = params
const previousWebhooks = await db
.select()
.from(webhook)
.where(eq(webhook.deploymentVersionId, previousVersionId))
if (previousWebhooks.length === 0) {
logger.debug(`[${requestId}] No previous webhooks to restore for version ${previousVersionId}`)
return
}
logger.info(
`[${requestId}] Restoring ${previousWebhooks.length} external subscription(s) for previous version ${previousVersionId}`
)
for (const wh of previousWebhooks) {
try {
await createExternalWebhookSubscription(
request,
{
id: wh.id,
path: wh.path,
provider: wh.provider,
providerConfig: (wh.providerConfig as Record<string, unknown>) || {},
},
workflow,
userId,
requestId
)
logger.info(`[${requestId}] Restored external subscription for webhook ${wh.id}`)
} catch (restoreError) {
logger.error(
`[${requestId}] Failed to restore external subscription for webhook ${wh.id}`,
restoreError
)
}
}
}

View File

@@ -161,7 +161,7 @@ export async function pollGmailWebhooks() {
const metadata = webhookData.providerConfig as any
const credentialId: string | undefined = metadata?.credentialId
const userId: string | undefined = metadata?.userId
const credentialSetId: string | undefined = metadata?.credentialSetId
const credentialSetId: string | undefined = webhookData.credentialSetId ?? undefined
if (!credentialId && !userId) {
logger.error(`[${requestId}] Missing credential info for webhook ${webhookId}`)
@@ -697,7 +697,6 @@ async function processEmails(
method: 'POST',
headers: {
'Content-Type': 'application/json',
'X-Webhook-Secret': webhookData.secret || '',
'User-Agent': 'Sim/1.0',
},
body: JSON.stringify(payload),
@@ -766,17 +765,21 @@ async function markEmailAsRead(accessToken: string, messageId: string) {
}
async function updateWebhookLastChecked(webhookId: string, timestamp: string, historyId?: string) {
const result = await db.select().from(webhook).where(eq(webhook.id, webhookId))
const existingConfig = (result[0]?.providerConfig as Record<string, any>) || {}
await db
.update(webhook)
.set({
providerConfig: {
...existingConfig,
lastCheckedTimestamp: timestamp,
...(historyId ? { historyId } : {}),
} as any,
updatedAt: new Date(),
})
.where(eq(webhook.id, webhookId))
try {
const result = await db.select().from(webhook).where(eq(webhook.id, webhookId))
const existingConfig = (result[0]?.providerConfig as Record<string, any>) || {}
await db
.update(webhook)
.set({
providerConfig: {
...existingConfig,
lastCheckedTimestamp: timestamp,
...(historyId ? { historyId } : {}),
} as any,
updatedAt: new Date(),
})
.where(eq(webhook.id, webhookId))
} catch (error) {
logger.error(`Error updating webhook ${webhookId} last checked timestamp:`, error)
}
}

View File

@@ -645,7 +645,6 @@ async function processEmails(
method: 'POST',
headers: {
'Content-Type': 'application/json',
'X-Webhook-Secret': '',
'User-Agent': 'Sim/1.0',
},
body: JSON.stringify(payload),

View File

@@ -210,7 +210,7 @@ export async function pollOutlookWebhooks() {
const metadata = webhookData.providerConfig as any
const credentialId: string | undefined = metadata?.credentialId
const userId: string | undefined = metadata?.userId
const credentialSetId: string | undefined = metadata?.credentialSetId
const credentialSetId: string | undefined = webhookData.credentialSetId ?? undefined
if (!credentialId && !userId) {
logger.error(`[${requestId}] Missing credentialId and userId for webhook ${webhookId}`)
@@ -607,7 +607,6 @@ async function processOutlookEmails(
method: 'POST',
headers: {
'Content-Type': 'application/json',
'X-Webhook-Secret': webhookData.secret || '',
'User-Agent': 'Sim/1.0',
},
body: JSON.stringify(payload),

View File

@@ -7,16 +7,27 @@ import { type NextRequest, NextResponse } from 'next/server'
import { v4 as uuidv4 } from 'uuid'
import { checkEnterprisePlan, checkTeamPlan } from '@/lib/billing/subscriptions/utils'
import { isProd, isTriggerDevEnabled } from '@/lib/core/config/feature-flags'
import { getEffectiveDecryptedEnv } from '@/lib/environment/utils'
import { preprocessExecution } from '@/lib/execution/preprocessing'
import { convertSquareBracketsToTwiML } from '@/lib/webhooks/utils'
import {
handleSlackChallenge,
handleWhatsAppVerification,
validateCirclebackSignature,
validateFirefliesSignature,
validateGitHubSignature,
validateJiraSignature,
validateLinearSignature,
validateMicrosoftTeamsSignature,
validateTwilioSignature,
validateTypeformSignature,
verifyProviderWebhook,
} from '@/lib/webhooks/utils.server'
import { executeWebhookJob } from '@/background/webhook-execution'
import { resolveEnvVarReferences } from '@/executor/utils/reference-validation'
import { isGitHubEventMatch } from '@/triggers/github/utils'
import { isHubSpotContactEventMatch } from '@/triggers/hubspot/utils'
import { isJiraEventMatch } from '@/triggers/jira/utils'
const logger = createLogger('WebhookProcessor')
@@ -451,7 +462,6 @@ export async function verifyProviderAuth(
// Step 1: Fetch and decrypt environment variables for signature verification
let decryptedEnvVars: Record<string, string> = {}
try {
const { getEffectiveDecryptedEnv } = await import('@/lib/environment/utils')
decryptedEnvVars = await getEffectiveDecryptedEnv(
foundWorkflow.userId,
foundWorkflow.workspaceId
@@ -553,9 +563,6 @@ export async function verifyProviderAuth(
}
const fullUrl = getExternalUrl(request)
const { validateTwilioSignature } = await import('@/lib/webhooks/utils.server')
const isValidSignature = await validateTwilioSignature(authToken, signature, fullUrl, params)
if (!isValidSignature) {
@@ -583,8 +590,6 @@ export async function verifyProviderAuth(
return new NextResponse('Unauthorized - Missing Typeform signature', { status: 401 })
}
const { validateTypeformSignature } = await import('@/lib/webhooks/utils.server')
const isValidSignature = validateTypeformSignature(secret, signature, rawBody)
if (!isValidSignature) {
@@ -610,8 +615,6 @@ export async function verifyProviderAuth(
return new NextResponse('Unauthorized - Missing Linear signature', { status: 401 })
}
const { validateLinearSignature } = await import('@/lib/webhooks/utils.server')
const isValidSignature = validateLinearSignature(secret, signature, rawBody)
if (!isValidSignature) {
@@ -637,8 +640,6 @@ export async function verifyProviderAuth(
return new NextResponse('Unauthorized - Missing Circleback signature', { status: 401 })
}
const { validateCirclebackSignature } = await import('@/lib/webhooks/utils.server')
const isValidSignature = validateCirclebackSignature(secret, signature, rawBody)
if (!isValidSignature) {
@@ -664,8 +665,6 @@ export async function verifyProviderAuth(
return new NextResponse('Unauthorized - Missing Jira signature', { status: 401 })
}
const { validateJiraSignature } = await import('@/lib/webhooks/utils.server')
const isValidSignature = validateJiraSignature(secret, signature, rawBody)
if (!isValidSignature) {
@@ -694,8 +693,6 @@ export async function verifyProviderAuth(
return new NextResponse('Unauthorized - Missing GitHub signature', { status: 401 })
}
const { validateGitHubSignature } = await import('@/lib/webhooks/utils.server')
const isValidSignature = validateGitHubSignature(secret, signature, rawBody)
if (!isValidSignature) {
@@ -724,8 +721,6 @@ export async function verifyProviderAuth(
return new NextResponse('Unauthorized - Missing Fireflies signature', { status: 401 })
}
const { validateFirefliesSignature } = await import('@/lib/webhooks/utils.server')
const isValidSignature = validateFirefliesSignature(secret, signature, rawBody)
if (!isValidSignature) {
@@ -860,8 +855,6 @@ export async function queueWebhookExecution(
const eventType = request.headers.get('x-github-event')
const action = body.action
const { isGitHubEventMatch } = await import('@/triggers/github/utils')
if (!isGitHubEventMatch(triggerId, eventType || '', action, body)) {
logger.debug(
`[${options.requestId}] GitHub event mismatch for trigger ${triggerId}. Event: ${eventType}, Action: ${action}. Skipping execution.`,
@@ -890,8 +883,6 @@ export async function queueWebhookExecution(
if (triggerId && triggerId !== 'jira_webhook') {
const webhookEvent = body.webhookEvent as string | undefined
const { isJiraEventMatch } = await import('@/triggers/jira/utils')
if (!isJiraEventMatch(triggerId, webhookEvent || '', body)) {
logger.debug(
`[${options.requestId}] Jira event mismatch for trigger ${triggerId}. Event: ${webhookEvent}. Skipping execution.`,
@@ -921,8 +912,6 @@ export async function queueWebhookExecution(
const subscriptionType = firstEvent?.subscriptionType as string | undefined
const { isHubSpotContactEventMatch } = await import('@/triggers/hubspot/utils')
if (!isHubSpotContactEventMatch(triggerId, subscriptionType || '')) {
logger.debug(
`[${options.requestId}] HubSpot event mismatch for trigger ${triggerId}. Event: ${subscriptionType}. Skipping execution.`,
@@ -974,7 +963,8 @@ export async function queueWebhookExecution(
// Note: Each webhook now has its own credentialId (credential sets are fanned out at save time)
const providerConfig = (foundWebhook.providerConfig as Record<string, any>) || {}
const credentialId = providerConfig.credentialId as string | undefined
const credentialSetId = providerConfig.credentialSetId as string | undefined
// credentialSetId is a direct field on webhook table, not in providerConfig
const credentialSetId = foundWebhook.credentialSetId as string | undefined
// Verify billing for credential sets
if (credentialSetId) {

View File

@@ -30,11 +30,11 @@ export async function createTeamsSubscription(
webhook: any,
workflow: any,
requestId: string
): Promise<void> {
): Promise<string | undefined> {
const config = getProviderConfig(webhook)
if (config.triggerId !== 'microsoftteams_chat_subscription') {
return
return undefined
}
const credentialId = config.credentialId as string | undefined
@@ -77,7 +77,7 @@ export async function createTeamsSubscription(
teamsLogger.info(
`[${requestId}] Teams subscription ${existingSubscriptionId} already exists for webhook ${webhook.id}`
)
return
return existingSubscriptionId
}
} catch {
teamsLogger.debug(`[${requestId}] Existing subscription check failed, will create new one`)
@@ -140,6 +140,7 @@ export async function createTeamsSubscription(
teamsLogger.info(
`[${requestId}] Successfully created Teams subscription ${payload.id} for webhook ${webhook.id}`
)
return payload.id as string
} catch (error: any) {
if (
error instanceof Error &&
@@ -1600,9 +1601,11 @@ export async function createExternalWebhookSubscription(
externalSubscriptionCreated = true
}
} else if (provider === 'microsoft-teams') {
await createTeamsSubscription(request, webhookData, workflow, requestId)
externalSubscriptionCreated =
(providerConfig.triggerId as string | undefined) === 'microsoftteams_chat_subscription'
const subscriptionId = await createTeamsSubscription(request, webhookData, workflow, requestId)
if (subscriptionId) {
updatedProviderConfig = { ...updatedProviderConfig, externalSubscriptionId: subscriptionId }
externalSubscriptionCreated = true
}
} else if (provider === 'telegram') {
await createTelegramWebhook(request, webhookData, requestId)
externalSubscriptionCreated = true

View File

@@ -379,7 +379,6 @@ async function processRssItems(
method: 'POST',
headers: {
'Content-Type': 'application/json',
'X-Webhook-Secret': webhookData.secret || '',
'User-Agent': 'Sim/1.0',
},
body: JSON.stringify(payload),

View File

@@ -2,6 +2,7 @@ import { db, workflowDeploymentVersion } from '@sim/db'
import { account, webhook } from '@sim/db/schema'
import { createLogger } from '@sim/logger'
import { and, eq, isNull, or } from 'drizzle-orm'
import { nanoid } from 'nanoid'
import { type NextRequest, NextResponse } from 'next/server'
import {
type SecureFetchResponse,
@@ -9,7 +10,11 @@ import {
validateUrlWithDNS,
} from '@/lib/core/security/input-validation'
import type { DbOrTx } from '@/lib/db/types'
import { refreshAccessTokenIfNeeded } from '@/app/api/auth/oauth/utils'
import { getProviderIdFromServiceId } from '@/lib/oauth'
import {
getCredentialsForCredentialSet,
refreshAccessTokenIfNeeded,
} from '@/app/api/auth/oauth/utils'
const logger = createLogger('WebhookUtils')
@@ -1388,21 +1393,6 @@ export function verifyProviderWebhook(
case 'stripe':
break
case 'gmail':
if (providerConfig.secret) {
const secretHeader = request.headers.get('X-Webhook-Secret')
if (!secretHeader || secretHeader.length !== providerConfig.secret.length) {
logger.warn(`[${requestId}] Invalid Gmail webhook secret`)
return new NextResponse('Unauthorized', { status: 401 })
}
let result = 0
for (let i = 0; i < secretHeader.length; i++) {
result |= secretHeader.charCodeAt(i) ^ providerConfig.secret.charCodeAt(i)
}
if (result !== 0) {
logger.warn(`[${requestId}] Invalid Gmail webhook secret`)
return new NextResponse('Unauthorized', { status: 401 })
}
}
break
case 'telegram': {
// Check User-Agent to ensure it's not blocked by middleware
@@ -1946,6 +1936,10 @@ export interface CredentialSetWebhookSyncResult {
created: number
updated: number
deleted: number
failed: Array<{
credentialId: string
error: string
}>
}
/**
@@ -1997,9 +1991,6 @@ export async function syncWebhooksForCredentialSet(params: {
`[${requestId}] Syncing webhooks for credential set ${credentialSetId}, provider ${provider}`
)
const { getCredentialsForCredentialSet } = await import('@/app/api/auth/oauth/utils')
const { nanoid } = await import('nanoid')
// Polling providers get unique paths per credential (for independent state)
// External webhook providers share the same path (external service sends to one URL)
const pollingProviders = ['gmail', 'outlook', 'rss', 'imap']
@@ -2011,7 +2002,7 @@ export async function syncWebhooksForCredentialSet(params: {
syncLogger.warn(
`[${requestId}] No credentials found in credential set ${credentialSetId} for provider ${oauthProviderId}`
)
return { webhooks: [], created: 0, updated: 0, deleted: 0 }
return { webhooks: [], created: 0, updated: 0, deleted: 0, failed: [] }
}
syncLogger.info(
@@ -2033,10 +2024,9 @@ export async function syncWebhooksForCredentialSet(params: {
)
// Filter to only webhooks belonging to this credential set
const credentialSetWebhooks = existingWebhooks.filter((wh) => {
const config = wh.providerConfig as Record<string, any>
return config?.credentialSetId === credentialSetId
})
const credentialSetWebhooks = existingWebhooks.filter(
(wh) => wh.credentialSetId === credentialSetId
)
syncLogger.info(
`[${requestId}] Found ${credentialSetWebhooks.length} existing webhooks for credential set`
@@ -2058,103 +2048,128 @@ export async function syncWebhooksForCredentialSet(params: {
created: 0,
updated: 0,
deleted: 0,
failed: [],
}
// Process each credential in the set
for (const cred of credentials) {
const existingWebhook = existingByCredentialId.get(cred.credentialId)
try {
const existingWebhook = existingByCredentialId.get(cred.credentialId)
if (existingWebhook) {
// Update existing webhook - preserve state fields
const existingConfig = existingWebhook.providerConfig as Record<string, any>
if (existingWebhook) {
// Update existing webhook - preserve state fields
const existingConfig = existingWebhook.providerConfig as Record<string, any>
const updatedConfig = {
...providerConfig,
basePath, // Store basePath for reliable reconstruction during membership sync
credentialId: cred.credentialId,
credentialSetId: credentialSetId,
// Preserve state fields from existing config
historyId: existingConfig.historyId,
lastCheckedTimestamp: existingConfig.lastCheckedTimestamp,
setupCompleted: existingConfig.setupCompleted,
externalId: existingConfig.externalId,
userId: cred.userId,
}
const updatedConfig = {
...providerConfig,
basePath, // Store basePath for reliable reconstruction during membership sync
credentialId: cred.credentialId,
credentialSetId: credentialSetId,
// Preserve state fields from existing config
historyId: existingConfig?.historyId,
lastCheckedTimestamp: existingConfig?.lastCheckedTimestamp,
setupCompleted: existingConfig?.setupCompleted,
externalId: existingConfig?.externalId,
userId: cred.userId,
}
await dbCtx
.update(webhook)
.set({
...(deploymentVersionId ? { deploymentVersionId } : {}),
providerConfig: updatedConfig,
await dbCtx
.update(webhook)
.set({
...(deploymentVersionId ? { deploymentVersionId } : {}),
providerConfig: updatedConfig,
isActive: true,
updatedAt: new Date(),
})
.where(eq(webhook.id, existingWebhook.id))
result.webhooks.push({
id: existingWebhook.id,
credentialId: cred.credentialId,
isNew: false,
})
result.updated++
syncLogger.debug(
`[${requestId}] Updated webhook ${existingWebhook.id} for credential ${cred.credentialId}`
)
} else {
// Create new webhook for this credential
const webhookId = nanoid()
const webhookPath = useUniquePaths
? `${basePath}-${cred.credentialId.slice(0, 8)}`
: basePath
const newConfig = {
...providerConfig,
basePath, // Store basePath for reliable reconstruction during membership sync
credentialId: cred.credentialId,
credentialSetId: credentialSetId,
userId: cred.userId,
}
await dbCtx.insert(webhook).values({
id: webhookId,
workflowId,
blockId,
path: webhookPath,
provider,
providerConfig: newConfig,
credentialSetId, // Indexed column for efficient credential set queries
isActive: true,
...(deploymentVersionId ? { deploymentVersionId } : {}),
createdAt: new Date(),
updatedAt: new Date(),
})
.where(eq(webhook.id, existingWebhook.id))
result.webhooks.push({
id: existingWebhook.id,
credentialId: cred.credentialId,
isNew: false,
})
result.updated++
result.webhooks.push({
id: webhookId,
credentialId: cred.credentialId,
isNew: true,
})
result.created++
syncLogger.debug(
`[${requestId}] Updated webhook ${existingWebhook.id} for credential ${cred.credentialId}`
)
} else {
// Create new webhook for this credential
const webhookId = nanoid()
const webhookPath = useUniquePaths ? `${basePath}-${cred.credentialId.slice(0, 8)}` : basePath
const newConfig = {
...providerConfig,
basePath, // Store basePath for reliable reconstruction during membership sync
credentialId: cred.credentialId,
credentialSetId: credentialSetId,
userId: cred.userId,
syncLogger.debug(
`[${requestId}] Created webhook ${webhookId} for credential ${cred.credentialId}`
)
}
await dbCtx.insert(webhook).values({
id: webhookId,
workflowId,
blockId,
path: webhookPath,
provider,
providerConfig: newConfig,
credentialSetId, // Indexed column for efficient credential set queries
isActive: true,
...(deploymentVersionId ? { deploymentVersionId } : {}),
createdAt: new Date(),
updatedAt: new Date(),
})
result.webhooks.push({
id: webhookId,
credentialId: cred.credentialId,
isNew: true,
})
result.created++
syncLogger.debug(
`[${requestId}] Created webhook ${webhookId} for credential ${cred.credentialId}`
} catch (error) {
const errorMessage = error instanceof Error ? error.message : 'Unknown error'
syncLogger.error(
`[${requestId}] Failed to sync webhook for credential ${cred.credentialId}: ${errorMessage}`
)
result.failed.push({
credentialId: cred.credentialId,
error: errorMessage,
})
}
}
// Delete webhooks for credentials no longer in the set
for (const [credentialId, existingWebhook] of existingByCredentialId) {
if (!credentialIdsInSet.has(credentialId)) {
await dbCtx.delete(webhook).where(eq(webhook.id, existingWebhook.id))
result.deleted++
try {
await dbCtx.delete(webhook).where(eq(webhook.id, existingWebhook.id))
result.deleted++
syncLogger.debug(
`[${requestId}] Deleted webhook ${existingWebhook.id} for removed credential ${credentialId}`
)
syncLogger.debug(
`[${requestId}] Deleted webhook ${existingWebhook.id} for removed credential ${credentialId}`
)
} catch (error) {
const errorMessage = error instanceof Error ? error.message : 'Unknown error'
syncLogger.error(
`[${requestId}] Failed to delete webhook ${existingWebhook.id} for credential ${credentialId}: ${errorMessage}`
)
result.failed.push({
credentialId,
error: `Failed to delete: ${errorMessage}`,
})
}
}
}
syncLogger.info(
`[${requestId}] Credential set webhook sync complete: ${result.created} created, ${result.updated} updated, ${result.deleted} deleted`
`[${requestId}] Credential set webhook sync complete: ${result.created} created, ${result.updated} updated, ${result.deleted} deleted, ${result.failed.length} failed`
)
return result
@@ -2175,8 +2190,6 @@ export async function syncAllWebhooksForCredentialSet(
const syncLogger = createLogger('CredentialSetMembershipSync')
syncLogger.info(`[${requestId}] Syncing all webhooks for credential set ${credentialSetId}`)
const { getProviderIdFromServiceId } = await import('@/lib/oauth')
// Find all webhooks that use this credential set using the indexed column
const webhooksForSet = await dbCtx
.select({ webhook })

View File

@@ -1,5 +1,5 @@
/**
* Pure utility functions for TwiML processing
* Pure utility functions for webhook processing
* This file has NO server-side dependencies to ensure it can be safely imported in client-side code
*/
@@ -18,3 +18,19 @@ export function convertSquareBracketsToTwiML(twiml: string | undefined): string
// Replace [Tag] with <Tag> and [/Tag] with </Tag>
return twiml.replace(/\[(\/?[^\]]+)\]/g, '<$1>')
}
/**
* Merges fields from source into target, but only if they don't exist in the base config.
* Used to preserve system-managed fields while respecting user-provided values.
*/
export function mergeNonUserFields(
target: Record<string, unknown>,
source: Record<string, unknown>,
userProvided: Record<string, unknown>
): void {
for (const [key, value] of Object.entries(source)) {
if (!(key in userProvided)) {
target[key] = value
}
}
}

View File

@@ -567,6 +567,32 @@ export class PauseResumeManager {
stateCopy.blockStates[stateBlockKey] = pauseBlockState
// Update the block log entry with the merged output so logs show the submission data
if (Array.isArray(stateCopy.blockLogs)) {
const blockLogIndex = stateCopy.blockLogs.findIndex(
(log: { blockId: string }) =>
log.blockId === stateBlockKey ||
log.blockId === pauseBlockId ||
log.blockId === contextId
)
if (blockLogIndex !== -1) {
// Filter output for logging (exclude internal fields and response)
const filteredOutput: Record<string, unknown> = {}
for (const [key, value] of Object.entries(mergedOutput)) {
if (key.startsWith('_')) continue
if (key === 'response') continue
filteredOutput[key] = value
}
stateCopy.blockLogs[blockLogIndex] = {
...stateCopy.blockLogs[blockLogIndex],
blockId: stateBlockKey,
output: filteredOutput,
durationMs: pauseDurationMs,
endedAt: new Date().toISOString(),
}
}
}
if (Array.isArray(stateCopy.executedBlocks)) {
const filtered = stateCopy.executedBlocks.filter(
(id: string) => id !== pauseBlockId && id !== contextId

View File

@@ -17,6 +17,9 @@ const {
mockValidateCronExpression,
mockGetScheduleTimeValues,
mockRandomUUID,
mockTransaction,
mockSelect,
mockFrom,
} = vi.hoisted(() => ({
mockInsert: vi.fn(),
mockDelete: vi.fn(),
@@ -28,20 +31,27 @@ const {
mockValidateCronExpression: vi.fn(),
mockGetScheduleTimeValues: vi.fn(),
mockRandomUUID: vi.fn(),
mockTransaction: vi.fn(),
mockSelect: vi.fn(),
mockFrom: vi.fn(),
}))
vi.mock('@sim/db', () => ({
db: {},
db: {
transaction: mockTransaction,
},
workflowSchedule: {
workflowId: 'workflow_id',
blockId: 'block_id',
deploymentVersionId: 'deployment_version_id',
id: 'id',
},
}))
vi.mock('drizzle-orm', () => ({
eq: vi.fn((...args) => ({ type: 'eq', args })),
and: vi.fn((...args) => ({ type: 'and', args })),
inArray: vi.fn((...args) => ({ type: 'inArray', args })),
sql: vi.fn((strings, ...values) => ({ type: 'sql', strings, values })),
}))
@@ -100,6 +110,20 @@ describe('Schedule Deploy Utilities', () => {
// Setup mock chain for delete
mockWhere.mockResolvedValue({})
mockDelete.mockReturnValue({ where: mockWhere })
// Setup mock chain for select
mockFrom.mockReturnValue({ where: vi.fn().mockResolvedValue([]) })
mockSelect.mockReturnValue({ from: mockFrom })
// Setup transaction mock to execute callback with mock tx
mockTransaction.mockImplementation(async (callback) => {
const mockTx = {
insert: mockInsert,
delete: mockDelete,
select: mockSelect,
}
return callback(mockTx)
})
})
afterEach(() => {
@@ -135,6 +159,19 @@ describe('Schedule Deploy Utilities', () => {
const result = findScheduleBlocks({})
expect(result).toHaveLength(0)
})
it('should exclude disabled schedule blocks', () => {
const blocks: Record<string, BlockState> = {
'block-1': { id: 'block-1', type: 'schedule', enabled: true, subBlocks: {} } as BlockState,
'block-2': { id: 'block-2', type: 'schedule', enabled: false, subBlocks: {} } as BlockState,
'block-3': { id: 'block-3', type: 'schedule', subBlocks: {} } as BlockState, // enabled undefined = enabled
}
const result = findScheduleBlocks(blocks)
expect(result).toHaveLength(2)
expect(result.map((b) => b.id)).toEqual(['block-1', 'block-3'])
})
})
describe('validateScheduleBlock', () => {
@@ -671,20 +708,24 @@ describe('Schedule Deploy Utilities', () => {
})
describe('createSchedulesForDeploy', () => {
const setupMockTransaction = (
existingSchedules: Array<{ id: string; blockId: string }> = []
) => {
mockFrom.mockReturnValue({ where: vi.fn().mockResolvedValue(existingSchedules) })
mockSelect.mockReturnValue({ from: mockFrom })
}
it('should return success with no schedule blocks', async () => {
const blocks: Record<string, BlockState> = {
'block-1': { id: 'block-1', type: 'agent', subBlocks: {} } as BlockState,
}
const mockTx = {
insert: mockInsert,
delete: mockDelete,
}
setupMockTransaction()
const result = await createSchedulesForDeploy('workflow-1', blocks, mockTx as any)
const result = await createSchedulesForDeploy('workflow-1', blocks, {} as any)
expect(result.success).toBe(true)
expect(mockInsert).not.toHaveBeenCalled()
expect(mockTransaction).not.toHaveBeenCalled()
})
it('should create schedule for valid schedule block', async () => {
@@ -700,17 +741,15 @@ describe('Schedule Deploy Utilities', () => {
} as BlockState,
}
const mockTx = {
insert: mockInsert,
delete: mockDelete,
}
setupMockTransaction()
const result = await createSchedulesForDeploy('workflow-1', blocks, mockTx as any)
const result = await createSchedulesForDeploy('workflow-1', blocks, {} as any)
expect(result.success).toBe(true)
expect(result.scheduleId).toBe('test-uuid')
expect(result.cronExpression).toBe('0 9 * * *')
expect(result.nextRunAt).toEqual(new Date('2025-04-15T09:00:00Z'))
expect(mockTransaction).toHaveBeenCalled()
expect(mockInsert).toHaveBeenCalled()
expect(mockOnConflictDoUpdate).toHaveBeenCalled()
})
@@ -727,16 +766,13 @@ describe('Schedule Deploy Utilities', () => {
} as BlockState,
}
const mockTx = {
insert: mockInsert,
delete: mockDelete,
}
setupMockTransaction()
const result = await createSchedulesForDeploy('workflow-1', blocks, mockTx as any)
const result = await createSchedulesForDeploy('workflow-1', blocks, {} as any)
expect(result.success).toBe(false)
expect(result.error).toBe('Time is required for daily schedules')
expect(mockInsert).not.toHaveBeenCalled()
expect(mockTransaction).not.toHaveBeenCalled()
})
it('should use onConflictDoUpdate for existing schedules', async () => {
@@ -752,12 +788,9 @@ describe('Schedule Deploy Utilities', () => {
} as BlockState,
}
const mockTx = {
insert: mockInsert,
delete: mockDelete,
}
setupMockTransaction()
await createSchedulesForDeploy('workflow-1', blocks, mockTx as any)
await createSchedulesForDeploy('workflow-1', blocks, {} as any)
expect(mockOnConflictDoUpdate).toHaveBeenCalledWith({
target: expect.any(Array),
@@ -769,6 +802,27 @@ describe('Schedule Deploy Utilities', () => {
}),
})
})
it('should rollback on database error', async () => {
const blocks: Record<string, BlockState> = {
'block-1': {
id: 'block-1',
type: 'schedule',
subBlocks: {
scheduleType: { value: 'daily' },
dailyTime: { value: '09:00' },
timezone: { value: 'UTC' },
},
} as BlockState,
}
mockTransaction.mockRejectedValueOnce(new Error('Database error'))
const result = await createSchedulesForDeploy('workflow-1', blocks, {} as any)
expect(result.success).toBe(false)
expect(result.error).toBe('Database error')
})
})
describe('deleteSchedulesForWorkflow', () => {

View File

@@ -1,6 +1,6 @@
import { db, workflowSchedule } from '@sim/db'
import { createLogger } from '@sim/logger'
import { and, eq } from 'drizzle-orm'
import { and, eq, inArray } from 'drizzle-orm'
import type { DbOrTx } from '@/lib/db/types'
import { cleanupWebhooksForWorkflow } from '@/lib/webhooks/deploy'
import type { BlockState } from '@/lib/workflows/schedules/utils'
@@ -21,13 +21,13 @@ export interface ScheduleDeployResult {
}
/**
* Create or update schedule records for a workflow during deployment
* This should be called within a database transaction
* Create or update schedule records for a workflow during deployment.
* Uses a transaction to ensure atomicity - all schedules are created or none are.
*/
export async function createSchedulesForDeploy(
workflowId: string,
blocks: Record<string, BlockState>,
tx: DbOrTx,
_tx: DbOrTx,
deploymentVersionId?: string
): Promise<ScheduleDeployResult> {
const scheduleBlocks = findScheduleBlocks(blocks)
@@ -37,16 +37,16 @@ export async function createSchedulesForDeploy(
return { success: true }
}
let lastScheduleInfo: {
scheduleId: string
cronExpression?: string
nextRunAt?: Date
timezone?: string
} | null = null
// Phase 1: Validate ALL blocks before making any DB changes
const validatedBlocks: Array<{
blockId: string
cronExpression: string
nextRunAt: Date
timezone: string
}> = []
for (const block of scheduleBlocks) {
const blockId = block.id as string
const validation = validateScheduleBlock(block)
if (!validation.isValid) {
return {
@@ -54,62 +54,112 @@ export async function createSchedulesForDeploy(
error: validation.error,
}
}
const { cronExpression, nextRunAt, timezone } = validation
const scheduleId = crypto.randomUUID()
const now = new Date()
const values = {
id: scheduleId,
workflowId,
deploymentVersionId: deploymentVersionId || null,
validatedBlocks.push({
blockId,
cronExpression: cronExpression!,
triggerType: 'schedule',
createdAt: now,
updatedAt: now,
nextRunAt: nextRunAt!,
timezone: timezone!,
status: 'active',
failedCount: 0,
}
const setValues = {
blockId,
cronExpression: cronExpression!,
...(deploymentVersionId ? { deploymentVersionId } : {}),
updatedAt: now,
nextRunAt: nextRunAt!,
timezone: timezone!,
status: 'active',
failedCount: 0,
}
await tx
.insert(workflowSchedule)
.values(values)
.onConflictDoUpdate({
target: [
workflowSchedule.workflowId,
workflowSchedule.blockId,
workflowSchedule.deploymentVersionId,
],
set: setValues,
})
logger.info(`Schedule created/updated for workflow ${workflowId}, block ${blockId}`, {
scheduleId: values.id,
cronExpression,
nextRunAt: nextRunAt?.toISOString(),
cronExpression: validation.cronExpression!,
nextRunAt: validation.nextRunAt!,
timezone: validation.timezone!,
})
}
lastScheduleInfo = { scheduleId: values.id, cronExpression, nextRunAt, timezone }
// Phase 2: All validations passed - now do DB operations in a transaction
let lastScheduleInfo: {
scheduleId: string
cronExpression?: string
nextRunAt?: Date
timezone?: string
} | null = null
try {
await db.transaction(async (tx) => {
const currentBlockIds = new Set(validatedBlocks.map((b) => b.blockId))
const existingSchedules = await tx
.select({ id: workflowSchedule.id, blockId: workflowSchedule.blockId })
.from(workflowSchedule)
.where(
deploymentVersionId
? and(
eq(workflowSchedule.workflowId, workflowId),
eq(workflowSchedule.deploymentVersionId, deploymentVersionId)
)
: eq(workflowSchedule.workflowId, workflowId)
)
const orphanedScheduleIds = existingSchedules
.filter((s) => s.blockId && !currentBlockIds.has(s.blockId))
.map((s) => s.id)
if (orphanedScheduleIds.length > 0) {
logger.info(
`Deleting ${orphanedScheduleIds.length} orphaned schedule(s) for workflow ${workflowId}`
)
await tx.delete(workflowSchedule).where(inArray(workflowSchedule.id, orphanedScheduleIds))
}
for (const validated of validatedBlocks) {
const { blockId, cronExpression, nextRunAt, timezone } = validated
const scheduleId = crypto.randomUUID()
const now = new Date()
const values = {
id: scheduleId,
workflowId,
deploymentVersionId: deploymentVersionId || null,
blockId,
cronExpression,
triggerType: 'schedule',
createdAt: now,
updatedAt: now,
nextRunAt,
timezone,
status: 'active',
failedCount: 0,
}
const setValues = {
blockId,
cronExpression,
...(deploymentVersionId ? { deploymentVersionId } : {}),
updatedAt: now,
nextRunAt,
timezone,
status: 'active',
failedCount: 0,
}
await tx
.insert(workflowSchedule)
.values(values)
.onConflictDoUpdate({
target: [
workflowSchedule.workflowId,
workflowSchedule.blockId,
workflowSchedule.deploymentVersionId,
],
set: setValues,
})
logger.info(`Schedule created/updated for workflow ${workflowId}, block ${blockId}`, {
scheduleId: values.id,
cronExpression,
nextRunAt: nextRunAt?.toISOString(),
})
lastScheduleInfo = { scheduleId: values.id, cronExpression, nextRunAt, timezone }
}
})
} catch (error) {
logger.error(`Failed to create schedules for workflow ${workflowId}`, error)
return {
success: false,
error: error instanceof Error ? error.message : 'Failed to create schedules',
}
}
return {
success: true,
...lastScheduleInfo,
...(lastScheduleInfo ?? {}),
}
}
@@ -145,8 +195,25 @@ export async function cleanupDeploymentVersion(params: {
workflow: Record<string, unknown>
requestId: string
deploymentVersionId: string
/**
* If true, skip external subscription cleanup (already done by saveTriggerWebhooksForDeploy).
* Only deletes DB records.
*/
skipExternalCleanup?: boolean
}): Promise<void> {
const { workflowId, workflow, requestId, deploymentVersionId } = params
await cleanupWebhooksForWorkflow(workflowId, workflow, requestId, deploymentVersionId)
const {
workflowId,
workflow,
requestId,
deploymentVersionId,
skipExternalCleanup = false,
} = params
await cleanupWebhooksForWorkflow(
workflowId,
workflow,
requestId,
deploymentVersionId,
skipExternalCleanup
)
await deleteSchedulesForWorkflow(workflowId, db, deploymentVersionId)
}

View File

@@ -98,9 +98,12 @@ function getMissingConfigError(scheduleType: string): string {
/**
* Find schedule blocks in a workflow's blocks
* Only returns enabled schedule blocks (disabled blocks are skipped)
*/
export function findScheduleBlocks(blocks: Record<string, BlockState>): BlockState[] {
return Object.values(blocks).filter((block) => block.type === 'schedule')
return Object.values(blocks).filter(
(block) => block.type === 'schedule' && block.enabled !== false
)
}
/**

View File

@@ -4,7 +4,6 @@ import { create } from 'zustand'
import { devtools } from 'zustand/middleware'
import { DEFAULT_DUPLICATE_OFFSET } from '@/lib/workflows/autolayout/constants'
import { getBlockOutputs } from '@/lib/workflows/blocks/block-outputs'
import { TriggerUtils } from '@/lib/workflows/triggers/triggers'
import { getBlock } from '@/blocks'
import type { SubBlockConfig } from '@/blocks/types'
import { normalizeName, RESERVED_BLOCK_NAMES } from '@/executor/constants'
@@ -1171,93 +1170,6 @@ export const useWorkflowStore = create<WorkflowStore>()(
// Note: Socket.IO handles real-time sync automatically
},
toggleBlockTriggerMode: (id: string) => {
const block = get().blocks[id]
if (!block) return
const newTriggerMode = !block.triggerMode
// When switching TO trigger mode, check if block is inside a subflow
if (newTriggerMode && TriggerUtils.isBlockInSubflow(id, get().blocks)) {
logger.warn('Cannot enable trigger mode for block inside loop or parallel subflow', {
blockId: id,
blockType: block.type,
})
return
}
// When switching TO trigger mode, remove all incoming connections
let filteredEdges = [...get().edges]
if (newTriggerMode) {
// Remove edges where this block is the target
filteredEdges = filteredEdges.filter((edge) => edge.target !== id)
logger.info(
`Removed ${get().edges.length - filteredEdges.length} incoming connections for trigger mode`,
{
blockId: id,
blockType: block.type,
}
)
}
const newState = {
blocks: {
...get().blocks,
[id]: {
...block,
triggerMode: newTriggerMode,
},
},
edges: filteredEdges,
loops: { ...get().loops },
parallels: { ...get().parallels },
}
set(newState)
get().updateLastSaved()
// Handle webhook enable/disable when toggling trigger mode
const handleWebhookToggle = async () => {
try {
const activeWorkflowId = useWorkflowRegistry.getState().activeWorkflowId
if (!activeWorkflowId) return
// Check if there's a webhook for this block
const response = await fetch(
`/api/webhooks?workflowId=${activeWorkflowId}&blockId=${id}`
)
if (response.ok) {
const data = await response.json()
if (data.webhooks && data.webhooks.length > 0) {
const webhook = data.webhooks[0].webhook
// Update webhook's isActive status based on trigger mode
const updateResponse = await fetch(`/api/webhooks/${webhook.id}`, {
method: 'PATCH',
headers: {
'Content-Type': 'application/json',
},
body: JSON.stringify({
isActive: newTriggerMode,
}),
})
if (!updateResponse.ok) {
logger.error('Failed to update webhook status')
}
}
}
} catch (error) {
logger.error('Error toggling webhook status:', error)
}
}
// Handle webhook toggle asynchronously
handleWebhookToggle()
// Note: Socket.IO handles real-time sync automatically
},
// Parallel block methods implementation
updateParallelCount: (parallelId: string, count: number) => {
const block = get().blocks[parallelId]

View File

@@ -241,7 +241,6 @@ export interface WorkflowActions {
setNeedsRedeploymentFlag: (needsRedeployment: boolean) => void
revertToDeployedState: (deployedState: WorkflowState) => void
toggleBlockAdvancedMode: (id: string) => void
toggleBlockTriggerMode: (id: string) => void
setDragStartPosition: (position: DragStartPosition | null) => void
getDragStartPosition: () => DragStartPosition | null
getWorkflowState: () => WorkflowState