Compare commits

..

4 Commits

Author SHA1 Message Date
Vikhyath Mondreti
211a7ac3a4 fix(child-workflow): nested spans handoff (#2966)
* fix(child-workflow): nested spans handoff

* remove overly defensive programming

* update type check

* type more code

* remove more dead code

* address bugbot comments
2026-01-24 02:39:13 -08:00
Emir Karabeg
0f9b6ad1d2 fix(preview): subblock values (#2969) 2026-01-24 02:32:08 -08:00
Vikhyath Mondreti
12100e6881 improvement(webhooks): remove dead code (#2965)
* fix(webhooks): subscription recreation path

* improvement(webhooks): remove dead code

* fix tests

* address bugbot comments

* fix restoration edge case

* fix more edge cases

* address bugbot comments

* fix gmail polling

* add warnings for UI indication for credential sets
2026-01-23 23:18:20 -08:00
Siddharth Ganesan
23294683e1 fix(copilot): mask credentials fix (#2963)
* Fix copilot masking

* Clean up

* Lint
2026-01-23 19:34:55 -08:00
38 changed files with 1671 additions and 953 deletions

View File

@@ -640,6 +640,7 @@ export interface AdminDeployResult {
isDeployed: boolean
version: number
deployedAt: string
warnings?: string[]
}
export interface AdminUndeployResult {

View File

@@ -1,14 +1,23 @@
import { db, workflow } from '@sim/db'
import { db, workflow, workflowDeploymentVersion } from '@sim/db'
import { createLogger } from '@sim/logger'
import { eq } from 'drizzle-orm'
import { and, eq } from 'drizzle-orm'
import { generateRequestId } from '@/lib/core/utils/request'
import { cleanupWebhooksForWorkflow } from '@/lib/webhooks/deploy'
import { removeMcpToolsForWorkflow, syncMcpToolsForWorkflow } from '@/lib/mcp/workflow-mcp-sync'
import {
cleanupWebhooksForWorkflow,
restorePreviousVersionWebhooks,
saveTriggerWebhooksForDeploy,
} from '@/lib/webhooks/deploy'
import {
deployWorkflow,
loadWorkflowFromNormalizedTables,
undeployWorkflow,
} from '@/lib/workflows/persistence/utils'
import { createSchedulesForDeploy, validateWorkflowSchedules } from '@/lib/workflows/schedules'
import {
cleanupDeploymentVersion,
createSchedulesForDeploy,
validateWorkflowSchedules,
} from '@/lib/workflows/schedules'
import { withAdminAuthParams } from '@/app/api/v1/admin/middleware'
import {
badRequestResponse,
@@ -28,10 +37,11 @@ interface RouteParams {
export const POST = withAdminAuthParams<RouteParams>(async (request, context) => {
const { id: workflowId } = await context.params
const requestId = generateRequestId()
try {
const [workflowRecord] = await db
.select({ id: workflow.id, name: workflow.name })
.select()
.from(workflow)
.where(eq(workflow.id, workflowId))
.limit(1)
@@ -50,6 +60,18 @@ export const POST = withAdminAuthParams<RouteParams>(async (request, context) =>
return badRequestResponse(`Invalid schedule configuration: ${scheduleValidation.error}`)
}
const [currentActiveVersion] = await db
.select({ id: workflowDeploymentVersion.id })
.from(workflowDeploymentVersion)
.where(
and(
eq(workflowDeploymentVersion.workflowId, workflowId),
eq(workflowDeploymentVersion.isActive, true)
)
)
.limit(1)
const previousVersionId = currentActiveVersion?.id
const deployResult = await deployWorkflow({
workflowId,
deployedBy: ADMIN_ACTOR_ID,
@@ -65,6 +87,32 @@ export const POST = withAdminAuthParams<RouteParams>(async (request, context) =>
return internalErrorResponse('Failed to resolve deployment version')
}
const workflowData = workflowRecord as Record<string, unknown>
const triggerSaveResult = await saveTriggerWebhooksForDeploy({
request,
workflowId,
workflow: workflowData,
userId: workflowRecord.userId,
blocks: normalizedData.blocks,
requestId,
deploymentVersionId: deployResult.deploymentVersionId,
previousVersionId,
})
if (!triggerSaveResult.success) {
await cleanupDeploymentVersion({
workflowId,
workflow: workflowData,
requestId,
deploymentVersionId: deployResult.deploymentVersionId,
})
await undeployWorkflow({ workflowId })
return internalErrorResponse(
triggerSaveResult.error?.message || 'Failed to sync trigger configuration'
)
}
const scheduleResult = await createSchedulesForDeploy(
workflowId,
normalizedData.blocks,
@@ -72,15 +120,58 @@ export const POST = withAdminAuthParams<RouteParams>(async (request, context) =>
deployResult.deploymentVersionId
)
if (!scheduleResult.success) {
logger.warn(`Schedule creation failed for workflow ${workflowId}: ${scheduleResult.error}`)
logger.error(
`[${requestId}] Admin API: Schedule creation failed for workflow ${workflowId}: ${scheduleResult.error}`
)
await cleanupDeploymentVersion({
workflowId,
workflow: workflowData,
requestId,
deploymentVersionId: deployResult.deploymentVersionId,
})
if (previousVersionId) {
await restorePreviousVersionWebhooks({
request,
workflow: workflowData,
userId: workflowRecord.userId,
previousVersionId,
requestId,
})
}
await undeployWorkflow({ workflowId })
return internalErrorResponse(scheduleResult.error || 'Failed to create schedule')
}
logger.info(`Admin API: Deployed workflow ${workflowId} as v${deployResult.version}`)
if (previousVersionId && previousVersionId !== deployResult.deploymentVersionId) {
try {
logger.info(`[${requestId}] Admin API: Cleaning up previous version ${previousVersionId}`)
await cleanupDeploymentVersion({
workflowId,
workflow: workflowData,
requestId,
deploymentVersionId: previousVersionId,
skipExternalCleanup: true,
})
} catch (cleanupError) {
logger.error(
`[${requestId}] Admin API: Failed to clean up previous version ${previousVersionId}`,
cleanupError
)
}
}
logger.info(
`[${requestId}] Admin API: Deployed workflow ${workflowId} as v${deployResult.version}`
)
// Sync MCP tools with the latest parameter schema
await syncMcpToolsForWorkflow({ workflowId, requestId, context: 'deploy' })
const response: AdminDeployResult = {
isDeployed: true,
version: deployResult.version!,
deployedAt: deployResult.deployedAt!.toISOString(),
warnings: triggerSaveResult.warnings,
}
return singleResponse(response)
@@ -105,7 +196,6 @@ export const DELETE = withAdminAuthParams<RouteParams>(async (request, context)
return notFoundResponse('Workflow')
}
// Clean up external webhook subscriptions before undeploying
await cleanupWebhooksForWorkflow(
workflowId,
workflowRecord as Record<string, unknown>,
@@ -117,6 +207,8 @@ export const DELETE = withAdminAuthParams<RouteParams>(async (request, context)
return internalErrorResponse(result.error || 'Failed to undeploy workflow')
}
await removeMcpToolsForWorkflow(workflowId, requestId)
logger.info(`Admin API: Undeployed workflow ${workflowId}`)
const response: AdminUndeployResult = {

View File

@@ -1,7 +1,15 @@
import { db, workflow } from '@sim/db'
import { db, workflow, workflowDeploymentVersion } from '@sim/db'
import { createLogger } from '@sim/logger'
import { eq } from 'drizzle-orm'
import { and, eq } from 'drizzle-orm'
import { generateRequestId } from '@/lib/core/utils/request'
import { syncMcpToolsForWorkflow } from '@/lib/mcp/workflow-mcp-sync'
import { restorePreviousVersionWebhooks, saveTriggerWebhooksForDeploy } from '@/lib/webhooks/deploy'
import { activateWorkflowVersion } from '@/lib/workflows/persistence/utils'
import {
cleanupDeploymentVersion,
createSchedulesForDeploy,
validateWorkflowSchedules,
} from '@/lib/workflows/schedules'
import { withAdminAuthParams } from '@/app/api/v1/admin/middleware'
import {
badRequestResponse,
@@ -9,6 +17,7 @@ import {
notFoundResponse,
singleResponse,
} from '@/app/api/v1/admin/responses'
import type { BlockState } from '@/stores/workflows/workflow/types'
const logger = createLogger('AdminWorkflowActivateVersionAPI')
@@ -18,11 +27,12 @@ interface RouteParams {
}
export const POST = withAdminAuthParams<RouteParams>(async (request, context) => {
const requestId = generateRequestId()
const { id: workflowId, versionId } = await context.params
try {
const [workflowRecord] = await db
.select({ id: workflow.id })
.select()
.from(workflow)
.where(eq(workflow.id, workflowId))
.limit(1)
@@ -36,23 +46,161 @@ export const POST = withAdminAuthParams<RouteParams>(async (request, context) =>
return badRequestResponse('Invalid version number')
}
const [versionRow] = await db
.select({
id: workflowDeploymentVersion.id,
state: workflowDeploymentVersion.state,
})
.from(workflowDeploymentVersion)
.where(
and(
eq(workflowDeploymentVersion.workflowId, workflowId),
eq(workflowDeploymentVersion.version, versionNum)
)
)
.limit(1)
if (!versionRow?.state) {
return notFoundResponse('Deployment version')
}
const [currentActiveVersion] = await db
.select({ id: workflowDeploymentVersion.id })
.from(workflowDeploymentVersion)
.where(
and(
eq(workflowDeploymentVersion.workflowId, workflowId),
eq(workflowDeploymentVersion.isActive, true)
)
)
.limit(1)
const previousVersionId = currentActiveVersion?.id
const deployedState = versionRow.state as { blocks?: Record<string, BlockState> }
const blocks = deployedState.blocks
if (!blocks || typeof blocks !== 'object') {
return internalErrorResponse('Invalid deployed state structure')
}
const workflowData = workflowRecord as Record<string, unknown>
const scheduleValidation = validateWorkflowSchedules(blocks)
if (!scheduleValidation.isValid) {
return badRequestResponse(`Invalid schedule configuration: ${scheduleValidation.error}`)
}
const triggerSaveResult = await saveTriggerWebhooksForDeploy({
request,
workflowId,
workflow: workflowData,
userId: workflowRecord.userId,
blocks,
requestId,
deploymentVersionId: versionRow.id,
previousVersionId,
forceRecreateSubscriptions: true,
})
if (!triggerSaveResult.success) {
logger.error(
`[${requestId}] Admin API: Failed to sync triggers for workflow ${workflowId}`,
triggerSaveResult.error
)
return internalErrorResponse(
triggerSaveResult.error?.message || 'Failed to sync trigger configuration'
)
}
const scheduleResult = await createSchedulesForDeploy(workflowId, blocks, db, versionRow.id)
if (!scheduleResult.success) {
await cleanupDeploymentVersion({
workflowId,
workflow: workflowData,
requestId,
deploymentVersionId: versionRow.id,
})
if (previousVersionId) {
await restorePreviousVersionWebhooks({
request,
workflow: workflowData,
userId: workflowRecord.userId,
previousVersionId,
requestId,
})
}
return internalErrorResponse(scheduleResult.error || 'Failed to sync schedules')
}
const result = await activateWorkflowVersion({ workflowId, version: versionNum })
if (!result.success) {
await cleanupDeploymentVersion({
workflowId,
workflow: workflowData,
requestId,
deploymentVersionId: versionRow.id,
})
if (previousVersionId) {
await restorePreviousVersionWebhooks({
request,
workflow: workflowData,
userId: workflowRecord.userId,
previousVersionId,
requestId,
})
}
if (result.error === 'Deployment version not found') {
return notFoundResponse('Deployment version')
}
return internalErrorResponse(result.error || 'Failed to activate version')
}
logger.info(`Admin API: Activated version ${versionNum} for workflow ${workflowId}`)
if (previousVersionId && previousVersionId !== versionRow.id) {
try {
logger.info(
`[${requestId}] Admin API: Cleaning up previous version ${previousVersionId} webhooks/schedules`
)
await cleanupDeploymentVersion({
workflowId,
workflow: workflowData,
requestId,
deploymentVersionId: previousVersionId,
skipExternalCleanup: true,
})
logger.info(`[${requestId}] Admin API: Previous version cleanup completed`)
} catch (cleanupError) {
logger.error(
`[${requestId}] Admin API: Failed to clean up previous version ${previousVersionId}`,
cleanupError
)
}
}
await syncMcpToolsForWorkflow({
workflowId,
requestId,
state: versionRow.state,
context: 'activate',
})
logger.info(
`[${requestId}] Admin API: Activated version ${versionNum} for workflow ${workflowId}`
)
return singleResponse({
success: true,
version: versionNum,
deployedAt: result.deployedAt!.toISOString(),
warnings: triggerSaveResult.warnings,
})
} catch (error) {
logger.error(`Admin API: Failed to activate version for workflow ${workflowId}`, { error })
logger.error(
`[${requestId}] Admin API: Failed to activate version for workflow ${workflowId}`,
{
error,
}
)
return internalErrorResponse('Failed to activate deployment version')
}
})

View File

@@ -7,12 +7,7 @@ import { getSession } from '@/lib/auth'
import { validateInteger } from '@/lib/core/security/input-validation'
import { PlatformEvents } from '@/lib/core/telemetry'
import { generateRequestId } from '@/lib/core/utils/request'
import { resolveEnvVarsInObject } from '@/lib/webhooks/env-resolver'
import {
cleanupExternalWebhook,
createExternalWebhookSubscription,
shouldRecreateExternalWebhookSubscription,
} from '@/lib/webhooks/provider-subscriptions'
import { cleanupExternalWebhook } from '@/lib/webhooks/provider-subscriptions'
import { getUserEntityPermissions } from '@/lib/workspaces/permissions/utils'
const logger = createLogger('WebhookAPI')
@@ -88,7 +83,6 @@ export async function GET(request: NextRequest, { params }: { params: Promise<{
}
}
// Update a webhook
export async function PATCH(request: NextRequest, { params }: { params: Promise<{ id: string }> }) {
const requestId = generateRequestId()
@@ -103,7 +97,7 @@ export async function PATCH(request: NextRequest, { params }: { params: Promise<
}
const body = await request.json()
const { path, provider, providerConfig, isActive, failedCount } = body
const { isActive, failedCount } = body
if (failedCount !== undefined) {
const validation = validateInteger(failedCount, 'failedCount', { min: 0 })
@@ -113,28 +107,6 @@ export async function PATCH(request: NextRequest, { params }: { params: Promise<
}
}
const originalProviderConfig = providerConfig
let resolvedProviderConfig = providerConfig
if (providerConfig) {
const webhookDataForResolve = await db
.select({
workspaceId: workflow.workspaceId,
})
.from(webhook)
.innerJoin(workflow, eq(webhook.workflowId, workflow.id))
.where(eq(webhook.id, id))
.limit(1)
if (webhookDataForResolve.length > 0) {
resolvedProviderConfig = await resolveEnvVarsInObject(
providerConfig,
session.user.id,
webhookDataForResolve[0].workspaceId || undefined
)
}
}
// Find the webhook and check permissions
const webhooks = await db
.select({
webhook: webhook,
@@ -155,16 +127,12 @@ export async function PATCH(request: NextRequest, { params }: { params: Promise<
}
const webhookData = webhooks[0]
// Check if user has permission to modify this webhook
let canModify = false
// Case 1: User owns the workflow
if (webhookData.workflow.userId === session.user.id) {
canModify = true
}
// Case 2: Workflow belongs to a workspace and user has write or admin permission
if (!canModify && webhookData.workflow.workspaceId) {
const userPermission = await getUserEntityPermissions(
session.user.id,
@@ -183,80 +151,14 @@ export async function PATCH(request: NextRequest, { params }: { params: Promise<
return NextResponse.json({ error: 'Access denied' }, { status: 403 })
}
const existingProviderConfig =
(webhookData.webhook.providerConfig as Record<string, unknown>) || {}
let nextProviderConfig =
providerConfig !== undefined &&
resolvedProviderConfig &&
typeof resolvedProviderConfig === 'object'
? (resolvedProviderConfig as Record<string, unknown>)
: existingProviderConfig
const nextProvider = (provider ?? webhookData.webhook.provider) as string
if (
providerConfig !== undefined &&
shouldRecreateExternalWebhookSubscription({
previousProvider: webhookData.webhook.provider as string,
nextProvider,
previousConfig: existingProviderConfig,
nextConfig: nextProviderConfig,
})
) {
await cleanupExternalWebhook(
{ ...webhookData.webhook, providerConfig: existingProviderConfig },
webhookData.workflow,
requestId
)
const result = await createExternalWebhookSubscription(
request,
{
...webhookData.webhook,
provider: nextProvider,
providerConfig: nextProviderConfig,
},
webhookData.workflow,
session.user.id,
requestId
)
nextProviderConfig = result.updatedProviderConfig as Record<string, unknown>
}
logger.debug(`[${requestId}] Updating webhook properties`, {
hasPathUpdate: path !== undefined,
hasProviderUpdate: provider !== undefined,
hasConfigUpdate: providerConfig !== undefined,
hasActiveUpdate: isActive !== undefined,
hasFailedCountUpdate: failedCount !== undefined,
})
let finalProviderConfig = webhooks[0].webhook.providerConfig
if (providerConfig !== undefined && originalProviderConfig) {
const existingConfig = existingProviderConfig
finalProviderConfig = {
...originalProviderConfig,
credentialId: existingConfig.credentialId,
credentialSetId: existingConfig.credentialSetId,
userId: existingConfig.userId,
historyId: existingConfig.historyId,
lastCheckedTimestamp: existingConfig.lastCheckedTimestamp,
setupCompleted: existingConfig.setupCompleted,
externalId: existingConfig.externalId,
}
for (const [key, value] of Object.entries(nextProviderConfig)) {
if (!(key in originalProviderConfig)) {
;(finalProviderConfig as Record<string, unknown>)[key] = value
}
}
}
const updatedWebhook = await db
.update(webhook)
.set({
path: path !== undefined ? path : webhooks[0].webhook.path,
provider: provider !== undefined ? provider : webhooks[0].webhook.provider,
providerConfig: finalProviderConfig,
isActive: isActive !== undefined ? isActive : webhooks[0].webhook.isActive,
failedCount: failedCount !== undefined ? failedCount : webhooks[0].webhook.failedCount,
updatedAt: new Date(),
@@ -339,11 +241,8 @@ export async function DELETE(
}
const foundWebhook = webhookData.webhook
const { cleanupExternalWebhook } = await import('@/lib/webhooks/provider-subscriptions')
const providerConfig = foundWebhook.providerConfig as Record<string, unknown> | null
const credentialSetId = providerConfig?.credentialSetId as string | undefined
const blockId = providerConfig?.blockId as string | undefined
const credentialSetId = foundWebhook.credentialSetId as string | undefined
const blockId = foundWebhook.blockId as string | undefined
if (credentialSetId && blockId) {
const allCredentialSetWebhooks = await db
@@ -351,10 +250,9 @@ export async function DELETE(
.from(webhook)
.where(and(eq(webhook.workflowId, webhookData.workflow.id), eq(webhook.blockId, blockId)))
const webhooksToDelete = allCredentialSetWebhooks.filter((w) => {
const config = w.providerConfig as Record<string, unknown> | null
return config?.credentialSetId === credentialSetId
})
const webhooksToDelete = allCredentialSetWebhooks.filter(
(w) => w.credentialSetId === credentialSetId
)
for (const w of webhooksToDelete) {
await cleanupExternalWebhook(w, webhookData.workflow, requestId)

View File

@@ -7,9 +7,21 @@ import { type NextRequest, NextResponse } from 'next/server'
import { getSession } from '@/lib/auth'
import { PlatformEvents } from '@/lib/core/telemetry'
import { generateRequestId } from '@/lib/core/utils/request'
import { getProviderIdFromServiceId } from '@/lib/oauth'
import { resolveEnvVarsInObject } from '@/lib/webhooks/env-resolver'
import { createExternalWebhookSubscription } from '@/lib/webhooks/provider-subscriptions'
import {
cleanupExternalWebhook,
createExternalWebhookSubscription,
} from '@/lib/webhooks/provider-subscriptions'
import { mergeNonUserFields } from '@/lib/webhooks/utils'
import {
configureGmailPolling,
configureOutlookPolling,
configureRssPolling,
syncWebhooksForCredentialSet,
} from '@/lib/webhooks/utils.server'
import { getUserEntityPermissions } from '@/lib/workspaces/permissions/utils'
import { extractCredentialSetId, isCredentialSetValue } from '@/executor/constants'
const logger = createLogger('WebhooksAPI')
@@ -316,8 +328,6 @@ export async function POST(request: NextRequest) {
const directCredentialSetId = resolvedProviderConfig?.credentialSetId as string | undefined
if (directCredentialSetId || rawCredentialId) {
const { isCredentialSetValue, extractCredentialSetId } = await import('@/executor/constants')
const credentialSetId =
directCredentialSetId ||
(rawCredentialId && isCredentialSetValue(rawCredentialId)
@@ -329,11 +339,6 @@ export async function POST(request: NextRequest) {
`[${requestId}] Credential set detected for ${provider} trigger. Syncing webhooks for set ${credentialSetId}`
)
const { getProviderIdFromServiceId } = await import('@/lib/oauth')
const { syncWebhooksForCredentialSet, configureGmailPolling, configureOutlookPolling } =
await import('@/lib/webhooks/utils.server')
// Map provider to OAuth provider ID
const oauthProviderId = getProviderIdFromServiceId(provider)
const {
@@ -466,7 +471,8 @@ export async function POST(request: NextRequest) {
providerConfig: providerConfigOverride,
})
const configToSave = { ...originalProviderConfig }
const userProvided = originalProviderConfig as Record<string, unknown>
const configToSave: Record<string, unknown> = { ...userProvided }
try {
const result = await createExternalWebhookSubscription(
@@ -477,11 +483,7 @@ export async function POST(request: NextRequest) {
requestId
)
const updatedConfig = result.updatedProviderConfig as Record<string, unknown>
for (const [key, value] of Object.entries(updatedConfig)) {
if (!(key in originalProviderConfig)) {
configToSave[key] = value
}
}
mergeNonUserFields(configToSave, updatedConfig, userProvided)
resolvedProviderConfig = updatedConfig
externalSubscriptionCreated = result.externalSubscriptionCreated
} catch (err) {
@@ -547,7 +549,6 @@ export async function POST(request: NextRequest) {
if (externalSubscriptionCreated) {
logger.error(`[${requestId}] DB save failed, cleaning up external subscription`, dbError)
try {
const { cleanupExternalWebhook } = await import('@/lib/webhooks/provider-subscriptions')
await cleanupExternalWebhook(
createTempWebhookData(configToSave),
workflowRecord,
@@ -567,7 +568,6 @@ export async function POST(request: NextRequest) {
if (savedWebhook && provider === 'gmail') {
logger.info(`[${requestId}] Gmail provider detected. Setting up Gmail webhook configuration.`)
try {
const { configureGmailPolling } = await import('@/lib/webhooks/utils.server')
const success = await configureGmailPolling(savedWebhook, requestId)
if (!success) {
@@ -606,7 +606,6 @@ export async function POST(request: NextRequest) {
`[${requestId}] Outlook provider detected. Setting up Outlook webhook configuration.`
)
try {
const { configureOutlookPolling } = await import('@/lib/webhooks/utils.server')
const success = await configureOutlookPolling(savedWebhook, requestId)
if (!success) {
@@ -643,7 +642,6 @@ export async function POST(request: NextRequest) {
if (savedWebhook && provider === 'rss') {
logger.info(`[${requestId}] RSS provider detected. Setting up RSS webhook configuration.`)
try {
const { configureRssPolling } = await import('@/lib/webhooks/utils.server')
const success = await configureRssPolling(savedWebhook, requestId)
if (!success) {

View File

@@ -4,7 +4,11 @@ import { and, desc, eq } from 'drizzle-orm'
import type { NextRequest } from 'next/server'
import { generateRequestId } from '@/lib/core/utils/request'
import { removeMcpToolsForWorkflow, syncMcpToolsForWorkflow } from '@/lib/mcp/workflow-mcp-sync'
import { cleanupWebhooksForWorkflow, saveTriggerWebhooksForDeploy } from '@/lib/webhooks/deploy'
import {
cleanupWebhooksForWorkflow,
restorePreviousVersionWebhooks,
saveTriggerWebhooksForDeploy,
} from '@/lib/webhooks/deploy'
import {
deployWorkflow,
loadWorkflowFromNormalizedTables,
@@ -135,6 +139,18 @@ export async function POST(request: NextRequest, { params }: { params: Promise<{
return createErrorResponse(`Invalid schedule configuration: ${scheduleValidation.error}`, 400)
}
const [currentActiveVersion] = await db
.select({ id: workflowDeploymentVersion.id })
.from(workflowDeploymentVersion)
.where(
and(
eq(workflowDeploymentVersion.workflowId, id),
eq(workflowDeploymentVersion.isActive, true)
)
)
.limit(1)
const previousVersionId = currentActiveVersion?.id
const deployResult = await deployWorkflow({
workflowId: id,
deployedBy: actorUserId,
@@ -161,6 +177,7 @@ export async function POST(request: NextRequest, { params }: { params: Promise<{
blocks: normalizedData.blocks,
requestId,
deploymentVersionId,
previousVersionId,
})
if (!triggerSaveResult.success) {
@@ -194,6 +211,15 @@ export async function POST(request: NextRequest, { params }: { params: Promise<{
requestId,
deploymentVersionId,
})
if (previousVersionId) {
await restorePreviousVersionWebhooks({
request,
workflow: workflowData as Record<string, unknown>,
userId: actorUserId,
previousVersionId,
requestId,
})
}
await undeployWorkflow({ workflowId: id })
return createErrorResponse(scheduleResult.error || 'Failed to create schedule', 500)
}
@@ -208,6 +234,25 @@ export async function POST(request: NextRequest, { params }: { params: Promise<{
)
}
if (previousVersionId && previousVersionId !== deploymentVersionId) {
try {
logger.info(`[${requestId}] Cleaning up previous version ${previousVersionId} DB records`)
await cleanupDeploymentVersion({
workflowId: id,
workflow: workflowData as Record<string, unknown>,
requestId,
deploymentVersionId: previousVersionId,
skipExternalCleanup: true,
})
} catch (cleanupError) {
logger.error(
`[${requestId}] Failed to clean up previous version ${previousVersionId}`,
cleanupError
)
// Non-fatal - continue with success response
}
}
logger.info(`[${requestId}] Workflow deployed successfully: ${id}`)
// Sync MCP tools with the latest parameter schema
@@ -228,6 +273,7 @@ export async function POST(request: NextRequest, { params }: { params: Promise<{
nextRunAt: scheduleInfo.nextRunAt,
}
: undefined,
warnings: triggerSaveResult.warnings,
})
} catch (error: any) {
logger.error(`[${requestId}] Error deploying workflow: ${id}`, {

View File

@@ -4,7 +4,7 @@ import { and, eq } from 'drizzle-orm'
import type { NextRequest } from 'next/server'
import { generateRequestId } from '@/lib/core/utils/request'
import { syncMcpToolsForWorkflow } from '@/lib/mcp/workflow-mcp-sync'
import { saveTriggerWebhooksForDeploy } from '@/lib/webhooks/deploy'
import { restorePreviousVersionWebhooks, saveTriggerWebhooksForDeploy } from '@/lib/webhooks/deploy'
import { activateWorkflowVersion } from '@/lib/workflows/persistence/utils'
import {
cleanupDeploymentVersion,
@@ -85,6 +85,11 @@ export async function POST(
return createErrorResponse('Invalid deployed state structure', 500)
}
const scheduleValidation = validateWorkflowSchedules(blocks)
if (!scheduleValidation.isValid) {
return createErrorResponse(`Invalid schedule configuration: ${scheduleValidation.error}`, 400)
}
const triggerSaveResult = await saveTriggerWebhooksForDeploy({
request,
workflowId: id,
@@ -93,6 +98,8 @@ export async function POST(
blocks,
requestId,
deploymentVersionId: versionRow.id,
previousVersionId,
forceRecreateSubscriptions: true,
})
if (!triggerSaveResult.success) {
@@ -102,11 +109,6 @@ export async function POST(
)
}
const scheduleValidation = validateWorkflowSchedules(blocks)
if (!scheduleValidation.isValid) {
return createErrorResponse(`Invalid schedule configuration: ${scheduleValidation.error}`, 400)
}
const scheduleResult = await createSchedulesForDeploy(id, blocks, db, versionRow.id)
if (!scheduleResult.success) {
@@ -116,6 +118,15 @@ export async function POST(
requestId,
deploymentVersionId: versionRow.id,
})
if (previousVersionId) {
await restorePreviousVersionWebhooks({
request,
workflow: workflowData as Record<string, unknown>,
userId: actorUserId,
previousVersionId,
requestId,
})
}
return createErrorResponse(scheduleResult.error || 'Failed to sync schedules', 500)
}
@@ -127,6 +138,15 @@ export async function POST(
requestId,
deploymentVersionId: versionRow.id,
})
if (previousVersionId) {
await restorePreviousVersionWebhooks({
request,
workflow: workflowData as Record<string, unknown>,
userId: actorUserId,
previousVersionId,
requestId,
})
}
return createErrorResponse(result.error || 'Failed to activate deployment', 400)
}
@@ -140,6 +160,7 @@ export async function POST(
workflow: workflowData as Record<string, unknown>,
requestId,
deploymentVersionId: previousVersionId,
skipExternalCleanup: true,
})
logger.info(`[${requestId}] Previous version cleanup completed`)
} catch (cleanupError) {
@@ -157,7 +178,11 @@ export async function POST(
context: 'activate',
})
return createSuccessResponse({ success: true, deployedAt: result.deployedAt })
return createSuccessResponse({
success: true,
deployedAt: result.deployedAt,
warnings: triggerSaveResult.warnings,
})
} catch (error: any) {
logger.error(`[${requestId}] Error activating deployment for workflow: ${id}`, error)
return createErrorResponse(error.message || 'Failed to activate deployment', 500)

View File

@@ -30,6 +30,7 @@ import { normalizeName } from '@/executor/constants'
import { ExecutionSnapshot } from '@/executor/execution/snapshot'
import type { ExecutionMetadata, IterationContext } from '@/executor/execution/types'
import type { NormalizedBlockOutput, StreamingExecution } from '@/executor/types'
import { hasExecutionResult } from '@/executor/utils/errors'
import { Serializer } from '@/serializer'
import { CORE_TRIGGER_TYPES, type CoreTriggerType } from '@/stores/logs/filters/types'
@@ -467,17 +468,17 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
}
return NextResponse.json(filteredResult)
} catch (error: any) {
const errorMessage = error.message || 'Unknown error'
} catch (error: unknown) {
const errorMessage = error instanceof Error ? error.message : 'Unknown error'
logger.error(`[${requestId}] Non-SSE execution failed: ${errorMessage}`)
const executionResult = error.executionResult
const executionResult = hasExecutionResult(error) ? error.executionResult : undefined
return NextResponse.json(
{
success: false,
output: executionResult?.output,
error: executionResult?.error || error.message || 'Execution failed',
error: executionResult?.error || errorMessage || 'Execution failed',
metadata: executionResult?.metadata
? {
duration: executionResult.metadata.duration,
@@ -788,11 +789,11 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
// Cleanup base64 cache for this execution
await cleanupExecutionBase64Cache(executionId)
} catch (error: any) {
const errorMessage = error.message || 'Unknown error'
} catch (error: unknown) {
const errorMessage = error instanceof Error ? error.message : 'Unknown error'
logger.error(`[${requestId}] SSE execution failed: ${errorMessage}`)
const executionResult = error.executionResult
const executionResult = hasExecutionResult(error) ? error.executionResult : undefined
sendEvent({
type: 'execution:error',

View File

@@ -95,6 +95,7 @@ export function DeployModal({
const [activeTab, setActiveTab] = useState<TabView>('general')
const [chatSubmitting, setChatSubmitting] = useState(false)
const [apiDeployError, setApiDeployError] = useState<string | null>(null)
const [apiDeployWarnings, setApiDeployWarnings] = useState<string[]>([])
const [isChatFormValid, setIsChatFormValid] = useState(false)
const [selectedStreamingOutputs, setSelectedStreamingOutputs] = useState<string[]>([])
@@ -227,6 +228,7 @@ export function DeployModal({
if (open && workflowId) {
setActiveTab('general')
setApiDeployError(null)
setApiDeployWarnings([])
}
}, [open, workflowId])
@@ -282,9 +284,13 @@ export function DeployModal({
if (!workflowId) return
setApiDeployError(null)
setApiDeployWarnings([])
try {
await deployMutation.mutateAsync({ workflowId, deployChatEnabled: false })
const result = await deployMutation.mutateAsync({ workflowId, deployChatEnabled: false })
if (result.warnings && result.warnings.length > 0) {
setApiDeployWarnings(result.warnings)
}
await refetchDeployedState()
} catch (error: unknown) {
logger.error('Error deploying workflow:', { error })
@@ -297,8 +303,13 @@ export function DeployModal({
async (version: number) => {
if (!workflowId) return
setApiDeployWarnings([])
try {
await activateVersionMutation.mutateAsync({ workflowId, version })
const result = await activateVersionMutation.mutateAsync({ workflowId, version })
if (result.warnings && result.warnings.length > 0) {
setApiDeployWarnings(result.warnings)
}
await refetchDeployedState()
} catch (error) {
logger.error('Error promoting version:', { error })
@@ -324,9 +335,13 @@ export function DeployModal({
if (!workflowId) return
setApiDeployError(null)
setApiDeployWarnings([])
try {
await deployMutation.mutateAsync({ workflowId, deployChatEnabled: false })
const result = await deployMutation.mutateAsync({ workflowId, deployChatEnabled: false })
if (result.warnings && result.warnings.length > 0) {
setApiDeployWarnings(result.warnings)
}
await refetchDeployedState()
} catch (error: unknown) {
logger.error('Error redeploying workflow:', { error })
@@ -338,6 +353,7 @@ export function DeployModal({
const handleCloseModal = useCallback(() => {
setChatSubmitting(false)
setApiDeployError(null)
setApiDeployWarnings([])
onOpenChange(false)
}, [onOpenChange])
@@ -479,6 +495,14 @@ export function DeployModal({
<div>{apiDeployError}</div>
</div>
)}
{apiDeployWarnings.length > 0 && (
<div className='mb-3 rounded-[4px] border border-amber-500/30 bg-amber-500/10 p-3 text-amber-700 dark:text-amber-400 text-sm'>
<div className='font-semibold'>Deployment Warning</div>
{apiDeployWarnings.map((warning, index) => (
<div key={index}>{warning}</div>
))}
</div>
)}
<ModalTabsContent value='general'>
<GeneralDeploy
workflowId={workflowId}

View File

@@ -16,6 +16,7 @@ import {
} from '@/lib/workflows/triggers/triggers'
import { useCurrentWorkflow } from '@/app/workspace/[workspaceId]/w/[workflowId]/hooks/use-current-workflow'
import type { BlockLog, ExecutionResult, StreamingExecution } from '@/executor/types'
import { hasExecutionResult } from '@/executor/utils/errors'
import { coerceValue } from '@/executor/utils/start-block'
import { subscriptionKeys } from '@/hooks/queries/subscription'
import { useExecutionStream } from '@/hooks/use-execution-stream'
@@ -76,17 +77,6 @@ function normalizeErrorMessage(error: unknown): string {
return WORKFLOW_EXECUTION_FAILURE_MESSAGE
}
function isExecutionResult(value: unknown): value is ExecutionResult {
if (!isRecord(value)) return false
return typeof value.success === 'boolean' && isRecord(value.output)
}
function extractExecutionResult(error: unknown): ExecutionResult | null {
if (!isRecord(error)) return null
const candidate = error.executionResult
return isExecutionResult(candidate) ? candidate : null
}
export function useWorkflowExecution() {
const queryClient = useQueryClient()
const currentWorkflow = useCurrentWorkflow()
@@ -1138,11 +1128,11 @@ export function useWorkflowExecution() {
const handleExecutionError = (error: unknown, options?: { executionId?: string }) => {
const normalizedMessage = normalizeErrorMessage(error)
const executionResultFromError = extractExecutionResult(error)
let errorResult: ExecutionResult
if (executionResultFromError) {
if (hasExecutionResult(error)) {
const executionResultFromError = error.executionResult
const logs = Array.isArray(executionResultFromError.logs) ? executionResultFromError.logs : []
errorResult = {

View File

@@ -3,11 +3,26 @@
import { memo, useMemo } from 'react'
import { Handle, type NodeProps, Position } from 'reactflow'
import { HANDLE_POSITIONS } from '@/lib/workflows/blocks/block-dimensions'
import {
buildCanonicalIndex,
evaluateSubBlockCondition,
isSubBlockFeatureEnabled,
isSubBlockVisibleForMode,
} from '@/lib/workflows/subblocks/visibility'
import { getDisplayValue } from '@/app/workspace/[workspaceId]/w/[workflowId]/components/workflow-block/workflow-block'
import { getBlock } from '@/blocks'
import { SELECTOR_TYPES_HYDRATION_REQUIRED, type SubBlockConfig } from '@/blocks/types'
import { useVariablesStore } from '@/stores/panel/variables/store'
import { useWorkflowRegistry } from '@/stores/workflows/registry/store'
/** Execution status for blocks in preview mode */
type ExecutionStatus = 'success' | 'error' | 'not-executed'
/** Subblock value structure matching workflow state */
interface SubBlockValueEntry {
value: unknown
}
interface WorkflowPreviewBlockData {
type: string
name: string
@@ -18,12 +33,220 @@ interface WorkflowPreviewBlockData {
isPreviewSelected?: boolean
/** Execution status for highlighting error/success states */
executionStatus?: ExecutionStatus
/** Subblock values from the workflow state */
subBlockValues?: Record<string, SubBlockValueEntry | unknown>
}
/**
* Extracts the raw value from a subblock value entry.
* Handles both wrapped ({ value: ... }) and unwrapped formats.
*/
function extractValue(entry: SubBlockValueEntry | unknown): unknown {
if (entry && typeof entry === 'object' && 'value' in entry) {
return (entry as SubBlockValueEntry).value
}
return entry
}
interface SubBlockRowProps {
title: string
value?: string
subBlock?: SubBlockConfig
rawValue?: unknown
}
/**
* Resolves dropdown/combobox value to its display label.
* Returns null if not a dropdown/combobox or no matching option found.
*/
function resolveDropdownLabel(
subBlock: SubBlockConfig | undefined,
rawValue: unknown
): string | null {
if (!subBlock || (subBlock.type !== 'dropdown' && subBlock.type !== 'combobox')) return null
if (!rawValue || typeof rawValue !== 'string') return null
const options = typeof subBlock.options === 'function' ? subBlock.options() : subBlock.options
if (!options) return null
const option = options.find((opt) =>
typeof opt === 'string' ? opt === rawValue : opt.id === rawValue
)
if (!option) return null
return typeof option === 'string' ? option : option.label
}
/**
* Resolves workflow ID to workflow name using the workflow registry.
* Uses synchronous store access to avoid hook dependencies.
*/
function resolveWorkflowName(
subBlock: SubBlockConfig | undefined,
rawValue: unknown
): string | null {
if (subBlock?.type !== 'workflow-selector') return null
if (!rawValue || typeof rawValue !== 'string') return null
const workflowMap = useWorkflowRegistry.getState().workflows
return workflowMap[rawValue]?.name ?? null
}
/**
* Type guard for variable assignments array
*/
function isVariableAssignmentsArray(
value: unknown
): value is Array<{ id?: string; variableId?: string; variableName?: string; value: unknown }> {
return (
Array.isArray(value) &&
value.length > 0 &&
value.every(
(item) =>
typeof item === 'object' &&
item !== null &&
('variableName' in item || 'variableId' in item)
)
)
}
/**
* Resolves variables-input to display names.
* Uses synchronous store access to avoid hook dependencies.
*/
function resolveVariablesDisplay(
subBlock: SubBlockConfig | undefined,
rawValue: unknown
): string | null {
if (subBlock?.type !== 'variables-input') return null
if (!isVariableAssignmentsArray(rawValue)) return null
const variables = useVariablesStore.getState().variables
const variablesArray = Object.values(variables)
const names = rawValue
.map((a) => {
if (a.variableId) {
const variable = variablesArray.find((v) => v.id === a.variableId)
return variable?.name
}
if (a.variableName) return a.variableName
return null
})
.filter((name): name is string => !!name)
if (names.length === 0) return null
if (names.length === 1) return names[0]
if (names.length === 2) return `${names[0]}, ${names[1]}`
return `${names[0]}, ${names[1]} +${names.length - 2}`
}
/**
* Resolves tool-input to display names.
* Resolves built-in tools from block registry (no API needed).
*/
function resolveToolsDisplay(
subBlock: SubBlockConfig | undefined,
rawValue: unknown
): string | null {
if (subBlock?.type !== 'tool-input') return null
if (!Array.isArray(rawValue) || rawValue.length === 0) return null
const toolNames = rawValue
.map((tool: unknown) => {
if (!tool || typeof tool !== 'object') return null
const t = tool as Record<string, unknown>
// Priority 1: Use tool.title if already populated
if (t.title && typeof t.title === 'string') return t.title
// Priority 2: Extract from inline schema (legacy format)
const schema = t.schema as Record<string, unknown> | undefined
if (schema?.function && typeof schema.function === 'object') {
const fn = schema.function as Record<string, unknown>
if (fn.name && typeof fn.name === 'string') return fn.name
}
// Priority 3: Extract from OpenAI function format
const fn = t.function as Record<string, unknown> | undefined
if (fn?.name && typeof fn.name === 'string') return fn.name
// Priority 4: Resolve built-in tool blocks from registry
if (
typeof t.type === 'string' &&
t.type !== 'custom-tool' &&
t.type !== 'mcp' &&
t.type !== 'workflow' &&
t.type !== 'workflow_input'
) {
const blockConfig = getBlock(t.type)
if (blockConfig?.name) return blockConfig.name
}
return null
})
.filter((name): name is string => !!name)
if (toolNames.length === 0) return null
if (toolNames.length === 1) return toolNames[0]
if (toolNames.length === 2) return `${toolNames[0]}, ${toolNames[1]}`
return `${toolNames[0]}, ${toolNames[1]} +${toolNames.length - 2}`
}
/**
* Renders a single subblock row with title and optional value.
* Matches the SubBlockRow component in WorkflowBlock.
* - Masks password fields with bullets
* - Resolves dropdown/combobox labels
* - Resolves workflow names from registry
* - Resolves variable names from store
* - Resolves tool names from block registry
* - Shows '-' for other selector types that need hydration
*/
function SubBlockRow({ title, value, subBlock, rawValue }: SubBlockRowProps) {
// Mask password fields
const isPasswordField = subBlock?.password === true
const maskedValue = isPasswordField && value && value !== '-' ? '•••' : null
// Resolve various display names (synchronous access, matching WorkflowBlock priority)
const dropdownLabel = resolveDropdownLabel(subBlock, rawValue)
const variablesDisplay = resolveVariablesDisplay(subBlock, rawValue)
const toolsDisplay = resolveToolsDisplay(subBlock, rawValue)
const workflowName = resolveWorkflowName(subBlock, rawValue)
// Check if this is a selector type that needs hydration (show '-' for raw IDs)
const isSelectorType = subBlock?.type && SELECTOR_TYPES_HYDRATION_REQUIRED.includes(subBlock.type)
// Compute final display value matching WorkflowBlock logic
// Priority order matches WorkflowBlock: masked > hydrated names > selector fallback > raw value
const hydratedName = dropdownLabel || variablesDisplay || toolsDisplay || workflowName
const displayValue = maskedValue || hydratedName || (isSelectorType && value ? '-' : value)
return (
<div className='flex items-center gap-[8px]'>
<span
className='min-w-0 truncate text-[14px] text-[var(--text-tertiary)] capitalize'
title={title}
>
{title}
</span>
{displayValue !== undefined && (
<span
className='flex-1 truncate text-right text-[14px] text-[var(--text-primary)]'
title={displayValue}
>
{displayValue}
</span>
)}
</div>
)
}
/**
* Preview block component for workflow visualization.
* Renders block header, subblocks skeleton, and handles without
* Renders block header, subblock values, and handles without
* hooks, store subscriptions, or interactive features.
* Matches the visual structure of WorkflowBlock exactly.
*/
function WorkflowPreviewBlockInner({ data }: NodeProps<WorkflowPreviewBlockData>) {
const {
@@ -34,21 +257,111 @@ function WorkflowPreviewBlockInner({ data }: NodeProps<WorkflowPreviewBlockData>
enabled = true,
isPreviewSelected = false,
executionStatus,
subBlockValues,
} = data
const blockConfig = getBlock(type)
const canonicalIndex = useMemo(
() => buildCanonicalIndex(blockConfig?.subBlocks || []),
[blockConfig?.subBlocks]
)
const rawValues = useMemo(() => {
if (!subBlockValues) return {}
return Object.entries(subBlockValues).reduce<Record<string, unknown>>((acc, [key, entry]) => {
acc[key] = extractValue(entry)
return acc
}, {})
}, [subBlockValues])
const visibleSubBlocks = useMemo(() => {
if (!blockConfig?.subBlocks) return []
const isStarterOrTrigger =
blockConfig.category === 'triggers' || type === 'starter' || isTrigger
return blockConfig.subBlocks.filter((subBlock) => {
if (subBlock.hidden) return false
if (subBlock.hideFromPreview) return false
if (subBlock.mode === 'trigger' && blockConfig.category !== 'triggers') return false
if (subBlock.mode === 'advanced') return false
return true
if (!isSubBlockFeatureEnabled(subBlock)) return false
// Handle trigger mode visibility
if (subBlock.mode === 'trigger' && !isStarterOrTrigger) return false
// Check advanced mode visibility
if (!isSubBlockVisibleForMode(subBlock, false, canonicalIndex, rawValues, undefined)) {
return false
}
// Check condition visibility
if (!subBlock.condition) return true
return evaluateSubBlockCondition(subBlock.condition, rawValues)
})
}, [blockConfig?.subBlocks])
}, [blockConfig?.subBlocks, blockConfig?.category, type, isTrigger, canonicalIndex, rawValues])
/**
* Compute condition rows for condition blocks
*/
const conditionRows = useMemo(() => {
if (type !== 'condition') return []
const conditionsValue = rawValues.conditions
const raw = typeof conditionsValue === 'string' ? conditionsValue : undefined
try {
if (raw) {
const parsed = JSON.parse(raw) as unknown
if (Array.isArray(parsed)) {
return parsed.map((item: unknown, index: number) => {
const conditionItem = item as { id?: string; value?: unknown }
const title = index === 0 ? 'if' : index === parsed.length - 1 ? 'else' : 'else if'
return {
id: conditionItem?.id ?? `cond-${index}`,
title,
value: typeof conditionItem?.value === 'string' ? conditionItem.value : '',
}
})
}
}
} catch {
// Failed to parse, use fallback
}
return [
{ id: 'if', title: 'if', value: '' },
{ id: 'else', title: 'else', value: '' },
]
}, [type, rawValues])
/**
* Compute router rows for router_v2 blocks
*/
const routerRows = useMemo(() => {
if (type !== 'router_v2') return []
const routesValue = rawValues.routes
const raw = typeof routesValue === 'string' ? routesValue : undefined
try {
if (raw) {
const parsed = JSON.parse(raw) as unknown
if (Array.isArray(parsed)) {
return parsed.map((item: unknown, index: number) => {
const routeItem = item as { id?: string; value?: string }
return {
id: routeItem?.id ?? `route${index + 1}`,
value: routeItem?.value ?? '',
}
})
}
}
} catch {
// Failed to parse, use fallback
}
return [{ id: 'route1', value: '' }]
}, [type, rawValues])
if (!blockConfig) {
return null
@@ -57,8 +370,14 @@ function WorkflowPreviewBlockInner({ data }: NodeProps<WorkflowPreviewBlockData>
const IconComponent = blockConfig.icon
const isStarterOrTrigger = blockConfig.category === 'triggers' || type === 'starter' || isTrigger
const shouldShowDefaultHandles = !isStarterOrTrigger
const hasSubBlocks = visibleSubBlocks.length > 0
const showErrorRow = !isStarterOrTrigger
const hasContentBelowHeader =
type === 'condition'
? conditionRows.length > 0 || shouldShowDefaultHandles
: type === 'router_v2'
? routerRows.length > 0 || shouldShowDefaultHandles
: hasSubBlocks || shouldShowDefaultHandles
const horizontalHandleClass = '!border-none !bg-[var(--surface-7)] !h-5 !w-[7px] !rounded-[2px]'
const verticalHandleClass = '!border-none !bg-[var(--surface-7)] !h-[7px] !w-5 !rounded-[2px]'
@@ -67,7 +386,7 @@ function WorkflowPreviewBlockInner({ data }: NodeProps<WorkflowPreviewBlockData>
const hasSuccess = executionStatus === 'success'
return (
<div className='relative w-[250px] select-none rounded-[8px] border border-[var(--border)] bg-[var(--surface-2)]'>
<div className='relative w-[250px] select-none rounded-[8px] border border-[var(--border-1)] bg-[var(--surface-2)]'>
{/* Selection ring overlay (takes priority over execution rings) */}
{isPreviewSelected && (
<div className='pointer-events-none absolute inset-0 z-40 rounded-[8px] ring-[1.75px] ring-[var(--brand-secondary)]' />
@@ -82,7 +401,7 @@ function WorkflowPreviewBlockInner({ data }: NodeProps<WorkflowPreviewBlockData>
)}
{/* Target handle - not shown for triggers/starters */}
{!isStarterOrTrigger && (
{shouldShowDefaultHandles && (
<Handle
type='target'
position={horizontalHandles ? Position.Left : Position.Top}
@@ -96,49 +415,67 @@ function WorkflowPreviewBlockInner({ data }: NodeProps<WorkflowPreviewBlockData>
/>
)}
{/* Header */}
{/* Header - matches WorkflowBlock structure */}
<div
className={`flex items-center gap-[10px] p-[8px] ${hasSubBlocks || showErrorRow ? 'border-[var(--divider)] border-b' : ''}`}
className={`flex items-center justify-between p-[8px] ${hasContentBelowHeader ? 'border-[var(--border-1)] border-b' : ''}`}
>
<div
className='flex h-[24px] w-[24px] flex-shrink-0 items-center justify-center rounded-[6px]'
style={{ background: enabled ? blockConfig.bgColor : 'gray' }}
>
<IconComponent className='h-[16px] w-[16px] text-white' />
<div className='relative z-10 flex min-w-0 flex-1 items-center gap-[10px]'>
<div
className='flex h-[24px] w-[24px] flex-shrink-0 items-center justify-center rounded-[6px]'
style={{ background: enabled ? blockConfig.bgColor : 'gray' }}
>
<IconComponent className='h-[16px] w-[16px] text-white' />
</div>
<span
className={`truncate font-medium text-[16px] ${!enabled ? 'text-[var(--text-muted)]' : ''}`}
title={name}
>
{name}
</span>
</div>
<span
className={`truncate font-medium text-[16px] ${!enabled ? 'text-[#808080]' : ''}`}
title={name}
>
{name}
</span>
</div>
{/* Subblocks skeleton */}
{(hasSubBlocks || showErrorRow) && (
{/* Content area with subblocks */}
{hasContentBelowHeader && (
<div className='flex flex-col gap-[8px] p-[8px]'>
{visibleSubBlocks.slice(0, 4).map((subBlock) => (
<div key={subBlock.id} className='flex items-center gap-[8px]'>
<span className='min-w-0 truncate text-[14px] text-[var(--text-tertiary)] capitalize'>
{subBlock.title ?? subBlock.id}
</span>
<span className='flex-1 truncate text-right text-[14px] text-[var(--white)]'>-</span>
</div>
))}
{visibleSubBlocks.length > 4 && (
<div className='flex items-center gap-[8px]'>
<span className='text-[14px] text-[var(--text-tertiary)]'>
+{visibleSubBlocks.length - 4} more
</span>
</div>
)}
{showErrorRow && (
<div className='flex items-center gap-[8px]'>
<span className='min-w-0 truncate text-[14px] text-[var(--text-tertiary)] capitalize'>
error
</span>
</div>
{type === 'condition' ? (
// Condition block: render condition rows
conditionRows.map((cond) => (
<SubBlockRow key={cond.id} title={cond.title} value={getDisplayValue(cond.value)} />
))
) : type === 'router_v2' ? (
// Router block: render context + route rows
<>
<SubBlockRow
key='context'
title='Context'
value={getDisplayValue(rawValues.context)}
/>
{routerRows.map((route, index) => (
<SubBlockRow
key={route.id}
title={`Route ${index + 1}`}
value={getDisplayValue(route.value)}
/>
))}
</>
) : (
// Standard blocks: render visible subblocks
visibleSubBlocks.map((subBlock) => {
const rawValue = rawValues[subBlock.id]
return (
<SubBlockRow
key={subBlock.id}
title={subBlock.title ?? subBlock.id}
value={getDisplayValue(rawValue)}
subBlock={subBlock}
rawValue={rawValue}
/>
)
})
)}
{/* Error row for non-trigger blocks */}
{shouldShowDefaultHandles && <SubBlockRow title='error' />}
</div>
)}
@@ -162,16 +499,47 @@ function shouldSkipPreviewBlockRender(
prevProps: NodeProps<WorkflowPreviewBlockData>,
nextProps: NodeProps<WorkflowPreviewBlockData>
): boolean {
return (
prevProps.id === nextProps.id &&
prevProps.data.type === nextProps.data.type &&
prevProps.data.name === nextProps.data.name &&
prevProps.data.isTrigger === nextProps.data.isTrigger &&
prevProps.data.horizontalHandles === nextProps.data.horizontalHandles &&
prevProps.data.enabled === nextProps.data.enabled &&
prevProps.data.isPreviewSelected === nextProps.data.isPreviewSelected &&
prevProps.data.executionStatus === nextProps.data.executionStatus
)
// Check primitive props first (fast path)
if (
prevProps.id !== nextProps.id ||
prevProps.data.type !== nextProps.data.type ||
prevProps.data.name !== nextProps.data.name ||
prevProps.data.isTrigger !== nextProps.data.isTrigger ||
prevProps.data.horizontalHandles !== nextProps.data.horizontalHandles ||
prevProps.data.enabled !== nextProps.data.enabled ||
prevProps.data.isPreviewSelected !== nextProps.data.isPreviewSelected ||
prevProps.data.executionStatus !== nextProps.data.executionStatus
) {
return false
}
// Compare subBlockValues by reference first
const prevValues = prevProps.data.subBlockValues
const nextValues = nextProps.data.subBlockValues
if (prevValues === nextValues) {
return true
}
if (!prevValues || !nextValues) {
return false
}
// Shallow compare keys and values
const prevKeys = Object.keys(prevValues)
const nextKeys = Object.keys(nextValues)
if (prevKeys.length !== nextKeys.length) {
return false
}
for (const key of prevKeys) {
if (prevValues[key] !== nextValues[key]) {
return false
}
}
return true
}
export const WorkflowPreviewBlock = memo(WorkflowPreviewBlockInner, shouldSkipPreviewBlockRender)

View File

@@ -347,6 +347,7 @@ export function WorkflowPreview({
enabled: block.enabled ?? true,
isPreviewSelected: isSelected,
executionStatus,
subBlockValues: block.subBlocks,
},
})
})

View File

@@ -21,7 +21,7 @@ import {
} from '@/lib/workflows/schedules/utils'
import { ExecutionSnapshot } from '@/executor/execution/snapshot'
import type { ExecutionMetadata } from '@/executor/execution/types'
import type { ExecutionResult } from '@/executor/types'
import { hasExecutionResult } from '@/executor/utils/errors'
import { MAX_CONSECUTIVE_FAILURES } from '@/triggers/constants'
const logger = createLogger('TriggerScheduleExecution')
@@ -231,8 +231,7 @@ async function runWorkflowExecution({
} catch (error: unknown) {
logger.error(`[${requestId}] Early failure in scheduled workflow ${payload.workflowId}`, error)
const errorWithResult = error as { executionResult?: ExecutionResult }
const executionResult = errorWithResult?.executionResult
const executionResult = hasExecutionResult(error) ? error.executionResult : undefined
const { traceSpans } = executionResult ? buildTraceSpans(executionResult) : { traceSpans: [] }
await loggingSession.safeCompleteWithError({

View File

@@ -16,7 +16,7 @@ import { loadDeployedWorkflowState } from '@/lib/workflows/persistence/utils'
import { getWorkflowById } from '@/lib/workflows/utils'
import { ExecutionSnapshot } from '@/executor/execution/snapshot'
import type { ExecutionMetadata } from '@/executor/execution/types'
import type { ExecutionResult } from '@/executor/types'
import { hasExecutionResult } from '@/executor/utils/errors'
import { safeAssign } from '@/tools/safe-assign'
import { getTrigger, isTriggerValid } from '@/triggers'
@@ -578,12 +578,13 @@ async function executeWebhookJobInternal(
deploymentVersionId,
})
const errorWithResult = error as { executionResult?: ExecutionResult }
const executionResult = errorWithResult?.executionResult || {
success: false,
output: {},
logs: [],
}
const executionResult = hasExecutionResult(error)
? error.executionResult
: {
success: false,
output: {},
logs: [],
}
const { traceSpans } = buildTraceSpans(executionResult)
await loggingSession.safeCompleteWithError({

View File

@@ -9,7 +9,7 @@ import { PauseResumeManager } from '@/lib/workflows/executor/human-in-the-loop-m
import { getWorkflowById } from '@/lib/workflows/utils'
import { ExecutionSnapshot } from '@/executor/execution/snapshot'
import type { ExecutionMetadata } from '@/executor/execution/types'
import type { ExecutionResult } from '@/executor/types'
import { hasExecutionResult } from '@/executor/utils/errors'
import type { CoreTriggerType } from '@/stores/logs/filters/types'
const logger = createLogger('TriggerWorkflowExecution')
@@ -160,8 +160,7 @@ export async function executeWorkflowJob(payload: WorkflowExecutionPayload) {
executionId,
})
const errorWithResult = error as { executionResult?: ExecutionResult }
const executionResult = errorWithResult?.executionResult
const executionResult = hasExecutionResult(error) ? error.executionResult : undefined
const { traceSpans } = executionResult ? buildTraceSpans(executionResult) : { traceSpans: [] }
await loggingSession.safeCompleteWithError({

View File

@@ -0,0 +1,31 @@
import type { TraceSpan } from '@/lib/logs/types'
import type { ExecutionResult } from '@/executor/types'
interface ChildWorkflowErrorOptions {
message: string
childWorkflowName: string
childTraceSpans?: TraceSpan[]
executionResult?: ExecutionResult
cause?: Error
}
/**
* Error raised when a child workflow execution fails.
*/
export class ChildWorkflowError extends Error {
readonly childTraceSpans: TraceSpan[]
readonly childWorkflowName: string
readonly executionResult?: ExecutionResult
constructor(options: ChildWorkflowErrorOptions) {
super(options.message, { cause: options.cause })
this.name = 'ChildWorkflowError'
this.childWorkflowName = options.childWorkflowName
this.childTraceSpans = options.childTraceSpans ?? []
this.executionResult = options.executionResult
}
static isChildWorkflowError(error: unknown): error is ChildWorkflowError {
return error instanceof ChildWorkflowError
}
}

View File

@@ -13,6 +13,7 @@ import {
isSentinelBlockType,
} from '@/executor/constants'
import type { DAGNode } from '@/executor/dag/builder'
import { ChildWorkflowError } from '@/executor/errors/child-workflow-error'
import type { BlockStateWriter, ContextExtensions } from '@/executor/execution/types'
import {
generatePauseContextId,
@@ -213,24 +214,26 @@ export class BlockExecutor {
? resolvedInputs
: ((block.config?.params as Record<string, any> | undefined) ?? {})
const errorOutput: NormalizedBlockOutput = {
error: errorMessage,
}
if (ChildWorkflowError.isChildWorkflowError(error)) {
errorOutput.childTraceSpans = error.childTraceSpans
errorOutput.childWorkflowName = error.childWorkflowName
}
this.state.setBlockOutput(node.id, errorOutput, duration)
if (blockLog) {
blockLog.endedAt = new Date().toISOString()
blockLog.durationMs = duration
blockLog.success = false
blockLog.error = errorMessage
blockLog.input = input
blockLog.output = this.filterOutputForLog(block, errorOutput)
}
const errorOutput: NormalizedBlockOutput = {
error: errorMessage,
}
if (error && typeof error === 'object' && 'childTraceSpans' in error) {
errorOutput.childTraceSpans = (error as any).childTraceSpans
}
this.state.setBlockOutput(node.id, errorOutput, duration)
logger.error(
phase === 'input_resolution' ? 'Failed to resolve block inputs' : 'Block execution failed',
{

View File

@@ -13,7 +13,7 @@ import type {
PausePoint,
ResumeStatus,
} from '@/executor/types'
import { normalizeError } from '@/executor/utils/errors'
import { attachExecutionResult, normalizeError } from '@/executor/utils/errors'
const logger = createLogger('ExecutionEngine')
@@ -170,8 +170,8 @@ export class ExecutionEngine {
metadata: this.context.metadata,
}
if (error && typeof error === 'object') {
;(error as any).executionResult = executionResult
if (error instanceof Error) {
attachExecutionResult(error, executionResult)
}
throw error
}

View File

@@ -4,12 +4,14 @@ import type { TraceSpan } from '@/lib/logs/types'
import type { BlockOutput } from '@/blocks/types'
import { Executor } from '@/executor'
import { BlockType, DEFAULTS, HTTP } from '@/executor/constants'
import { ChildWorkflowError } from '@/executor/errors/child-workflow-error'
import type {
BlockHandler,
ExecutionContext,
ExecutionResult,
StreamingExecution,
} from '@/executor/types'
import { hasExecutionResult } from '@/executor/utils/errors'
import { buildAPIUrl, buildAuthHeaders } from '@/executor/utils/http'
import { parseJSON } from '@/executor/utils/json'
import { lazyCleanupInputMapping } from '@/executor/utils/lazy-cleanup'
@@ -137,39 +139,39 @@ export class WorkflowBlockHandler implements BlockHandler {
)
return mappedResult
} catch (error: any) {
} catch (error: unknown) {
logger.error(`Error executing child workflow ${workflowId}:`, error)
const { workflows } = useWorkflowRegistry.getState()
const workflowMetadata = workflows[workflowId]
const childWorkflowName = workflowMetadata?.name || workflowId
const originalError = error.message || 'Unknown error'
const wrappedError = new Error(
`Error in child workflow "${childWorkflowName}": ${originalError}`
)
const originalError = error instanceof Error ? error.message : 'Unknown error'
let childTraceSpans: WorkflowTraceSpan[] = []
let executionResult: ExecutionResult | undefined
if (error.executionResult?.logs) {
const executionResult = error.executionResult as ExecutionResult
if (hasExecutionResult(error) && error.executionResult.logs) {
executionResult = error.executionResult
logger.info(`Extracting child trace spans from error.executionResult`, {
hasLogs: (executionResult.logs?.length ?? 0) > 0,
logCount: executionResult.logs?.length ?? 0,
})
const childTraceSpans = this.captureChildWorkflowLogs(
executionResult,
childWorkflowName,
ctx
)
childTraceSpans = this.captureChildWorkflowLogs(executionResult, childWorkflowName, ctx)
logger.info(`Captured ${childTraceSpans.length} child trace spans from failed execution`)
;(wrappedError as any).childTraceSpans = childTraceSpans
} else if (error.childTraceSpans && Array.isArray(error.childTraceSpans)) {
;(wrappedError as any).childTraceSpans = error.childTraceSpans
} else if (ChildWorkflowError.isChildWorkflowError(error)) {
childTraceSpans = error.childTraceSpans
}
throw wrappedError
throw new ChildWorkflowError({
message: `Error in child workflow "${childWorkflowName}": ${originalError}`,
childWorkflowName,
childTraceSpans,
executionResult,
cause: error instanceof Error ? error : undefined,
})
}
}
@@ -441,11 +443,11 @@ export class WorkflowBlockHandler implements BlockHandler {
if (!success) {
logger.warn(`Child workflow ${childWorkflowName} failed`)
const error = new Error(
`Error in child workflow "${childWorkflowName}": ${childResult.error || 'Child workflow execution failed'}`
)
;(error as any).childTraceSpans = childTraceSpans || []
throw error
throw new ChildWorkflowError({
message: `Error in child workflow "${childWorkflowName}": ${childResult.error || 'Child workflow execution failed'}`,
childWorkflowName,
childTraceSpans: childTraceSpans || [],
})
}
return {

View File

@@ -1,6 +1,39 @@
import type { ExecutionContext } from '@/executor/types'
import type { ExecutionContext, ExecutionResult } from '@/executor/types'
import type { SerializedBlock } from '@/serializer/types'
/**
* Interface for errors that carry an ExecutionResult.
* Used when workflow execution fails and we want to preserve partial results.
*/
export interface ErrorWithExecutionResult extends Error {
executionResult: ExecutionResult
}
/**
* Type guard to check if an error carries an ExecutionResult.
* Validates that executionResult has required fields (success, output).
*/
export function hasExecutionResult(error: unknown): error is ErrorWithExecutionResult {
if (
!(error instanceof Error) ||
!('executionResult' in error) ||
error.executionResult == null ||
typeof error.executionResult !== 'object'
) {
return false
}
const result = error.executionResult as Record<string, unknown>
return typeof result.success === 'boolean' && result.output != null
}
/**
* Attaches an ExecutionResult to an error for propagation to parent workflows.
*/
export function attachExecutionResult(error: Error, executionResult: ExecutionResult): void {
Object.assign(error, { executionResult })
}
export interface BlockExecutionErrorDetails {
block: SerializedBlock
error: Error | string

View File

@@ -237,6 +237,7 @@ interface DeployWorkflowResult {
isDeployed: boolean
deployedAt?: string
apiKey?: string
warnings?: string[]
}
/**
@@ -272,6 +273,7 @@ export function useDeployWorkflow() {
isDeployed: data.isDeployed ?? false,
deployedAt: data.deployedAt,
apiKey: data.apiKey,
warnings: data.warnings,
}
},
onSuccess: (data, variables) => {
@@ -360,6 +362,7 @@ interface ActivateVersionVariables {
interface ActivateVersionResult {
deployedAt?: string
apiKey?: string
warnings?: string[]
}
/**

View File

@@ -100,8 +100,13 @@ export function useExecutionStream() {
})
if (!response.ok) {
const error = await response.json()
throw new Error(error.error || 'Failed to start execution')
const errorResponse = await response.json()
const error = new Error(errorResponse.error || 'Failed to start execution')
// Attach the execution result from server response for error handling
if (errorResponse && typeof errorResponse === 'object') {
Object.assign(error, { executionResult: errorResponse })
}
throw error
}
if (!response.body) {

View File

@@ -1,4 +1,4 @@
import { useCallback, useEffect, useMemo, useState } from 'react'
import { useCallback, useEffect, useMemo } from 'react'
import { createLogger } from '@sim/logger'
import { useParams } from 'next/navigation'
import { getBaseUrl } from '@/lib/core/utils/urls'
@@ -6,12 +6,10 @@ import { getBlock } from '@/blocks'
import { populateTriggerFieldsFromConfig } from '@/hooks/use-trigger-config-aggregation'
import { useSubBlockStore } from '@/stores/workflows/subblock/store'
import { useWorkflowStore } from '@/stores/workflows/workflow/store'
import { getTrigger, isTriggerValid } from '@/triggers'
import { isTriggerValid } from '@/triggers'
const logger = createLogger('useWebhookManagement')
const CREDENTIAL_SET_PREFIX = 'credentialSet:'
interface UseWebhookManagementProps {
blockId: string
triggerId?: string
@@ -24,9 +22,6 @@ interface WebhookManagementState {
webhookPath: string
webhookId: string | null
isLoading: boolean
isSaving: boolean
saveConfig: () => Promise<boolean>
deleteConfig: () => Promise<boolean>
}
/**
@@ -83,11 +78,9 @@ function resolveEffectiveTriggerId(
}
/**
* Hook to manage webhook lifecycle for trigger blocks
* Handles:
* - Pre-generating webhook URLs based on blockId (without creating webhook)
* - Loading existing webhooks from the API
* - Saving and deleting webhook configurations
* Hook to load webhook info for trigger blocks.
* Used for displaying webhook URLs in the UI.
* Webhook creation/updates are handled by the deploy flow.
*/
export function useWebhookManagement({
blockId,
@@ -98,8 +91,6 @@ export function useWebhookManagement({
const params = useParams()
const workflowId = params.workflowId as string
const triggerDef = triggerId && isTriggerValid(triggerId) ? getTrigger(triggerId) : null
const webhookId = useSubBlockStore(
useCallback((state) => state.getValue(blockId, 'webhookId') as string | null, [blockId])
)
@@ -107,7 +98,6 @@ export function useWebhookManagement({
useCallback((state) => state.getValue(blockId, 'triggerPath') as string | null, [blockId])
)
const isLoading = useSubBlockStore((state) => state.loadingWebhooks.has(blockId))
const isChecked = useSubBlockStore((state) => state.checkedWebhooks.has(blockId))
const webhookUrl = useMemo(() => {
if (!webhookPath) {
@@ -118,8 +108,6 @@ export function useWebhookManagement({
return `${baseUrl}/api/webhooks/trigger/${webhookPath}`
}, [webhookPath, blockId])
const [isSaving, setIsSaving] = useState(false)
useEffect(() => {
if (triggerId && !isPreview) {
const storedTriggerId = useSubBlockStore.getState().getValue(blockId, 'triggerId')
@@ -143,7 +131,7 @@ export function useWebhookManagement({
return
}
const loadWebhookOrGenerateUrl = async () => {
const loadWebhookInfo = async () => {
useSubBlockStore.setState((state) => ({
loadingWebhooks: new Set([...state.loadingWebhooks, blockId]),
}))
@@ -171,8 +159,6 @@ export function useWebhookManagement({
if (webhook.providerConfig) {
const effectiveTriggerId = resolveEffectiveTriggerId(blockId, triggerId, webhook)
// Filter out runtime/system fields from providerConfig before storing as triggerConfig
// These fields are managed by the system and should not be included in change detection
const {
credentialId: _credId,
credentialSetId: _credSetId,
@@ -224,202 +210,14 @@ export function useWebhookManagement({
}
}
if (useWebhookUrl) {
loadWebhookOrGenerateUrl()
loadWebhookInfo()
}
}, [isPreview, triggerId, workflowId, blockId, useWebhookUrl])
const createWebhook = async (
effectiveTriggerId: string | undefined,
selectedCredentialId: string | null
): Promise<boolean> => {
if (!triggerDef || !effectiveTriggerId) {
return false
}
const triggerConfig = useSubBlockStore.getState().getValue(blockId, 'triggerConfig')
const isCredentialSet = selectedCredentialId?.startsWith(CREDENTIAL_SET_PREFIX)
const credentialSetId = isCredentialSet
? selectedCredentialId!.slice(CREDENTIAL_SET_PREFIX.length)
: undefined
const credentialId = isCredentialSet ? undefined : selectedCredentialId
const webhookConfig = {
...(triggerConfig || {}),
...(credentialId ? { credentialId } : {}),
...(credentialSetId ? { credentialSetId } : {}),
triggerId: effectiveTriggerId,
}
const path = blockId
const response = await fetch('/api/webhooks', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
workflowId,
blockId,
path,
provider: triggerDef.provider,
providerConfig: webhookConfig,
}),
})
if (!response.ok) {
let errorMessage = 'Failed to create webhook'
try {
const errorData = await response.json()
errorMessage = errorData.details || errorData.error || errorMessage
} catch {
// If response is not JSON, use default message
}
logger.error('Failed to create webhook', { errorMessage })
throw new Error(errorMessage)
}
const data = await response.json()
const savedWebhookId = data.webhook.id
useSubBlockStore.getState().setValue(blockId, 'triggerPath', path)
useSubBlockStore.getState().setValue(blockId, 'triggerId', effectiveTriggerId)
useSubBlockStore.getState().setValue(blockId, 'webhookId', savedWebhookId)
useSubBlockStore.setState((state) => ({
checkedWebhooks: new Set([...state.checkedWebhooks, blockId]),
}))
logger.info('Trigger webhook created successfully', {
webhookId: savedWebhookId,
triggerId: effectiveTriggerId,
provider: triggerDef.provider,
blockId,
})
return true
}
const updateWebhook = async (
webhookIdToUpdate: string,
effectiveTriggerId: string | undefined,
selectedCredentialId: string | null
): Promise<boolean> => {
const triggerConfigRaw = useSubBlockStore.getState().getValue(blockId, 'triggerConfig')
const triggerConfig =
typeof triggerConfigRaw === 'object' && triggerConfigRaw !== null
? (triggerConfigRaw as Record<string, unknown>)
: {}
const isCredentialSet = selectedCredentialId?.startsWith(CREDENTIAL_SET_PREFIX)
const credentialSetId = isCredentialSet
? selectedCredentialId!.slice(CREDENTIAL_SET_PREFIX.length)
: undefined
const credentialId = isCredentialSet ? undefined : selectedCredentialId
const response = await fetch(`/api/webhooks/${webhookIdToUpdate}`, {
method: 'PATCH',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
providerConfig: {
...triggerConfig,
...(credentialId ? { credentialId } : {}),
...(credentialSetId ? { credentialSetId } : {}),
triggerId: effectiveTriggerId,
},
}),
})
if (response.status === 404) {
logger.warn('Webhook not found while updating, recreating', {
blockId,
lostWebhookId: webhookIdToUpdate,
})
useSubBlockStore.getState().setValue(blockId, 'webhookId', null)
return createWebhook(effectiveTriggerId, selectedCredentialId)
}
if (!response.ok) {
let errorMessage = 'Failed to save trigger configuration'
try {
const errorData = await response.json()
errorMessage = errorData.details || errorData.error || errorMessage
} catch {
// If response is not JSON, use default message
}
logger.error('Failed to save trigger config', { errorMessage })
throw new Error(errorMessage)
}
logger.info('Trigger config saved successfully', { blockId, webhookId: webhookIdToUpdate })
return true
}
const saveConfig = async (): Promise<boolean> => {
if (isPreview || !triggerDef) {
return false
}
const effectiveTriggerId = resolveEffectiveTriggerId(blockId, triggerId)
try {
setIsSaving(true)
const triggerCredentials = useSubBlockStore.getState().getValue(blockId, 'triggerCredentials')
const selectedCredentialId = (triggerCredentials as string | null) || null
if (!webhookId) {
return createWebhook(effectiveTriggerId, selectedCredentialId)
}
return updateWebhook(webhookId, effectiveTriggerId, selectedCredentialId)
} catch (error) {
logger.error('Error saving trigger config:', error)
throw error
} finally {
setIsSaving(false)
}
}
const deleteConfig = async (): Promise<boolean> => {
if (isPreview || !webhookId) {
return false
}
try {
setIsSaving(true)
const response = await fetch(`/api/webhooks/${webhookId}`, {
method: 'DELETE',
})
if (!response.ok) {
logger.error('Failed to delete webhook')
return false
}
useSubBlockStore.getState().setValue(blockId, 'triggerPath', '')
useSubBlockStore.getState().setValue(blockId, 'webhookId', null)
useSubBlockStore.setState((state) => {
const newSet = new Set(state.checkedWebhooks)
newSet.delete(blockId)
return { checkedWebhooks: newSet }
})
logger.info('Webhook deleted successfully')
return true
} catch (error) {
logger.error('Error deleting webhook:', error)
return false
} finally {
setIsSaving(false)
}
}
return {
webhookUrl,
webhookPath: webhookPath || blockId,
webhookId,
isLoading,
isSaving,
saveConfig,
deleteConfig,
}
}

View File

@@ -32,6 +32,12 @@ interface TriggerSaveError {
interface TriggerSaveResult {
success: boolean
error?: TriggerSaveError
warnings?: string[]
}
interface CredentialSetSyncResult {
error: TriggerSaveError | null
warnings: string[]
}
interface SaveTriggerWebhooksInput {
@@ -42,6 +48,16 @@ interface SaveTriggerWebhooksInput {
blocks: Record<string, BlockState>
requestId: string
deploymentVersionId?: string
/**
* The previous active version's ID. Only this version's external subscriptions
* will be cleaned up (along with draft webhooks). If not provided, skips cleanup.
*/
previousVersionId?: string
/**
* When true, forces recreation of external subscriptions even if webhook config is unchanged.
* Used when activating a previous deployment version whose subscriptions were cleaned up.
*/
forceRecreateSubscriptions?: boolean
}
function getSubBlockValue(block: BlockState, subBlockId: string): unknown {
@@ -248,7 +264,7 @@ async function syncCredentialSetWebhooks(params: {
providerConfig: Record<string, unknown>
requestId: string
deploymentVersionId?: string
}): Promise<TriggerSaveError | null> {
}): Promise<CredentialSetSyncResult> {
const {
workflowId,
blockId,
@@ -261,7 +277,7 @@ async function syncCredentialSetWebhooks(params: {
const credentialSetId = providerConfig.credentialSetId as string | undefined
if (!credentialSetId) {
return null
return { error: null, warnings: [] }
}
const oauthProviderId = getProviderIdFromServiceId(provider)
@@ -280,10 +296,23 @@ async function syncCredentialSetWebhooks(params: {
deploymentVersionId,
})
const warnings: string[] = []
if (syncResult.failed.length > 0) {
const failedCount = syncResult.failed.length
const totalCount = syncResult.webhooks.length + failedCount
warnings.push(
`${failedCount} of ${totalCount} credentials in the set failed to sync for ${provider}. Some team members may not receive triggers.`
)
}
if (syncResult.webhooks.length === 0) {
return {
message: `No valid credentials found in credential set for ${provider}. Please connect accounts and try again.`,
status: 400,
error: {
message: `No valid credentials found in credential set for ${provider}. Please connect accounts and try again.`,
status: 400,
},
warnings,
}
}
@@ -297,8 +326,11 @@ async function syncCredentialSetWebhooks(params: {
if (!success) {
await db.delete(webhook).where(eq(webhook.id, wh.id))
return {
message: `Failed to configure ${provider} polling. Please check account permissions.`,
status: 500,
error: {
message: `Failed to configure ${provider} polling. Please check account permissions.`,
status: 500,
},
warnings,
}
}
}
@@ -306,84 +338,7 @@ async function syncCredentialSetWebhooks(params: {
}
}
return null
}
async function createWebhookForBlock(params: {
request: NextRequest
workflowId: string
workflow: Record<string, unknown>
userId: string
block: BlockState
provider: string
providerConfig: Record<string, unknown>
triggerPath: string
requestId: string
deploymentVersionId?: string
}): Promise<TriggerSaveError | null> {
const {
request,
workflowId,
workflow,
userId,
block,
provider,
providerConfig,
triggerPath,
requestId,
deploymentVersionId,
} = params
const webhookId = nanoid()
const createPayload = {
id: webhookId,
path: triggerPath,
provider,
providerConfig,
}
const result = await createExternalWebhookSubscription(
request,
createPayload,
workflow,
userId,
requestId
)
const updatedProviderConfig = result.updatedProviderConfig as Record<string, unknown>
let savedWebhook: any
try {
const createdRows = await db
.insert(webhook)
.values({
id: webhookId,
workflowId,
deploymentVersionId: deploymentVersionId || null,
blockId: block.id,
path: triggerPath,
provider,
providerConfig: updatedProviderConfig,
credentialSetId: (updatedProviderConfig.credentialSetId as string | undefined) || null,
isActive: true,
createdAt: new Date(),
updatedAt: new Date(),
})
.returning()
savedWebhook = createdRows[0]
} catch (error) {
if (result.externalSubscriptionCreated) {
await cleanupExternalWebhook(createPayload, workflow, requestId)
}
throw error
}
const pollingError = await configurePollingIfNeeded(provider, savedWebhook, requestId)
if (pollingError) {
return pollingError
}
return null
return { error: null, warnings }
}
/**
@@ -398,23 +353,57 @@ export async function saveTriggerWebhooksForDeploy({
blocks,
requestId,
deploymentVersionId,
previousVersionId,
forceRecreateSubscriptions = false,
}: SaveTriggerWebhooksInput): Promise<TriggerSaveResult> {
const triggerBlocks = Object.values(blocks || {}).filter(Boolean)
const triggerBlocks = Object.values(blocks || {}).filter((b) => b && b.enabled !== false)
const currentBlockIds = new Set(triggerBlocks.map((b) => b.id))
// 1. Get all existing webhooks for this workflow
const existingWebhooks = await db
// 1. Get ALL webhooks for this workflow (all versions including draft)
const allWorkflowWebhooks = await db
.select()
.from(webhook)
.where(
deploymentVersionId
? and(
eq(webhook.workflowId, workflowId),
eq(webhook.deploymentVersionId, deploymentVersionId)
)
: eq(webhook.workflowId, workflowId)
.where(eq(webhook.workflowId, workflowId))
// Separate webhooks by version: current deployment vs others
const existingWebhooks: typeof allWorkflowWebhooks = []
for (const wh of allWorkflowWebhooks) {
if (deploymentVersionId && wh.deploymentVersionId === deploymentVersionId) {
existingWebhooks.push(wh)
}
}
if (previousVersionId) {
const webhooksToCleanup = allWorkflowWebhooks.filter(
(wh) => wh.deploymentVersionId === previousVersionId
)
if (webhooksToCleanup.length > 0) {
logger.info(
`[${requestId}] Cleaning up ${webhooksToCleanup.length} external subscription(s) from previous version`
)
for (const wh of webhooksToCleanup) {
try {
await cleanupExternalWebhook(wh, workflow, requestId)
} catch (cleanupError) {
logger.warn(`[${requestId}] Failed to cleanup external webhook ${wh.id}`, cleanupError)
}
}
}
}
const restorePreviousSubscriptions = async () => {
if (!previousVersionId) return
await restorePreviousVersionWebhooks({
request,
workflow,
userId,
previousVersionId,
requestId,
})
}
const webhooksByBlockId = new Map<string, typeof existingWebhooks>()
for (const wh of existingWebhooks) {
if (!wh.blockId) continue
@@ -429,7 +418,14 @@ export async function saveTriggerWebhooksForDeploy({
existingWebhookBlockIds: Array.from(webhooksByBlockId.keys()),
})
// 2. Determine which webhooks to delete (orphaned or config changed)
type WebhookConfig = {
provider: string
providerConfig: Record<string, unknown>
triggerPath: string
triggerDef: ReturnType<typeof getTrigger>
}
const webhookConfigs = new Map<string, WebhookConfig>()
const webhooksToDelete: typeof existingWebhooks = []
const blocksNeedingWebhook: BlockState[] = []
const blocksNeedingCredentialSetSync: BlockState[] = []
@@ -447,6 +443,7 @@ export async function saveTriggerWebhooksForDeploy({
)
if (missingFields.length > 0) {
await restorePreviousSubscriptions()
return {
success: false,
error: {
@@ -455,9 +452,8 @@ export async function saveTriggerWebhooksForDeploy({
},
}
}
// Store config for later use
;(block as any)._webhookConfig = { provider, providerConfig, triggerPath, triggerDef }
webhookConfigs.set(block.id, { provider, providerConfig, triggerPath, triggerDef })
if (providerConfig.credentialSetId) {
blocksNeedingCredentialSetSync.push(block)
@@ -477,22 +473,29 @@ export async function saveTriggerWebhooksForDeploy({
)
}
// Check if config changed
// Check if config changed or if we're forcing recreation (e.g., activating old version)
const existingConfig = (existingWh.providerConfig as Record<string, unknown>) || {}
if (
const needsRecreation =
forceRecreateSubscriptions ||
shouldRecreateExternalWebhookSubscription({
previousProvider: existingWh.provider as string,
nextProvider: provider,
previousConfig: existingConfig,
nextConfig: providerConfig,
})
) {
// Config changed - delete and recreate
if (needsRecreation) {
webhooksToDelete.push(existingWh)
blocksNeedingWebhook.push(block)
logger.info(`[${requestId}] Webhook config changed for block ${block.id}, will recreate`)
if (forceRecreateSubscriptions) {
logger.info(
`[${requestId}] Forcing webhook recreation for block ${block.id} (reactivating version)`
)
} else {
logger.info(`[${requestId}] Webhook config changed for block ${block.id}, will recreate`)
}
}
// else: config unchanged, keep existing webhook
// else: config unchanged and not forcing recreation, keep existing webhook
}
}
@@ -522,15 +525,16 @@ export async function saveTriggerWebhooksForDeploy({
await db.delete(webhook).where(inArray(webhook.id, idsToDelete))
}
// 4. Sync credential set webhooks
const collectedWarnings: string[] = []
for (const block of blocksNeedingCredentialSetSync) {
const config = (block as any)._webhookConfig
const config = webhookConfigs.get(block.id)
if (!config) continue
const { provider, providerConfig, triggerPath } = config
try {
const credentialSetError = await syncCredentialSetWebhooks({
const syncResult = await syncCredentialSetWebhooks({
workflowId,
blockId: block.id,
provider,
@@ -540,74 +544,214 @@ export async function saveTriggerWebhooksForDeploy({
deploymentVersionId,
})
if (credentialSetError) {
return { success: false, error: credentialSetError }
if (syncResult.warnings.length > 0) {
collectedWarnings.push(...syncResult.warnings)
}
if (syncResult.error) {
await restorePreviousSubscriptions()
return { success: false, error: syncResult.error, warnings: collectedWarnings }
}
} catch (error: any) {
logger.error(`[${requestId}] Failed to create webhook for ${block.id}`, error)
await restorePreviousSubscriptions()
return {
success: false,
error: {
message: error?.message || 'Failed to save trigger configuration',
status: 500,
},
warnings: collectedWarnings,
}
}
}
// 5. Create webhooks for blocks that need them
// 5. Create webhooks for blocks that need them (two-phase approach for atomicity)
const createdSubscriptions: Array<{
webhookId: string
block: BlockState
provider: string
triggerPath: string
updatedProviderConfig: Record<string, unknown>
externalSubscriptionCreated: boolean
}> = []
for (const block of blocksNeedingWebhook) {
const config = (block as any)._webhookConfig
const config = webhookConfigs.get(block.id)
if (!config) continue
const { provider, providerConfig, triggerPath } = config
const webhookId = nanoid()
const createPayload = {
id: webhookId,
path: triggerPath,
provider,
providerConfig,
}
try {
const createError = await createWebhookForBlock({
const result = await createExternalWebhookSubscription(
request,
workflowId,
createPayload,
workflow,
userId,
requestId
)
createdSubscriptions.push({
webhookId,
block,
provider,
providerConfig,
triggerPath,
requestId,
deploymentVersionId,
updatedProviderConfig: result.updatedProviderConfig as Record<string, unknown>,
externalSubscriptionCreated: result.externalSubscriptionCreated,
})
if (createError) {
return { success: false, error: createError }
}
} catch (error: any) {
logger.error(`[${requestId}] Failed to create webhook for ${block.id}`, error)
logger.error(`[${requestId}] Failed to create external subscription for ${block.id}`, error)
for (const sub of createdSubscriptions) {
if (sub.externalSubscriptionCreated) {
try {
await cleanupExternalWebhook(
{
id: sub.webhookId,
path: sub.triggerPath,
provider: sub.provider,
providerConfig: sub.updatedProviderConfig,
},
workflow,
requestId
)
} catch (cleanupError) {
logger.warn(
`[${requestId}] Failed to cleanup external subscription for ${sub.block.id}`,
cleanupError
)
}
}
}
await restorePreviousSubscriptions()
return {
success: false,
error: {
message: error?.message || 'Failed to save trigger configuration',
message: error?.message || 'Failed to create external subscription',
status: 500,
},
}
}
}
// Clean up temp config
for (const block of triggerBlocks) {
;(block as any)._webhookConfig = undefined
// Phase 2: Insert all DB records in a transaction
try {
await db.transaction(async (tx) => {
for (const sub of createdSubscriptions) {
await tx.insert(webhook).values({
id: sub.webhookId,
workflowId,
deploymentVersionId: deploymentVersionId || null,
blockId: sub.block.id,
path: sub.triggerPath,
provider: sub.provider,
providerConfig: sub.updatedProviderConfig,
credentialSetId:
(sub.updatedProviderConfig.credentialSetId as string | undefined) || null,
isActive: true,
createdAt: new Date(),
updatedAt: new Date(),
})
}
})
for (const sub of createdSubscriptions) {
const pollingError = await configurePollingIfNeeded(
sub.provider,
{ id: sub.webhookId, path: sub.triggerPath, providerConfig: sub.updatedProviderConfig },
requestId
)
if (pollingError) {
logger.error(
`[${requestId}] Polling configuration failed for ${sub.block.id}`,
pollingError
)
for (const otherSub of createdSubscriptions) {
if (otherSub.webhookId === sub.webhookId) continue
if (otherSub.externalSubscriptionCreated) {
try {
await cleanupExternalWebhook(
{
id: otherSub.webhookId,
path: otherSub.triggerPath,
provider: otherSub.provider,
providerConfig: otherSub.updatedProviderConfig,
},
workflow,
requestId
)
} catch (cleanupError) {
logger.warn(
`[${requestId}] Failed to cleanup external subscription for ${otherSub.block.id}`,
cleanupError
)
}
}
}
const otherWebhookIds = createdSubscriptions
.filter((s) => s.webhookId !== sub.webhookId)
.map((s) => s.webhookId)
if (otherWebhookIds.length > 0) {
await db.delete(webhook).where(inArray(webhook.id, otherWebhookIds))
}
await restorePreviousSubscriptions()
return { success: false, error: pollingError }
}
}
} catch (error: any) {
logger.error(`[${requestId}] Failed to insert webhook records`, error)
for (const sub of createdSubscriptions) {
if (sub.externalSubscriptionCreated) {
try {
await cleanupExternalWebhook(
{
id: sub.webhookId,
path: sub.triggerPath,
provider: sub.provider,
providerConfig: sub.updatedProviderConfig,
},
workflow,
requestId
)
} catch (cleanupError) {
logger.warn(
`[${requestId}] Failed to cleanup external subscription for ${sub.block.id}`,
cleanupError
)
}
}
}
await restorePreviousSubscriptions()
return {
success: false,
error: {
message: error?.message || 'Failed to save webhook records',
status: 500,
},
}
}
return { success: true }
return { success: true, warnings: collectedWarnings.length > 0 ? collectedWarnings : undefined }
}
/**
* Clean up all webhooks for a workflow during undeploy.
* Removes external subscriptions and deletes webhook records from the database.
*
* @param skipExternalCleanup - If true, skip external subscription cleanup (already done elsewhere)
*/
export async function cleanupWebhooksForWorkflow(
workflowId: string,
workflow: Record<string, unknown>,
requestId: string,
deploymentVersionId?: string
deploymentVersionId?: string,
skipExternalCleanup = false
): Promise<void> {
const existingWebhooks = await db
.select()
@@ -626,23 +770,26 @@ export async function cleanupWebhooksForWorkflow(
return
}
logger.info(`[${requestId}] Cleaning up ${existingWebhooks.length} webhook(s) for undeploy`, {
workflowId,
deploymentVersionId,
webhookIds: existingWebhooks.map((wh) => wh.id),
})
logger.info(
`[${requestId}] Cleaning up ${existingWebhooks.length} webhook(s) for ${skipExternalCleanup ? 'DB records only' : 'undeploy'}`,
{
workflowId,
deploymentVersionId,
webhookIds: existingWebhooks.map((wh) => wh.id),
}
)
// Clean up external subscriptions
for (const wh of existingWebhooks) {
try {
await cleanupExternalWebhook(wh, workflow, requestId)
} catch (cleanupError) {
logger.warn(`[${requestId}] Failed to cleanup external webhook ${wh.id}`, cleanupError)
// Continue with other webhooks even if one fails
if (!skipExternalCleanup) {
for (const wh of existingWebhooks) {
try {
await cleanupExternalWebhook(wh, workflow, requestId)
} catch (cleanupError) {
logger.warn(`[${requestId}] Failed to cleanup external webhook ${wh.id}`, cleanupError)
// Continue with other webhooks even if one fails
}
}
}
// Delete all webhook records
await db
.delete(webhook)
.where(
@@ -660,3 +807,55 @@ export async function cleanupWebhooksForWorkflow(
: `[${requestId}] Cleaned up all webhooks for workflow ${workflowId}`
)
}
/**
* Restore external subscriptions for a previous deployment version.
* Used when activation/deployment fails after webhooks were created,
* to restore the previous version's external subscriptions.
*/
export async function restorePreviousVersionWebhooks(params: {
request: NextRequest
workflow: Record<string, unknown>
userId: string
previousVersionId: string
requestId: string
}): Promise<void> {
const { request, workflow, userId, previousVersionId, requestId } = params
const previousWebhooks = await db
.select()
.from(webhook)
.where(eq(webhook.deploymentVersionId, previousVersionId))
if (previousWebhooks.length === 0) {
logger.debug(`[${requestId}] No previous webhooks to restore for version ${previousVersionId}`)
return
}
logger.info(
`[${requestId}] Restoring ${previousWebhooks.length} external subscription(s) for previous version ${previousVersionId}`
)
for (const wh of previousWebhooks) {
try {
await createExternalWebhookSubscription(
request,
{
id: wh.id,
path: wh.path,
provider: wh.provider,
providerConfig: (wh.providerConfig as Record<string, unknown>) || {},
},
workflow,
userId,
requestId
)
logger.info(`[${requestId}] Restored external subscription for webhook ${wh.id}`)
} catch (restoreError) {
logger.error(
`[${requestId}] Failed to restore external subscription for webhook ${wh.id}`,
restoreError
)
}
}
}

View File

@@ -161,7 +161,7 @@ export async function pollGmailWebhooks() {
const metadata = webhookData.providerConfig as any
const credentialId: string | undefined = metadata?.credentialId
const userId: string | undefined = metadata?.userId
const credentialSetId: string | undefined = metadata?.credentialSetId
const credentialSetId: string | undefined = webhookData.credentialSetId ?? undefined
if (!credentialId && !userId) {
logger.error(`[${requestId}] Missing credential info for webhook ${webhookId}`)
@@ -697,7 +697,6 @@ async function processEmails(
method: 'POST',
headers: {
'Content-Type': 'application/json',
'X-Webhook-Secret': webhookData.secret || '',
'User-Agent': 'Sim/1.0',
},
body: JSON.stringify(payload),
@@ -766,17 +765,21 @@ async function markEmailAsRead(accessToken: string, messageId: string) {
}
async function updateWebhookLastChecked(webhookId: string, timestamp: string, historyId?: string) {
const result = await db.select().from(webhook).where(eq(webhook.id, webhookId))
const existingConfig = (result[0]?.providerConfig as Record<string, any>) || {}
await db
.update(webhook)
.set({
providerConfig: {
...existingConfig,
lastCheckedTimestamp: timestamp,
...(historyId ? { historyId } : {}),
} as any,
updatedAt: new Date(),
})
.where(eq(webhook.id, webhookId))
try {
const result = await db.select().from(webhook).where(eq(webhook.id, webhookId))
const existingConfig = (result[0]?.providerConfig as Record<string, any>) || {}
await db
.update(webhook)
.set({
providerConfig: {
...existingConfig,
lastCheckedTimestamp: timestamp,
...(historyId ? { historyId } : {}),
} as any,
updatedAt: new Date(),
})
.where(eq(webhook.id, webhookId))
} catch (error) {
logger.error(`Error updating webhook ${webhookId} last checked timestamp:`, error)
}
}

View File

@@ -645,7 +645,6 @@ async function processEmails(
method: 'POST',
headers: {
'Content-Type': 'application/json',
'X-Webhook-Secret': '',
'User-Agent': 'Sim/1.0',
},
body: JSON.stringify(payload),

View File

@@ -210,7 +210,7 @@ export async function pollOutlookWebhooks() {
const metadata = webhookData.providerConfig as any
const credentialId: string | undefined = metadata?.credentialId
const userId: string | undefined = metadata?.userId
const credentialSetId: string | undefined = metadata?.credentialSetId
const credentialSetId: string | undefined = webhookData.credentialSetId ?? undefined
if (!credentialId && !userId) {
logger.error(`[${requestId}] Missing credentialId and userId for webhook ${webhookId}`)
@@ -607,7 +607,6 @@ async function processOutlookEmails(
method: 'POST',
headers: {
'Content-Type': 'application/json',
'X-Webhook-Secret': webhookData.secret || '',
'User-Agent': 'Sim/1.0',
},
body: JSON.stringify(payload),

View File

@@ -7,16 +7,27 @@ import { type NextRequest, NextResponse } from 'next/server'
import { v4 as uuidv4 } from 'uuid'
import { checkEnterprisePlan, checkTeamPlan } from '@/lib/billing/subscriptions/utils'
import { isProd, isTriggerDevEnabled } from '@/lib/core/config/feature-flags'
import { getEffectiveDecryptedEnv } from '@/lib/environment/utils'
import { preprocessExecution } from '@/lib/execution/preprocessing'
import { convertSquareBracketsToTwiML } from '@/lib/webhooks/utils'
import {
handleSlackChallenge,
handleWhatsAppVerification,
validateCirclebackSignature,
validateFirefliesSignature,
validateGitHubSignature,
validateJiraSignature,
validateLinearSignature,
validateMicrosoftTeamsSignature,
validateTwilioSignature,
validateTypeformSignature,
verifyProviderWebhook,
} from '@/lib/webhooks/utils.server'
import { executeWebhookJob } from '@/background/webhook-execution'
import { resolveEnvVarReferences } from '@/executor/utils/reference-validation'
import { isGitHubEventMatch } from '@/triggers/github/utils'
import { isHubSpotContactEventMatch } from '@/triggers/hubspot/utils'
import { isJiraEventMatch } from '@/triggers/jira/utils'
const logger = createLogger('WebhookProcessor')
@@ -451,7 +462,6 @@ export async function verifyProviderAuth(
// Step 1: Fetch and decrypt environment variables for signature verification
let decryptedEnvVars: Record<string, string> = {}
try {
const { getEffectiveDecryptedEnv } = await import('@/lib/environment/utils')
decryptedEnvVars = await getEffectiveDecryptedEnv(
foundWorkflow.userId,
foundWorkflow.workspaceId
@@ -553,9 +563,6 @@ export async function verifyProviderAuth(
}
const fullUrl = getExternalUrl(request)
const { validateTwilioSignature } = await import('@/lib/webhooks/utils.server')
const isValidSignature = await validateTwilioSignature(authToken, signature, fullUrl, params)
if (!isValidSignature) {
@@ -583,8 +590,6 @@ export async function verifyProviderAuth(
return new NextResponse('Unauthorized - Missing Typeform signature', { status: 401 })
}
const { validateTypeformSignature } = await import('@/lib/webhooks/utils.server')
const isValidSignature = validateTypeformSignature(secret, signature, rawBody)
if (!isValidSignature) {
@@ -610,8 +615,6 @@ export async function verifyProviderAuth(
return new NextResponse('Unauthorized - Missing Linear signature', { status: 401 })
}
const { validateLinearSignature } = await import('@/lib/webhooks/utils.server')
const isValidSignature = validateLinearSignature(secret, signature, rawBody)
if (!isValidSignature) {
@@ -637,8 +640,6 @@ export async function verifyProviderAuth(
return new NextResponse('Unauthorized - Missing Circleback signature', { status: 401 })
}
const { validateCirclebackSignature } = await import('@/lib/webhooks/utils.server')
const isValidSignature = validateCirclebackSignature(secret, signature, rawBody)
if (!isValidSignature) {
@@ -664,8 +665,6 @@ export async function verifyProviderAuth(
return new NextResponse('Unauthorized - Missing Jira signature', { status: 401 })
}
const { validateJiraSignature } = await import('@/lib/webhooks/utils.server')
const isValidSignature = validateJiraSignature(secret, signature, rawBody)
if (!isValidSignature) {
@@ -694,8 +693,6 @@ export async function verifyProviderAuth(
return new NextResponse('Unauthorized - Missing GitHub signature', { status: 401 })
}
const { validateGitHubSignature } = await import('@/lib/webhooks/utils.server')
const isValidSignature = validateGitHubSignature(secret, signature, rawBody)
if (!isValidSignature) {
@@ -724,8 +721,6 @@ export async function verifyProviderAuth(
return new NextResponse('Unauthorized - Missing Fireflies signature', { status: 401 })
}
const { validateFirefliesSignature } = await import('@/lib/webhooks/utils.server')
const isValidSignature = validateFirefliesSignature(secret, signature, rawBody)
if (!isValidSignature) {
@@ -860,8 +855,6 @@ export async function queueWebhookExecution(
const eventType = request.headers.get('x-github-event')
const action = body.action
const { isGitHubEventMatch } = await import('@/triggers/github/utils')
if (!isGitHubEventMatch(triggerId, eventType || '', action, body)) {
logger.debug(
`[${options.requestId}] GitHub event mismatch for trigger ${triggerId}. Event: ${eventType}, Action: ${action}. Skipping execution.`,
@@ -890,8 +883,6 @@ export async function queueWebhookExecution(
if (triggerId && triggerId !== 'jira_webhook') {
const webhookEvent = body.webhookEvent as string | undefined
const { isJiraEventMatch } = await import('@/triggers/jira/utils')
if (!isJiraEventMatch(triggerId, webhookEvent || '', body)) {
logger.debug(
`[${options.requestId}] Jira event mismatch for trigger ${triggerId}. Event: ${webhookEvent}. Skipping execution.`,
@@ -921,8 +912,6 @@ export async function queueWebhookExecution(
const subscriptionType = firstEvent?.subscriptionType as string | undefined
const { isHubSpotContactEventMatch } = await import('@/triggers/hubspot/utils')
if (!isHubSpotContactEventMatch(triggerId, subscriptionType || '')) {
logger.debug(
`[${options.requestId}] HubSpot event mismatch for trigger ${triggerId}. Event: ${subscriptionType}. Skipping execution.`,
@@ -974,7 +963,8 @@ export async function queueWebhookExecution(
// Note: Each webhook now has its own credentialId (credential sets are fanned out at save time)
const providerConfig = (foundWebhook.providerConfig as Record<string, any>) || {}
const credentialId = providerConfig.credentialId as string | undefined
const credentialSetId = providerConfig.credentialSetId as string | undefined
// credentialSetId is a direct field on webhook table, not in providerConfig
const credentialSetId = foundWebhook.credentialSetId as string | undefined
// Verify billing for credential sets
if (credentialSetId) {

View File

@@ -30,11 +30,11 @@ export async function createTeamsSubscription(
webhook: any,
workflow: any,
requestId: string
): Promise<void> {
): Promise<string | undefined> {
const config = getProviderConfig(webhook)
if (config.triggerId !== 'microsoftteams_chat_subscription') {
return
return undefined
}
const credentialId = config.credentialId as string | undefined
@@ -77,7 +77,7 @@ export async function createTeamsSubscription(
teamsLogger.info(
`[${requestId}] Teams subscription ${existingSubscriptionId} already exists for webhook ${webhook.id}`
)
return
return existingSubscriptionId
}
} catch {
teamsLogger.debug(`[${requestId}] Existing subscription check failed, will create new one`)
@@ -140,6 +140,7 @@ export async function createTeamsSubscription(
teamsLogger.info(
`[${requestId}] Successfully created Teams subscription ${payload.id} for webhook ${webhook.id}`
)
return payload.id as string
} catch (error: any) {
if (
error instanceof Error &&
@@ -1600,9 +1601,11 @@ export async function createExternalWebhookSubscription(
externalSubscriptionCreated = true
}
} else if (provider === 'microsoft-teams') {
await createTeamsSubscription(request, webhookData, workflow, requestId)
externalSubscriptionCreated =
(providerConfig.triggerId as string | undefined) === 'microsoftteams_chat_subscription'
const subscriptionId = await createTeamsSubscription(request, webhookData, workflow, requestId)
if (subscriptionId) {
updatedProviderConfig = { ...updatedProviderConfig, externalSubscriptionId: subscriptionId }
externalSubscriptionCreated = true
}
} else if (provider === 'telegram') {
await createTelegramWebhook(request, webhookData, requestId)
externalSubscriptionCreated = true

View File

@@ -379,7 +379,6 @@ async function processRssItems(
method: 'POST',
headers: {
'Content-Type': 'application/json',
'X-Webhook-Secret': webhookData.secret || '',
'User-Agent': 'Sim/1.0',
},
body: JSON.stringify(payload),

View File

@@ -2,6 +2,7 @@ import { db, workflowDeploymentVersion } from '@sim/db'
import { account, webhook } from '@sim/db/schema'
import { createLogger } from '@sim/logger'
import { and, eq, isNull, or } from 'drizzle-orm'
import { nanoid } from 'nanoid'
import { type NextRequest, NextResponse } from 'next/server'
import {
type SecureFetchResponse,
@@ -9,7 +10,11 @@ import {
validateUrlWithDNS,
} from '@/lib/core/security/input-validation'
import type { DbOrTx } from '@/lib/db/types'
import { refreshAccessTokenIfNeeded } from '@/app/api/auth/oauth/utils'
import { getProviderIdFromServiceId } from '@/lib/oauth'
import {
getCredentialsForCredentialSet,
refreshAccessTokenIfNeeded,
} from '@/app/api/auth/oauth/utils'
const logger = createLogger('WebhookUtils')
@@ -1388,21 +1393,6 @@ export function verifyProviderWebhook(
case 'stripe':
break
case 'gmail':
if (providerConfig.secret) {
const secretHeader = request.headers.get('X-Webhook-Secret')
if (!secretHeader || secretHeader.length !== providerConfig.secret.length) {
logger.warn(`[${requestId}] Invalid Gmail webhook secret`)
return new NextResponse('Unauthorized', { status: 401 })
}
let result = 0
for (let i = 0; i < secretHeader.length; i++) {
result |= secretHeader.charCodeAt(i) ^ providerConfig.secret.charCodeAt(i)
}
if (result !== 0) {
logger.warn(`[${requestId}] Invalid Gmail webhook secret`)
return new NextResponse('Unauthorized', { status: 401 })
}
}
break
case 'telegram': {
// Check User-Agent to ensure it's not blocked by middleware
@@ -1946,6 +1936,10 @@ export interface CredentialSetWebhookSyncResult {
created: number
updated: number
deleted: number
failed: Array<{
credentialId: string
error: string
}>
}
/**
@@ -1997,9 +1991,6 @@ export async function syncWebhooksForCredentialSet(params: {
`[${requestId}] Syncing webhooks for credential set ${credentialSetId}, provider ${provider}`
)
const { getCredentialsForCredentialSet } = await import('@/app/api/auth/oauth/utils')
const { nanoid } = await import('nanoid')
// Polling providers get unique paths per credential (for independent state)
// External webhook providers share the same path (external service sends to one URL)
const pollingProviders = ['gmail', 'outlook', 'rss', 'imap']
@@ -2011,7 +2002,7 @@ export async function syncWebhooksForCredentialSet(params: {
syncLogger.warn(
`[${requestId}] No credentials found in credential set ${credentialSetId} for provider ${oauthProviderId}`
)
return { webhooks: [], created: 0, updated: 0, deleted: 0 }
return { webhooks: [], created: 0, updated: 0, deleted: 0, failed: [] }
}
syncLogger.info(
@@ -2033,10 +2024,9 @@ export async function syncWebhooksForCredentialSet(params: {
)
// Filter to only webhooks belonging to this credential set
const credentialSetWebhooks = existingWebhooks.filter((wh) => {
const config = wh.providerConfig as Record<string, any>
return config?.credentialSetId === credentialSetId
})
const credentialSetWebhooks = existingWebhooks.filter(
(wh) => wh.credentialSetId === credentialSetId
)
syncLogger.info(
`[${requestId}] Found ${credentialSetWebhooks.length} existing webhooks for credential set`
@@ -2058,103 +2048,128 @@ export async function syncWebhooksForCredentialSet(params: {
created: 0,
updated: 0,
deleted: 0,
failed: [],
}
// Process each credential in the set
for (const cred of credentials) {
const existingWebhook = existingByCredentialId.get(cred.credentialId)
try {
const existingWebhook = existingByCredentialId.get(cred.credentialId)
if (existingWebhook) {
// Update existing webhook - preserve state fields
const existingConfig = existingWebhook.providerConfig as Record<string, any>
if (existingWebhook) {
// Update existing webhook - preserve state fields
const existingConfig = existingWebhook.providerConfig as Record<string, any>
const updatedConfig = {
...providerConfig,
basePath, // Store basePath for reliable reconstruction during membership sync
credentialId: cred.credentialId,
credentialSetId: credentialSetId,
// Preserve state fields from existing config
historyId: existingConfig.historyId,
lastCheckedTimestamp: existingConfig.lastCheckedTimestamp,
setupCompleted: existingConfig.setupCompleted,
externalId: existingConfig.externalId,
userId: cred.userId,
}
const updatedConfig = {
...providerConfig,
basePath, // Store basePath for reliable reconstruction during membership sync
credentialId: cred.credentialId,
credentialSetId: credentialSetId,
// Preserve state fields from existing config
historyId: existingConfig?.historyId,
lastCheckedTimestamp: existingConfig?.lastCheckedTimestamp,
setupCompleted: existingConfig?.setupCompleted,
externalId: existingConfig?.externalId,
userId: cred.userId,
}
await dbCtx
.update(webhook)
.set({
...(deploymentVersionId ? { deploymentVersionId } : {}),
providerConfig: updatedConfig,
await dbCtx
.update(webhook)
.set({
...(deploymentVersionId ? { deploymentVersionId } : {}),
providerConfig: updatedConfig,
isActive: true,
updatedAt: new Date(),
})
.where(eq(webhook.id, existingWebhook.id))
result.webhooks.push({
id: existingWebhook.id,
credentialId: cred.credentialId,
isNew: false,
})
result.updated++
syncLogger.debug(
`[${requestId}] Updated webhook ${existingWebhook.id} for credential ${cred.credentialId}`
)
} else {
// Create new webhook for this credential
const webhookId = nanoid()
const webhookPath = useUniquePaths
? `${basePath}-${cred.credentialId.slice(0, 8)}`
: basePath
const newConfig = {
...providerConfig,
basePath, // Store basePath for reliable reconstruction during membership sync
credentialId: cred.credentialId,
credentialSetId: credentialSetId,
userId: cred.userId,
}
await dbCtx.insert(webhook).values({
id: webhookId,
workflowId,
blockId,
path: webhookPath,
provider,
providerConfig: newConfig,
credentialSetId, // Indexed column for efficient credential set queries
isActive: true,
...(deploymentVersionId ? { deploymentVersionId } : {}),
createdAt: new Date(),
updatedAt: new Date(),
})
.where(eq(webhook.id, existingWebhook.id))
result.webhooks.push({
id: existingWebhook.id,
credentialId: cred.credentialId,
isNew: false,
})
result.updated++
result.webhooks.push({
id: webhookId,
credentialId: cred.credentialId,
isNew: true,
})
result.created++
syncLogger.debug(
`[${requestId}] Updated webhook ${existingWebhook.id} for credential ${cred.credentialId}`
)
} else {
// Create new webhook for this credential
const webhookId = nanoid()
const webhookPath = useUniquePaths ? `${basePath}-${cred.credentialId.slice(0, 8)}` : basePath
const newConfig = {
...providerConfig,
basePath, // Store basePath for reliable reconstruction during membership sync
credentialId: cred.credentialId,
credentialSetId: credentialSetId,
userId: cred.userId,
syncLogger.debug(
`[${requestId}] Created webhook ${webhookId} for credential ${cred.credentialId}`
)
}
await dbCtx.insert(webhook).values({
id: webhookId,
workflowId,
blockId,
path: webhookPath,
provider,
providerConfig: newConfig,
credentialSetId, // Indexed column for efficient credential set queries
isActive: true,
...(deploymentVersionId ? { deploymentVersionId } : {}),
createdAt: new Date(),
updatedAt: new Date(),
})
result.webhooks.push({
id: webhookId,
credentialId: cred.credentialId,
isNew: true,
})
result.created++
syncLogger.debug(
`[${requestId}] Created webhook ${webhookId} for credential ${cred.credentialId}`
} catch (error) {
const errorMessage = error instanceof Error ? error.message : 'Unknown error'
syncLogger.error(
`[${requestId}] Failed to sync webhook for credential ${cred.credentialId}: ${errorMessage}`
)
result.failed.push({
credentialId: cred.credentialId,
error: errorMessage,
})
}
}
// Delete webhooks for credentials no longer in the set
for (const [credentialId, existingWebhook] of existingByCredentialId) {
if (!credentialIdsInSet.has(credentialId)) {
await dbCtx.delete(webhook).where(eq(webhook.id, existingWebhook.id))
result.deleted++
try {
await dbCtx.delete(webhook).where(eq(webhook.id, existingWebhook.id))
result.deleted++
syncLogger.debug(
`[${requestId}] Deleted webhook ${existingWebhook.id} for removed credential ${credentialId}`
)
syncLogger.debug(
`[${requestId}] Deleted webhook ${existingWebhook.id} for removed credential ${credentialId}`
)
} catch (error) {
const errorMessage = error instanceof Error ? error.message : 'Unknown error'
syncLogger.error(
`[${requestId}] Failed to delete webhook ${existingWebhook.id} for credential ${credentialId}: ${errorMessage}`
)
result.failed.push({
credentialId,
error: `Failed to delete: ${errorMessage}`,
})
}
}
}
syncLogger.info(
`[${requestId}] Credential set webhook sync complete: ${result.created} created, ${result.updated} updated, ${result.deleted} deleted`
`[${requestId}] Credential set webhook sync complete: ${result.created} created, ${result.updated} updated, ${result.deleted} deleted, ${result.failed.length} failed`
)
return result
@@ -2175,8 +2190,6 @@ export async function syncAllWebhooksForCredentialSet(
const syncLogger = createLogger('CredentialSetMembershipSync')
syncLogger.info(`[${requestId}] Syncing all webhooks for credential set ${credentialSetId}`)
const { getProviderIdFromServiceId } = await import('@/lib/oauth')
// Find all webhooks that use this credential set using the indexed column
const webhooksForSet = await dbCtx
.select({ webhook })

View File

@@ -1,5 +1,5 @@
/**
* Pure utility functions for TwiML processing
* Pure utility functions for webhook processing
* This file has NO server-side dependencies to ensure it can be safely imported in client-side code
*/
@@ -18,3 +18,19 @@ export function convertSquareBracketsToTwiML(twiml: string | undefined): string
// Replace [Tag] with <Tag> and [/Tag] with </Tag>
return twiml.replace(/\[(\/?[^\]]+)\]/g, '<$1>')
}
/**
* Merges fields from source into target, but only if they don't exist in the base config.
* Used to preserve system-managed fields while respecting user-provided values.
*/
export function mergeNonUserFields(
target: Record<string, unknown>,
source: Record<string, unknown>,
userProvided: Record<string, unknown>
): void {
for (const [key, value] of Object.entries(source)) {
if (!(key in userProvided)) {
target[key] = value
}
}
}

View File

@@ -24,6 +24,7 @@ import type {
IterationContext,
} from '@/executor/execution/types'
import type { ExecutionResult, NormalizedBlockOutput } from '@/executor/types'
import { hasExecutionResult } from '@/executor/utils/errors'
import { Serializer } from '@/serializer'
import { mergeSubblockState } from '@/stores/workflows/server-utils'
@@ -383,20 +384,15 @@ export async function executeWorkflowCore(
} catch (error: unknown) {
logger.error(`[${requestId}] Execution failed:`, error)
const errorWithResult = error as {
executionResult?: ExecutionResult
message?: string
stack?: string
}
const executionResult = errorWithResult?.executionResult
const executionResult = hasExecutionResult(error) ? error.executionResult : undefined
const { traceSpans } = executionResult ? buildTraceSpans(executionResult) : { traceSpans: [] }
await loggingSession.safeCompleteWithError({
endedAt: new Date().toISOString(),
totalDurationMs: executionResult?.metadata?.duration || 0,
error: {
message: errorWithResult?.message || 'Execution failed',
stackTrace: errorWithResult?.stack,
message: error instanceof Error ? error.message : 'Execution failed',
stackTrace: error instanceof Error ? error.stack : undefined,
},
traceSpans,
})

View File

@@ -17,6 +17,9 @@ const {
mockValidateCronExpression,
mockGetScheduleTimeValues,
mockRandomUUID,
mockTransaction,
mockSelect,
mockFrom,
} = vi.hoisted(() => ({
mockInsert: vi.fn(),
mockDelete: vi.fn(),
@@ -28,20 +31,27 @@ const {
mockValidateCronExpression: vi.fn(),
mockGetScheduleTimeValues: vi.fn(),
mockRandomUUID: vi.fn(),
mockTransaction: vi.fn(),
mockSelect: vi.fn(),
mockFrom: vi.fn(),
}))
vi.mock('@sim/db', () => ({
db: {},
db: {
transaction: mockTransaction,
},
workflowSchedule: {
workflowId: 'workflow_id',
blockId: 'block_id',
deploymentVersionId: 'deployment_version_id',
id: 'id',
},
}))
vi.mock('drizzle-orm', () => ({
eq: vi.fn((...args) => ({ type: 'eq', args })),
and: vi.fn((...args) => ({ type: 'and', args })),
inArray: vi.fn((...args) => ({ type: 'inArray', args })),
sql: vi.fn((strings, ...values) => ({ type: 'sql', strings, values })),
}))
@@ -100,6 +110,20 @@ describe('Schedule Deploy Utilities', () => {
// Setup mock chain for delete
mockWhere.mockResolvedValue({})
mockDelete.mockReturnValue({ where: mockWhere })
// Setup mock chain for select
mockFrom.mockReturnValue({ where: vi.fn().mockResolvedValue([]) })
mockSelect.mockReturnValue({ from: mockFrom })
// Setup transaction mock to execute callback with mock tx
mockTransaction.mockImplementation(async (callback) => {
const mockTx = {
insert: mockInsert,
delete: mockDelete,
select: mockSelect,
}
return callback(mockTx)
})
})
afterEach(() => {
@@ -135,6 +159,19 @@ describe('Schedule Deploy Utilities', () => {
const result = findScheduleBlocks({})
expect(result).toHaveLength(0)
})
it('should exclude disabled schedule blocks', () => {
const blocks: Record<string, BlockState> = {
'block-1': { id: 'block-1', type: 'schedule', enabled: true, subBlocks: {} } as BlockState,
'block-2': { id: 'block-2', type: 'schedule', enabled: false, subBlocks: {} } as BlockState,
'block-3': { id: 'block-3', type: 'schedule', subBlocks: {} } as BlockState, // enabled undefined = enabled
}
const result = findScheduleBlocks(blocks)
expect(result).toHaveLength(2)
expect(result.map((b) => b.id)).toEqual(['block-1', 'block-3'])
})
})
describe('validateScheduleBlock', () => {
@@ -671,20 +708,24 @@ describe('Schedule Deploy Utilities', () => {
})
describe('createSchedulesForDeploy', () => {
const setupMockTransaction = (
existingSchedules: Array<{ id: string; blockId: string }> = []
) => {
mockFrom.mockReturnValue({ where: vi.fn().mockResolvedValue(existingSchedules) })
mockSelect.mockReturnValue({ from: mockFrom })
}
it('should return success with no schedule blocks', async () => {
const blocks: Record<string, BlockState> = {
'block-1': { id: 'block-1', type: 'agent', subBlocks: {} } as BlockState,
}
const mockTx = {
insert: mockInsert,
delete: mockDelete,
}
setupMockTransaction()
const result = await createSchedulesForDeploy('workflow-1', blocks, mockTx as any)
const result = await createSchedulesForDeploy('workflow-1', blocks, {} as any)
expect(result.success).toBe(true)
expect(mockInsert).not.toHaveBeenCalled()
expect(mockTransaction).not.toHaveBeenCalled()
})
it('should create schedule for valid schedule block', async () => {
@@ -700,17 +741,15 @@ describe('Schedule Deploy Utilities', () => {
} as BlockState,
}
const mockTx = {
insert: mockInsert,
delete: mockDelete,
}
setupMockTransaction()
const result = await createSchedulesForDeploy('workflow-1', blocks, mockTx as any)
const result = await createSchedulesForDeploy('workflow-1', blocks, {} as any)
expect(result.success).toBe(true)
expect(result.scheduleId).toBe('test-uuid')
expect(result.cronExpression).toBe('0 9 * * *')
expect(result.nextRunAt).toEqual(new Date('2025-04-15T09:00:00Z'))
expect(mockTransaction).toHaveBeenCalled()
expect(mockInsert).toHaveBeenCalled()
expect(mockOnConflictDoUpdate).toHaveBeenCalled()
})
@@ -727,16 +766,13 @@ describe('Schedule Deploy Utilities', () => {
} as BlockState,
}
const mockTx = {
insert: mockInsert,
delete: mockDelete,
}
setupMockTransaction()
const result = await createSchedulesForDeploy('workflow-1', blocks, mockTx as any)
const result = await createSchedulesForDeploy('workflow-1', blocks, {} as any)
expect(result.success).toBe(false)
expect(result.error).toBe('Time is required for daily schedules')
expect(mockInsert).not.toHaveBeenCalled()
expect(mockTransaction).not.toHaveBeenCalled()
})
it('should use onConflictDoUpdate for existing schedules', async () => {
@@ -752,12 +788,9 @@ describe('Schedule Deploy Utilities', () => {
} as BlockState,
}
const mockTx = {
insert: mockInsert,
delete: mockDelete,
}
setupMockTransaction()
await createSchedulesForDeploy('workflow-1', blocks, mockTx as any)
await createSchedulesForDeploy('workflow-1', blocks, {} as any)
expect(mockOnConflictDoUpdate).toHaveBeenCalledWith({
target: expect.any(Array),
@@ -769,6 +802,27 @@ describe('Schedule Deploy Utilities', () => {
}),
})
})
it('should rollback on database error', async () => {
const blocks: Record<string, BlockState> = {
'block-1': {
id: 'block-1',
type: 'schedule',
subBlocks: {
scheduleType: { value: 'daily' },
dailyTime: { value: '09:00' },
timezone: { value: 'UTC' },
},
} as BlockState,
}
mockTransaction.mockRejectedValueOnce(new Error('Database error'))
const result = await createSchedulesForDeploy('workflow-1', blocks, {} as any)
expect(result.success).toBe(false)
expect(result.error).toBe('Database error')
})
})
describe('deleteSchedulesForWorkflow', () => {

View File

@@ -1,6 +1,6 @@
import { db, workflowSchedule } from '@sim/db'
import { createLogger } from '@sim/logger'
import { and, eq } from 'drizzle-orm'
import { and, eq, inArray } from 'drizzle-orm'
import type { DbOrTx } from '@/lib/db/types'
import { cleanupWebhooksForWorkflow } from '@/lib/webhooks/deploy'
import type { BlockState } from '@/lib/workflows/schedules/utils'
@@ -21,13 +21,13 @@ export interface ScheduleDeployResult {
}
/**
* Create or update schedule records for a workflow during deployment
* This should be called within a database transaction
* Create or update schedule records for a workflow during deployment.
* Uses a transaction to ensure atomicity - all schedules are created or none are.
*/
export async function createSchedulesForDeploy(
workflowId: string,
blocks: Record<string, BlockState>,
tx: DbOrTx,
_tx: DbOrTx,
deploymentVersionId?: string
): Promise<ScheduleDeployResult> {
const scheduleBlocks = findScheduleBlocks(blocks)
@@ -37,16 +37,16 @@ export async function createSchedulesForDeploy(
return { success: true }
}
let lastScheduleInfo: {
scheduleId: string
cronExpression?: string
nextRunAt?: Date
timezone?: string
} | null = null
// Phase 1: Validate ALL blocks before making any DB changes
const validatedBlocks: Array<{
blockId: string
cronExpression: string
nextRunAt: Date
timezone: string
}> = []
for (const block of scheduleBlocks) {
const blockId = block.id as string
const validation = validateScheduleBlock(block)
if (!validation.isValid) {
return {
@@ -54,62 +54,112 @@ export async function createSchedulesForDeploy(
error: validation.error,
}
}
const { cronExpression, nextRunAt, timezone } = validation
const scheduleId = crypto.randomUUID()
const now = new Date()
const values = {
id: scheduleId,
workflowId,
deploymentVersionId: deploymentVersionId || null,
validatedBlocks.push({
blockId,
cronExpression: cronExpression!,
triggerType: 'schedule',
createdAt: now,
updatedAt: now,
nextRunAt: nextRunAt!,
timezone: timezone!,
status: 'active',
failedCount: 0,
}
const setValues = {
blockId,
cronExpression: cronExpression!,
...(deploymentVersionId ? { deploymentVersionId } : {}),
updatedAt: now,
nextRunAt: nextRunAt!,
timezone: timezone!,
status: 'active',
failedCount: 0,
}
await tx
.insert(workflowSchedule)
.values(values)
.onConflictDoUpdate({
target: [
workflowSchedule.workflowId,
workflowSchedule.blockId,
workflowSchedule.deploymentVersionId,
],
set: setValues,
})
logger.info(`Schedule created/updated for workflow ${workflowId}, block ${blockId}`, {
scheduleId: values.id,
cronExpression,
nextRunAt: nextRunAt?.toISOString(),
cronExpression: validation.cronExpression!,
nextRunAt: validation.nextRunAt!,
timezone: validation.timezone!,
})
}
lastScheduleInfo = { scheduleId: values.id, cronExpression, nextRunAt, timezone }
// Phase 2: All validations passed - now do DB operations in a transaction
let lastScheduleInfo: {
scheduleId: string
cronExpression?: string
nextRunAt?: Date
timezone?: string
} | null = null
try {
await db.transaction(async (tx) => {
const currentBlockIds = new Set(validatedBlocks.map((b) => b.blockId))
const existingSchedules = await tx
.select({ id: workflowSchedule.id, blockId: workflowSchedule.blockId })
.from(workflowSchedule)
.where(
deploymentVersionId
? and(
eq(workflowSchedule.workflowId, workflowId),
eq(workflowSchedule.deploymentVersionId, deploymentVersionId)
)
: eq(workflowSchedule.workflowId, workflowId)
)
const orphanedScheduleIds = existingSchedules
.filter((s) => s.blockId && !currentBlockIds.has(s.blockId))
.map((s) => s.id)
if (orphanedScheduleIds.length > 0) {
logger.info(
`Deleting ${orphanedScheduleIds.length} orphaned schedule(s) for workflow ${workflowId}`
)
await tx.delete(workflowSchedule).where(inArray(workflowSchedule.id, orphanedScheduleIds))
}
for (const validated of validatedBlocks) {
const { blockId, cronExpression, nextRunAt, timezone } = validated
const scheduleId = crypto.randomUUID()
const now = new Date()
const values = {
id: scheduleId,
workflowId,
deploymentVersionId: deploymentVersionId || null,
blockId,
cronExpression,
triggerType: 'schedule',
createdAt: now,
updatedAt: now,
nextRunAt,
timezone,
status: 'active',
failedCount: 0,
}
const setValues = {
blockId,
cronExpression,
...(deploymentVersionId ? { deploymentVersionId } : {}),
updatedAt: now,
nextRunAt,
timezone,
status: 'active',
failedCount: 0,
}
await tx
.insert(workflowSchedule)
.values(values)
.onConflictDoUpdate({
target: [
workflowSchedule.workflowId,
workflowSchedule.blockId,
workflowSchedule.deploymentVersionId,
],
set: setValues,
})
logger.info(`Schedule created/updated for workflow ${workflowId}, block ${blockId}`, {
scheduleId: values.id,
cronExpression,
nextRunAt: nextRunAt?.toISOString(),
})
lastScheduleInfo = { scheduleId: values.id, cronExpression, nextRunAt, timezone }
}
})
} catch (error) {
logger.error(`Failed to create schedules for workflow ${workflowId}`, error)
return {
success: false,
error: error instanceof Error ? error.message : 'Failed to create schedules',
}
}
return {
success: true,
...lastScheduleInfo,
...(lastScheduleInfo ?? {}),
}
}
@@ -145,8 +195,25 @@ export async function cleanupDeploymentVersion(params: {
workflow: Record<string, unknown>
requestId: string
deploymentVersionId: string
/**
* If true, skip external subscription cleanup (already done by saveTriggerWebhooksForDeploy).
* Only deletes DB records.
*/
skipExternalCleanup?: boolean
}): Promise<void> {
const { workflowId, workflow, requestId, deploymentVersionId } = params
await cleanupWebhooksForWorkflow(workflowId, workflow, requestId, deploymentVersionId)
const {
workflowId,
workflow,
requestId,
deploymentVersionId,
skipExternalCleanup = false,
} = params
await cleanupWebhooksForWorkflow(
workflowId,
workflow,
requestId,
deploymentVersionId,
skipExternalCleanup
)
await deleteSchedulesForWorkflow(workflowId, db, deploymentVersionId)
}

View File

@@ -98,9 +98,12 @@ function getMissingConfigError(scheduleType: string): string {
/**
* Find schedule blocks in a workflow's blocks
* Only returns enabled schedule blocks (disabled blocks are skipped)
*/
export function findScheduleBlocks(blocks: Record<string, BlockState>): BlockState[] {
return Object.values(blocks).filter((block) => block.type === 'schedule')
return Object.values(blocks).filter(
(block) => block.type === 'schedule' && block.enabled !== false
)
}
/**

View File

@@ -4,7 +4,6 @@ import { create } from 'zustand'
import { devtools } from 'zustand/middleware'
import { DEFAULT_DUPLICATE_OFFSET } from '@/lib/workflows/autolayout/constants'
import { getBlockOutputs } from '@/lib/workflows/blocks/block-outputs'
import { TriggerUtils } from '@/lib/workflows/triggers/triggers'
import { getBlock } from '@/blocks'
import type { SubBlockConfig } from '@/blocks/types'
import { normalizeName, RESERVED_BLOCK_NAMES } from '@/executor/constants'
@@ -1171,93 +1170,6 @@ export const useWorkflowStore = create<WorkflowStore>()(
// Note: Socket.IO handles real-time sync automatically
},
toggleBlockTriggerMode: (id: string) => {
const block = get().blocks[id]
if (!block) return
const newTriggerMode = !block.triggerMode
// When switching TO trigger mode, check if block is inside a subflow
if (newTriggerMode && TriggerUtils.isBlockInSubflow(id, get().blocks)) {
logger.warn('Cannot enable trigger mode for block inside loop or parallel subflow', {
blockId: id,
blockType: block.type,
})
return
}
// When switching TO trigger mode, remove all incoming connections
let filteredEdges = [...get().edges]
if (newTriggerMode) {
// Remove edges where this block is the target
filteredEdges = filteredEdges.filter((edge) => edge.target !== id)
logger.info(
`Removed ${get().edges.length - filteredEdges.length} incoming connections for trigger mode`,
{
blockId: id,
blockType: block.type,
}
)
}
const newState = {
blocks: {
...get().blocks,
[id]: {
...block,
triggerMode: newTriggerMode,
},
},
edges: filteredEdges,
loops: { ...get().loops },
parallels: { ...get().parallels },
}
set(newState)
get().updateLastSaved()
// Handle webhook enable/disable when toggling trigger mode
const handleWebhookToggle = async () => {
try {
const activeWorkflowId = useWorkflowRegistry.getState().activeWorkflowId
if (!activeWorkflowId) return
// Check if there's a webhook for this block
const response = await fetch(
`/api/webhooks?workflowId=${activeWorkflowId}&blockId=${id}`
)
if (response.ok) {
const data = await response.json()
if (data.webhooks && data.webhooks.length > 0) {
const webhook = data.webhooks[0].webhook
// Update webhook's isActive status based on trigger mode
const updateResponse = await fetch(`/api/webhooks/${webhook.id}`, {
method: 'PATCH',
headers: {
'Content-Type': 'application/json',
},
body: JSON.stringify({
isActive: newTriggerMode,
}),
})
if (!updateResponse.ok) {
logger.error('Failed to update webhook status')
}
}
}
} catch (error) {
logger.error('Error toggling webhook status:', error)
}
}
// Handle webhook toggle asynchronously
handleWebhookToggle()
// Note: Socket.IO handles real-time sync automatically
},
// Parallel block methods implementation
updateParallelCount: (parallelId: string, count: number) => {
const block = get().blocks[parallelId]

View File

@@ -241,7 +241,6 @@ export interface WorkflowActions {
setNeedsRedeploymentFlag: (needsRedeployment: boolean) => void
revertToDeployedState: (deployedState: WorkflowState) => void
toggleBlockAdvancedMode: (id: string) => void
toggleBlockTriggerMode: (id: string) => void
setDragStartPosition: (position: DragStartPosition | null) => void
getDragStartPosition: () => DragStartPosition | null
getWorkflowState: () => WorkflowState