mirror of
https://github.com/simstudioai/sim.git
synced 2026-01-24 06:18:04 -05:00
Compare commits
8 Commits
fix/copilo
...
fix/nested
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
bf22dd75ad | ||
|
|
eb767b5ede | ||
|
|
594bcac5f2 | ||
|
|
d3f20311d0 | ||
|
|
587d44ad6f | ||
|
|
8bf2e69942 | ||
|
|
12100e6881 | ||
|
|
23294683e1 |
@@ -640,6 +640,7 @@ export interface AdminDeployResult {
|
||||
isDeployed: boolean
|
||||
version: number
|
||||
deployedAt: string
|
||||
warnings?: string[]
|
||||
}
|
||||
|
||||
export interface AdminUndeployResult {
|
||||
|
||||
@@ -1,14 +1,23 @@
|
||||
import { db, workflow } from '@sim/db'
|
||||
import { db, workflow, workflowDeploymentVersion } from '@sim/db'
|
||||
import { createLogger } from '@sim/logger'
|
||||
import { eq } from 'drizzle-orm'
|
||||
import { and, eq } from 'drizzle-orm'
|
||||
import { generateRequestId } from '@/lib/core/utils/request'
|
||||
import { cleanupWebhooksForWorkflow } from '@/lib/webhooks/deploy'
|
||||
import { removeMcpToolsForWorkflow, syncMcpToolsForWorkflow } from '@/lib/mcp/workflow-mcp-sync'
|
||||
import {
|
||||
cleanupWebhooksForWorkflow,
|
||||
restorePreviousVersionWebhooks,
|
||||
saveTriggerWebhooksForDeploy,
|
||||
} from '@/lib/webhooks/deploy'
|
||||
import {
|
||||
deployWorkflow,
|
||||
loadWorkflowFromNormalizedTables,
|
||||
undeployWorkflow,
|
||||
} from '@/lib/workflows/persistence/utils'
|
||||
import { createSchedulesForDeploy, validateWorkflowSchedules } from '@/lib/workflows/schedules'
|
||||
import {
|
||||
cleanupDeploymentVersion,
|
||||
createSchedulesForDeploy,
|
||||
validateWorkflowSchedules,
|
||||
} from '@/lib/workflows/schedules'
|
||||
import { withAdminAuthParams } from '@/app/api/v1/admin/middleware'
|
||||
import {
|
||||
badRequestResponse,
|
||||
@@ -28,10 +37,11 @@ interface RouteParams {
|
||||
|
||||
export const POST = withAdminAuthParams<RouteParams>(async (request, context) => {
|
||||
const { id: workflowId } = await context.params
|
||||
const requestId = generateRequestId()
|
||||
|
||||
try {
|
||||
const [workflowRecord] = await db
|
||||
.select({ id: workflow.id, name: workflow.name })
|
||||
.select()
|
||||
.from(workflow)
|
||||
.where(eq(workflow.id, workflowId))
|
||||
.limit(1)
|
||||
@@ -50,6 +60,18 @@ export const POST = withAdminAuthParams<RouteParams>(async (request, context) =>
|
||||
return badRequestResponse(`Invalid schedule configuration: ${scheduleValidation.error}`)
|
||||
}
|
||||
|
||||
const [currentActiveVersion] = await db
|
||||
.select({ id: workflowDeploymentVersion.id })
|
||||
.from(workflowDeploymentVersion)
|
||||
.where(
|
||||
and(
|
||||
eq(workflowDeploymentVersion.workflowId, workflowId),
|
||||
eq(workflowDeploymentVersion.isActive, true)
|
||||
)
|
||||
)
|
||||
.limit(1)
|
||||
const previousVersionId = currentActiveVersion?.id
|
||||
|
||||
const deployResult = await deployWorkflow({
|
||||
workflowId,
|
||||
deployedBy: ADMIN_ACTOR_ID,
|
||||
@@ -65,6 +87,32 @@ export const POST = withAdminAuthParams<RouteParams>(async (request, context) =>
|
||||
return internalErrorResponse('Failed to resolve deployment version')
|
||||
}
|
||||
|
||||
const workflowData = workflowRecord as Record<string, unknown>
|
||||
|
||||
const triggerSaveResult = await saveTriggerWebhooksForDeploy({
|
||||
request,
|
||||
workflowId,
|
||||
workflow: workflowData,
|
||||
userId: workflowRecord.userId,
|
||||
blocks: normalizedData.blocks,
|
||||
requestId,
|
||||
deploymentVersionId: deployResult.deploymentVersionId,
|
||||
previousVersionId,
|
||||
})
|
||||
|
||||
if (!triggerSaveResult.success) {
|
||||
await cleanupDeploymentVersion({
|
||||
workflowId,
|
||||
workflow: workflowData,
|
||||
requestId,
|
||||
deploymentVersionId: deployResult.deploymentVersionId,
|
||||
})
|
||||
await undeployWorkflow({ workflowId })
|
||||
return internalErrorResponse(
|
||||
triggerSaveResult.error?.message || 'Failed to sync trigger configuration'
|
||||
)
|
||||
}
|
||||
|
||||
const scheduleResult = await createSchedulesForDeploy(
|
||||
workflowId,
|
||||
normalizedData.blocks,
|
||||
@@ -72,15 +120,58 @@ export const POST = withAdminAuthParams<RouteParams>(async (request, context) =>
|
||||
deployResult.deploymentVersionId
|
||||
)
|
||||
if (!scheduleResult.success) {
|
||||
logger.warn(`Schedule creation failed for workflow ${workflowId}: ${scheduleResult.error}`)
|
||||
logger.error(
|
||||
`[${requestId}] Admin API: Schedule creation failed for workflow ${workflowId}: ${scheduleResult.error}`
|
||||
)
|
||||
await cleanupDeploymentVersion({
|
||||
workflowId,
|
||||
workflow: workflowData,
|
||||
requestId,
|
||||
deploymentVersionId: deployResult.deploymentVersionId,
|
||||
})
|
||||
if (previousVersionId) {
|
||||
await restorePreviousVersionWebhooks({
|
||||
request,
|
||||
workflow: workflowData,
|
||||
userId: workflowRecord.userId,
|
||||
previousVersionId,
|
||||
requestId,
|
||||
})
|
||||
}
|
||||
await undeployWorkflow({ workflowId })
|
||||
return internalErrorResponse(scheduleResult.error || 'Failed to create schedule')
|
||||
}
|
||||
|
||||
logger.info(`Admin API: Deployed workflow ${workflowId} as v${deployResult.version}`)
|
||||
if (previousVersionId && previousVersionId !== deployResult.deploymentVersionId) {
|
||||
try {
|
||||
logger.info(`[${requestId}] Admin API: Cleaning up previous version ${previousVersionId}`)
|
||||
await cleanupDeploymentVersion({
|
||||
workflowId,
|
||||
workflow: workflowData,
|
||||
requestId,
|
||||
deploymentVersionId: previousVersionId,
|
||||
skipExternalCleanup: true,
|
||||
})
|
||||
} catch (cleanupError) {
|
||||
logger.error(
|
||||
`[${requestId}] Admin API: Failed to clean up previous version ${previousVersionId}`,
|
||||
cleanupError
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
logger.info(
|
||||
`[${requestId}] Admin API: Deployed workflow ${workflowId} as v${deployResult.version}`
|
||||
)
|
||||
|
||||
// Sync MCP tools with the latest parameter schema
|
||||
await syncMcpToolsForWorkflow({ workflowId, requestId, context: 'deploy' })
|
||||
|
||||
const response: AdminDeployResult = {
|
||||
isDeployed: true,
|
||||
version: deployResult.version!,
|
||||
deployedAt: deployResult.deployedAt!.toISOString(),
|
||||
warnings: triggerSaveResult.warnings,
|
||||
}
|
||||
|
||||
return singleResponse(response)
|
||||
@@ -105,7 +196,6 @@ export const DELETE = withAdminAuthParams<RouteParams>(async (request, context)
|
||||
return notFoundResponse('Workflow')
|
||||
}
|
||||
|
||||
// Clean up external webhook subscriptions before undeploying
|
||||
await cleanupWebhooksForWorkflow(
|
||||
workflowId,
|
||||
workflowRecord as Record<string, unknown>,
|
||||
@@ -117,6 +207,8 @@ export const DELETE = withAdminAuthParams<RouteParams>(async (request, context)
|
||||
return internalErrorResponse(result.error || 'Failed to undeploy workflow')
|
||||
}
|
||||
|
||||
await removeMcpToolsForWorkflow(workflowId, requestId)
|
||||
|
||||
logger.info(`Admin API: Undeployed workflow ${workflowId}`)
|
||||
|
||||
const response: AdminUndeployResult = {
|
||||
|
||||
@@ -1,7 +1,15 @@
|
||||
import { db, workflow } from '@sim/db'
|
||||
import { db, workflow, workflowDeploymentVersion } from '@sim/db'
|
||||
import { createLogger } from '@sim/logger'
|
||||
import { eq } from 'drizzle-orm'
|
||||
import { and, eq } from 'drizzle-orm'
|
||||
import { generateRequestId } from '@/lib/core/utils/request'
|
||||
import { syncMcpToolsForWorkflow } from '@/lib/mcp/workflow-mcp-sync'
|
||||
import { restorePreviousVersionWebhooks, saveTriggerWebhooksForDeploy } from '@/lib/webhooks/deploy'
|
||||
import { activateWorkflowVersion } from '@/lib/workflows/persistence/utils'
|
||||
import {
|
||||
cleanupDeploymentVersion,
|
||||
createSchedulesForDeploy,
|
||||
validateWorkflowSchedules,
|
||||
} from '@/lib/workflows/schedules'
|
||||
import { withAdminAuthParams } from '@/app/api/v1/admin/middleware'
|
||||
import {
|
||||
badRequestResponse,
|
||||
@@ -9,6 +17,7 @@ import {
|
||||
notFoundResponse,
|
||||
singleResponse,
|
||||
} from '@/app/api/v1/admin/responses'
|
||||
import type { BlockState } from '@/stores/workflows/workflow/types'
|
||||
|
||||
const logger = createLogger('AdminWorkflowActivateVersionAPI')
|
||||
|
||||
@@ -18,11 +27,12 @@ interface RouteParams {
|
||||
}
|
||||
|
||||
export const POST = withAdminAuthParams<RouteParams>(async (request, context) => {
|
||||
const requestId = generateRequestId()
|
||||
const { id: workflowId, versionId } = await context.params
|
||||
|
||||
try {
|
||||
const [workflowRecord] = await db
|
||||
.select({ id: workflow.id })
|
||||
.select()
|
||||
.from(workflow)
|
||||
.where(eq(workflow.id, workflowId))
|
||||
.limit(1)
|
||||
@@ -36,23 +46,161 @@ export const POST = withAdminAuthParams<RouteParams>(async (request, context) =>
|
||||
return badRequestResponse('Invalid version number')
|
||||
}
|
||||
|
||||
const [versionRow] = await db
|
||||
.select({
|
||||
id: workflowDeploymentVersion.id,
|
||||
state: workflowDeploymentVersion.state,
|
||||
})
|
||||
.from(workflowDeploymentVersion)
|
||||
.where(
|
||||
and(
|
||||
eq(workflowDeploymentVersion.workflowId, workflowId),
|
||||
eq(workflowDeploymentVersion.version, versionNum)
|
||||
)
|
||||
)
|
||||
.limit(1)
|
||||
|
||||
if (!versionRow?.state) {
|
||||
return notFoundResponse('Deployment version')
|
||||
}
|
||||
|
||||
const [currentActiveVersion] = await db
|
||||
.select({ id: workflowDeploymentVersion.id })
|
||||
.from(workflowDeploymentVersion)
|
||||
.where(
|
||||
and(
|
||||
eq(workflowDeploymentVersion.workflowId, workflowId),
|
||||
eq(workflowDeploymentVersion.isActive, true)
|
||||
)
|
||||
)
|
||||
.limit(1)
|
||||
|
||||
const previousVersionId = currentActiveVersion?.id
|
||||
|
||||
const deployedState = versionRow.state as { blocks?: Record<string, BlockState> }
|
||||
const blocks = deployedState.blocks
|
||||
if (!blocks || typeof blocks !== 'object') {
|
||||
return internalErrorResponse('Invalid deployed state structure')
|
||||
}
|
||||
|
||||
const workflowData = workflowRecord as Record<string, unknown>
|
||||
|
||||
const scheduleValidation = validateWorkflowSchedules(blocks)
|
||||
if (!scheduleValidation.isValid) {
|
||||
return badRequestResponse(`Invalid schedule configuration: ${scheduleValidation.error}`)
|
||||
}
|
||||
|
||||
const triggerSaveResult = await saveTriggerWebhooksForDeploy({
|
||||
request,
|
||||
workflowId,
|
||||
workflow: workflowData,
|
||||
userId: workflowRecord.userId,
|
||||
blocks,
|
||||
requestId,
|
||||
deploymentVersionId: versionRow.id,
|
||||
previousVersionId,
|
||||
forceRecreateSubscriptions: true,
|
||||
})
|
||||
|
||||
if (!triggerSaveResult.success) {
|
||||
logger.error(
|
||||
`[${requestId}] Admin API: Failed to sync triggers for workflow ${workflowId}`,
|
||||
triggerSaveResult.error
|
||||
)
|
||||
return internalErrorResponse(
|
||||
triggerSaveResult.error?.message || 'Failed to sync trigger configuration'
|
||||
)
|
||||
}
|
||||
|
||||
const scheduleResult = await createSchedulesForDeploy(workflowId, blocks, db, versionRow.id)
|
||||
|
||||
if (!scheduleResult.success) {
|
||||
await cleanupDeploymentVersion({
|
||||
workflowId,
|
||||
workflow: workflowData,
|
||||
requestId,
|
||||
deploymentVersionId: versionRow.id,
|
||||
})
|
||||
if (previousVersionId) {
|
||||
await restorePreviousVersionWebhooks({
|
||||
request,
|
||||
workflow: workflowData,
|
||||
userId: workflowRecord.userId,
|
||||
previousVersionId,
|
||||
requestId,
|
||||
})
|
||||
}
|
||||
return internalErrorResponse(scheduleResult.error || 'Failed to sync schedules')
|
||||
}
|
||||
|
||||
const result = await activateWorkflowVersion({ workflowId, version: versionNum })
|
||||
if (!result.success) {
|
||||
await cleanupDeploymentVersion({
|
||||
workflowId,
|
||||
workflow: workflowData,
|
||||
requestId,
|
||||
deploymentVersionId: versionRow.id,
|
||||
})
|
||||
if (previousVersionId) {
|
||||
await restorePreviousVersionWebhooks({
|
||||
request,
|
||||
workflow: workflowData,
|
||||
userId: workflowRecord.userId,
|
||||
previousVersionId,
|
||||
requestId,
|
||||
})
|
||||
}
|
||||
if (result.error === 'Deployment version not found') {
|
||||
return notFoundResponse('Deployment version')
|
||||
}
|
||||
return internalErrorResponse(result.error || 'Failed to activate version')
|
||||
}
|
||||
|
||||
logger.info(`Admin API: Activated version ${versionNum} for workflow ${workflowId}`)
|
||||
if (previousVersionId && previousVersionId !== versionRow.id) {
|
||||
try {
|
||||
logger.info(
|
||||
`[${requestId}] Admin API: Cleaning up previous version ${previousVersionId} webhooks/schedules`
|
||||
)
|
||||
await cleanupDeploymentVersion({
|
||||
workflowId,
|
||||
workflow: workflowData,
|
||||
requestId,
|
||||
deploymentVersionId: previousVersionId,
|
||||
skipExternalCleanup: true,
|
||||
})
|
||||
logger.info(`[${requestId}] Admin API: Previous version cleanup completed`)
|
||||
} catch (cleanupError) {
|
||||
logger.error(
|
||||
`[${requestId}] Admin API: Failed to clean up previous version ${previousVersionId}`,
|
||||
cleanupError
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
await syncMcpToolsForWorkflow({
|
||||
workflowId,
|
||||
requestId,
|
||||
state: versionRow.state,
|
||||
context: 'activate',
|
||||
})
|
||||
|
||||
logger.info(
|
||||
`[${requestId}] Admin API: Activated version ${versionNum} for workflow ${workflowId}`
|
||||
)
|
||||
|
||||
return singleResponse({
|
||||
success: true,
|
||||
version: versionNum,
|
||||
deployedAt: result.deployedAt!.toISOString(),
|
||||
warnings: triggerSaveResult.warnings,
|
||||
})
|
||||
} catch (error) {
|
||||
logger.error(`Admin API: Failed to activate version for workflow ${workflowId}`, { error })
|
||||
logger.error(
|
||||
`[${requestId}] Admin API: Failed to activate version for workflow ${workflowId}`,
|
||||
{
|
||||
error,
|
||||
}
|
||||
)
|
||||
return internalErrorResponse('Failed to activate deployment version')
|
||||
}
|
||||
})
|
||||
|
||||
@@ -7,12 +7,7 @@ import { getSession } from '@/lib/auth'
|
||||
import { validateInteger } from '@/lib/core/security/input-validation'
|
||||
import { PlatformEvents } from '@/lib/core/telemetry'
|
||||
import { generateRequestId } from '@/lib/core/utils/request'
|
||||
import { resolveEnvVarsInObject } from '@/lib/webhooks/env-resolver'
|
||||
import {
|
||||
cleanupExternalWebhook,
|
||||
createExternalWebhookSubscription,
|
||||
shouldRecreateExternalWebhookSubscription,
|
||||
} from '@/lib/webhooks/provider-subscriptions'
|
||||
import { cleanupExternalWebhook } from '@/lib/webhooks/provider-subscriptions'
|
||||
import { getUserEntityPermissions } from '@/lib/workspaces/permissions/utils'
|
||||
|
||||
const logger = createLogger('WebhookAPI')
|
||||
@@ -88,7 +83,6 @@ export async function GET(request: NextRequest, { params }: { params: Promise<{
|
||||
}
|
||||
}
|
||||
|
||||
// Update a webhook
|
||||
export async function PATCH(request: NextRequest, { params }: { params: Promise<{ id: string }> }) {
|
||||
const requestId = generateRequestId()
|
||||
|
||||
@@ -103,7 +97,7 @@ export async function PATCH(request: NextRequest, { params }: { params: Promise<
|
||||
}
|
||||
|
||||
const body = await request.json()
|
||||
const { path, provider, providerConfig, isActive, failedCount } = body
|
||||
const { isActive, failedCount } = body
|
||||
|
||||
if (failedCount !== undefined) {
|
||||
const validation = validateInteger(failedCount, 'failedCount', { min: 0 })
|
||||
@@ -113,28 +107,6 @@ export async function PATCH(request: NextRequest, { params }: { params: Promise<
|
||||
}
|
||||
}
|
||||
|
||||
const originalProviderConfig = providerConfig
|
||||
let resolvedProviderConfig = providerConfig
|
||||
if (providerConfig) {
|
||||
const webhookDataForResolve = await db
|
||||
.select({
|
||||
workspaceId: workflow.workspaceId,
|
||||
})
|
||||
.from(webhook)
|
||||
.innerJoin(workflow, eq(webhook.workflowId, workflow.id))
|
||||
.where(eq(webhook.id, id))
|
||||
.limit(1)
|
||||
|
||||
if (webhookDataForResolve.length > 0) {
|
||||
resolvedProviderConfig = await resolveEnvVarsInObject(
|
||||
providerConfig,
|
||||
session.user.id,
|
||||
webhookDataForResolve[0].workspaceId || undefined
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
// Find the webhook and check permissions
|
||||
const webhooks = await db
|
||||
.select({
|
||||
webhook: webhook,
|
||||
@@ -155,16 +127,12 @@ export async function PATCH(request: NextRequest, { params }: { params: Promise<
|
||||
}
|
||||
|
||||
const webhookData = webhooks[0]
|
||||
|
||||
// Check if user has permission to modify this webhook
|
||||
let canModify = false
|
||||
|
||||
// Case 1: User owns the workflow
|
||||
if (webhookData.workflow.userId === session.user.id) {
|
||||
canModify = true
|
||||
}
|
||||
|
||||
// Case 2: Workflow belongs to a workspace and user has write or admin permission
|
||||
if (!canModify && webhookData.workflow.workspaceId) {
|
||||
const userPermission = await getUserEntityPermissions(
|
||||
session.user.id,
|
||||
@@ -183,80 +151,14 @@ export async function PATCH(request: NextRequest, { params }: { params: Promise<
|
||||
return NextResponse.json({ error: 'Access denied' }, { status: 403 })
|
||||
}
|
||||
|
||||
const existingProviderConfig =
|
||||
(webhookData.webhook.providerConfig as Record<string, unknown>) || {}
|
||||
let nextProviderConfig =
|
||||
providerConfig !== undefined &&
|
||||
resolvedProviderConfig &&
|
||||
typeof resolvedProviderConfig === 'object'
|
||||
? (resolvedProviderConfig as Record<string, unknown>)
|
||||
: existingProviderConfig
|
||||
const nextProvider = (provider ?? webhookData.webhook.provider) as string
|
||||
|
||||
if (
|
||||
providerConfig !== undefined &&
|
||||
shouldRecreateExternalWebhookSubscription({
|
||||
previousProvider: webhookData.webhook.provider as string,
|
||||
nextProvider,
|
||||
previousConfig: existingProviderConfig,
|
||||
nextConfig: nextProviderConfig,
|
||||
})
|
||||
) {
|
||||
await cleanupExternalWebhook(
|
||||
{ ...webhookData.webhook, providerConfig: existingProviderConfig },
|
||||
webhookData.workflow,
|
||||
requestId
|
||||
)
|
||||
|
||||
const result = await createExternalWebhookSubscription(
|
||||
request,
|
||||
{
|
||||
...webhookData.webhook,
|
||||
provider: nextProvider,
|
||||
providerConfig: nextProviderConfig,
|
||||
},
|
||||
webhookData.workflow,
|
||||
session.user.id,
|
||||
requestId
|
||||
)
|
||||
|
||||
nextProviderConfig = result.updatedProviderConfig as Record<string, unknown>
|
||||
}
|
||||
|
||||
logger.debug(`[${requestId}] Updating webhook properties`, {
|
||||
hasPathUpdate: path !== undefined,
|
||||
hasProviderUpdate: provider !== undefined,
|
||||
hasConfigUpdate: providerConfig !== undefined,
|
||||
hasActiveUpdate: isActive !== undefined,
|
||||
hasFailedCountUpdate: failedCount !== undefined,
|
||||
})
|
||||
|
||||
let finalProviderConfig = webhooks[0].webhook.providerConfig
|
||||
if (providerConfig !== undefined && originalProviderConfig) {
|
||||
const existingConfig = existingProviderConfig
|
||||
finalProviderConfig = {
|
||||
...originalProviderConfig,
|
||||
credentialId: existingConfig.credentialId,
|
||||
credentialSetId: existingConfig.credentialSetId,
|
||||
userId: existingConfig.userId,
|
||||
historyId: existingConfig.historyId,
|
||||
lastCheckedTimestamp: existingConfig.lastCheckedTimestamp,
|
||||
setupCompleted: existingConfig.setupCompleted,
|
||||
externalId: existingConfig.externalId,
|
||||
}
|
||||
for (const [key, value] of Object.entries(nextProviderConfig)) {
|
||||
if (!(key in originalProviderConfig)) {
|
||||
;(finalProviderConfig as Record<string, unknown>)[key] = value
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
const updatedWebhook = await db
|
||||
.update(webhook)
|
||||
.set({
|
||||
path: path !== undefined ? path : webhooks[0].webhook.path,
|
||||
provider: provider !== undefined ? provider : webhooks[0].webhook.provider,
|
||||
providerConfig: finalProviderConfig,
|
||||
isActive: isActive !== undefined ? isActive : webhooks[0].webhook.isActive,
|
||||
failedCount: failedCount !== undefined ? failedCount : webhooks[0].webhook.failedCount,
|
||||
updatedAt: new Date(),
|
||||
@@ -339,11 +241,8 @@ export async function DELETE(
|
||||
}
|
||||
|
||||
const foundWebhook = webhookData.webhook
|
||||
const { cleanupExternalWebhook } = await import('@/lib/webhooks/provider-subscriptions')
|
||||
|
||||
const providerConfig = foundWebhook.providerConfig as Record<string, unknown> | null
|
||||
const credentialSetId = providerConfig?.credentialSetId as string | undefined
|
||||
const blockId = providerConfig?.blockId as string | undefined
|
||||
const credentialSetId = foundWebhook.credentialSetId as string | undefined
|
||||
const blockId = foundWebhook.blockId as string | undefined
|
||||
|
||||
if (credentialSetId && blockId) {
|
||||
const allCredentialSetWebhooks = await db
|
||||
@@ -351,10 +250,9 @@ export async function DELETE(
|
||||
.from(webhook)
|
||||
.where(and(eq(webhook.workflowId, webhookData.workflow.id), eq(webhook.blockId, blockId)))
|
||||
|
||||
const webhooksToDelete = allCredentialSetWebhooks.filter((w) => {
|
||||
const config = w.providerConfig as Record<string, unknown> | null
|
||||
return config?.credentialSetId === credentialSetId
|
||||
})
|
||||
const webhooksToDelete = allCredentialSetWebhooks.filter(
|
||||
(w) => w.credentialSetId === credentialSetId
|
||||
)
|
||||
|
||||
for (const w of webhooksToDelete) {
|
||||
await cleanupExternalWebhook(w, webhookData.workflow, requestId)
|
||||
|
||||
@@ -7,9 +7,21 @@ import { type NextRequest, NextResponse } from 'next/server'
|
||||
import { getSession } from '@/lib/auth'
|
||||
import { PlatformEvents } from '@/lib/core/telemetry'
|
||||
import { generateRequestId } from '@/lib/core/utils/request'
|
||||
import { getProviderIdFromServiceId } from '@/lib/oauth'
|
||||
import { resolveEnvVarsInObject } from '@/lib/webhooks/env-resolver'
|
||||
import { createExternalWebhookSubscription } from '@/lib/webhooks/provider-subscriptions'
|
||||
import {
|
||||
cleanupExternalWebhook,
|
||||
createExternalWebhookSubscription,
|
||||
} from '@/lib/webhooks/provider-subscriptions'
|
||||
import { mergeNonUserFields } from '@/lib/webhooks/utils'
|
||||
import {
|
||||
configureGmailPolling,
|
||||
configureOutlookPolling,
|
||||
configureRssPolling,
|
||||
syncWebhooksForCredentialSet,
|
||||
} from '@/lib/webhooks/utils.server'
|
||||
import { getUserEntityPermissions } from '@/lib/workspaces/permissions/utils'
|
||||
import { extractCredentialSetId, isCredentialSetValue } from '@/executor/constants'
|
||||
|
||||
const logger = createLogger('WebhooksAPI')
|
||||
|
||||
@@ -316,8 +328,6 @@ export async function POST(request: NextRequest) {
|
||||
const directCredentialSetId = resolvedProviderConfig?.credentialSetId as string | undefined
|
||||
|
||||
if (directCredentialSetId || rawCredentialId) {
|
||||
const { isCredentialSetValue, extractCredentialSetId } = await import('@/executor/constants')
|
||||
|
||||
const credentialSetId =
|
||||
directCredentialSetId ||
|
||||
(rawCredentialId && isCredentialSetValue(rawCredentialId)
|
||||
@@ -329,11 +339,6 @@ export async function POST(request: NextRequest) {
|
||||
`[${requestId}] Credential set detected for ${provider} trigger. Syncing webhooks for set ${credentialSetId}`
|
||||
)
|
||||
|
||||
const { getProviderIdFromServiceId } = await import('@/lib/oauth')
|
||||
const { syncWebhooksForCredentialSet, configureGmailPolling, configureOutlookPolling } =
|
||||
await import('@/lib/webhooks/utils.server')
|
||||
|
||||
// Map provider to OAuth provider ID
|
||||
const oauthProviderId = getProviderIdFromServiceId(provider)
|
||||
|
||||
const {
|
||||
@@ -466,7 +471,8 @@ export async function POST(request: NextRequest) {
|
||||
providerConfig: providerConfigOverride,
|
||||
})
|
||||
|
||||
const configToSave = { ...originalProviderConfig }
|
||||
const userProvided = originalProviderConfig as Record<string, unknown>
|
||||
const configToSave: Record<string, unknown> = { ...userProvided }
|
||||
|
||||
try {
|
||||
const result = await createExternalWebhookSubscription(
|
||||
@@ -477,11 +483,7 @@ export async function POST(request: NextRequest) {
|
||||
requestId
|
||||
)
|
||||
const updatedConfig = result.updatedProviderConfig as Record<string, unknown>
|
||||
for (const [key, value] of Object.entries(updatedConfig)) {
|
||||
if (!(key in originalProviderConfig)) {
|
||||
configToSave[key] = value
|
||||
}
|
||||
}
|
||||
mergeNonUserFields(configToSave, updatedConfig, userProvided)
|
||||
resolvedProviderConfig = updatedConfig
|
||||
externalSubscriptionCreated = result.externalSubscriptionCreated
|
||||
} catch (err) {
|
||||
@@ -547,7 +549,6 @@ export async function POST(request: NextRequest) {
|
||||
if (externalSubscriptionCreated) {
|
||||
logger.error(`[${requestId}] DB save failed, cleaning up external subscription`, dbError)
|
||||
try {
|
||||
const { cleanupExternalWebhook } = await import('@/lib/webhooks/provider-subscriptions')
|
||||
await cleanupExternalWebhook(
|
||||
createTempWebhookData(configToSave),
|
||||
workflowRecord,
|
||||
@@ -567,7 +568,6 @@ export async function POST(request: NextRequest) {
|
||||
if (savedWebhook && provider === 'gmail') {
|
||||
logger.info(`[${requestId}] Gmail provider detected. Setting up Gmail webhook configuration.`)
|
||||
try {
|
||||
const { configureGmailPolling } = await import('@/lib/webhooks/utils.server')
|
||||
const success = await configureGmailPolling(savedWebhook, requestId)
|
||||
|
||||
if (!success) {
|
||||
@@ -606,7 +606,6 @@ export async function POST(request: NextRequest) {
|
||||
`[${requestId}] Outlook provider detected. Setting up Outlook webhook configuration.`
|
||||
)
|
||||
try {
|
||||
const { configureOutlookPolling } = await import('@/lib/webhooks/utils.server')
|
||||
const success = await configureOutlookPolling(savedWebhook, requestId)
|
||||
|
||||
if (!success) {
|
||||
@@ -643,7 +642,6 @@ export async function POST(request: NextRequest) {
|
||||
if (savedWebhook && provider === 'rss') {
|
||||
logger.info(`[${requestId}] RSS provider detected. Setting up RSS webhook configuration.`)
|
||||
try {
|
||||
const { configureRssPolling } = await import('@/lib/webhooks/utils.server')
|
||||
const success = await configureRssPolling(savedWebhook, requestId)
|
||||
|
||||
if (!success) {
|
||||
|
||||
@@ -4,7 +4,11 @@ import { and, desc, eq } from 'drizzle-orm'
|
||||
import type { NextRequest } from 'next/server'
|
||||
import { generateRequestId } from '@/lib/core/utils/request'
|
||||
import { removeMcpToolsForWorkflow, syncMcpToolsForWorkflow } from '@/lib/mcp/workflow-mcp-sync'
|
||||
import { cleanupWebhooksForWorkflow, saveTriggerWebhooksForDeploy } from '@/lib/webhooks/deploy'
|
||||
import {
|
||||
cleanupWebhooksForWorkflow,
|
||||
restorePreviousVersionWebhooks,
|
||||
saveTriggerWebhooksForDeploy,
|
||||
} from '@/lib/webhooks/deploy'
|
||||
import {
|
||||
deployWorkflow,
|
||||
loadWorkflowFromNormalizedTables,
|
||||
@@ -135,6 +139,18 @@ export async function POST(request: NextRequest, { params }: { params: Promise<{
|
||||
return createErrorResponse(`Invalid schedule configuration: ${scheduleValidation.error}`, 400)
|
||||
}
|
||||
|
||||
const [currentActiveVersion] = await db
|
||||
.select({ id: workflowDeploymentVersion.id })
|
||||
.from(workflowDeploymentVersion)
|
||||
.where(
|
||||
and(
|
||||
eq(workflowDeploymentVersion.workflowId, id),
|
||||
eq(workflowDeploymentVersion.isActive, true)
|
||||
)
|
||||
)
|
||||
.limit(1)
|
||||
const previousVersionId = currentActiveVersion?.id
|
||||
|
||||
const deployResult = await deployWorkflow({
|
||||
workflowId: id,
|
||||
deployedBy: actorUserId,
|
||||
@@ -161,6 +177,7 @@ export async function POST(request: NextRequest, { params }: { params: Promise<{
|
||||
blocks: normalizedData.blocks,
|
||||
requestId,
|
||||
deploymentVersionId,
|
||||
previousVersionId,
|
||||
})
|
||||
|
||||
if (!triggerSaveResult.success) {
|
||||
@@ -194,6 +211,15 @@ export async function POST(request: NextRequest, { params }: { params: Promise<{
|
||||
requestId,
|
||||
deploymentVersionId,
|
||||
})
|
||||
if (previousVersionId) {
|
||||
await restorePreviousVersionWebhooks({
|
||||
request,
|
||||
workflow: workflowData as Record<string, unknown>,
|
||||
userId: actorUserId,
|
||||
previousVersionId,
|
||||
requestId,
|
||||
})
|
||||
}
|
||||
await undeployWorkflow({ workflowId: id })
|
||||
return createErrorResponse(scheduleResult.error || 'Failed to create schedule', 500)
|
||||
}
|
||||
@@ -208,6 +234,25 @@ export async function POST(request: NextRequest, { params }: { params: Promise<{
|
||||
)
|
||||
}
|
||||
|
||||
if (previousVersionId && previousVersionId !== deploymentVersionId) {
|
||||
try {
|
||||
logger.info(`[${requestId}] Cleaning up previous version ${previousVersionId} DB records`)
|
||||
await cleanupDeploymentVersion({
|
||||
workflowId: id,
|
||||
workflow: workflowData as Record<string, unknown>,
|
||||
requestId,
|
||||
deploymentVersionId: previousVersionId,
|
||||
skipExternalCleanup: true,
|
||||
})
|
||||
} catch (cleanupError) {
|
||||
logger.error(
|
||||
`[${requestId}] Failed to clean up previous version ${previousVersionId}`,
|
||||
cleanupError
|
||||
)
|
||||
// Non-fatal - continue with success response
|
||||
}
|
||||
}
|
||||
|
||||
logger.info(`[${requestId}] Workflow deployed successfully: ${id}`)
|
||||
|
||||
// Sync MCP tools with the latest parameter schema
|
||||
@@ -228,6 +273,7 @@ export async function POST(request: NextRequest, { params }: { params: Promise<{
|
||||
nextRunAt: scheduleInfo.nextRunAt,
|
||||
}
|
||||
: undefined,
|
||||
warnings: triggerSaveResult.warnings,
|
||||
})
|
||||
} catch (error: any) {
|
||||
logger.error(`[${requestId}] Error deploying workflow: ${id}`, {
|
||||
|
||||
@@ -4,7 +4,7 @@ import { and, eq } from 'drizzle-orm'
|
||||
import type { NextRequest } from 'next/server'
|
||||
import { generateRequestId } from '@/lib/core/utils/request'
|
||||
import { syncMcpToolsForWorkflow } from '@/lib/mcp/workflow-mcp-sync'
|
||||
import { saveTriggerWebhooksForDeploy } from '@/lib/webhooks/deploy'
|
||||
import { restorePreviousVersionWebhooks, saveTriggerWebhooksForDeploy } from '@/lib/webhooks/deploy'
|
||||
import { activateWorkflowVersion } from '@/lib/workflows/persistence/utils'
|
||||
import {
|
||||
cleanupDeploymentVersion,
|
||||
@@ -85,6 +85,11 @@ export async function POST(
|
||||
return createErrorResponse('Invalid deployed state structure', 500)
|
||||
}
|
||||
|
||||
const scheduleValidation = validateWorkflowSchedules(blocks)
|
||||
if (!scheduleValidation.isValid) {
|
||||
return createErrorResponse(`Invalid schedule configuration: ${scheduleValidation.error}`, 400)
|
||||
}
|
||||
|
||||
const triggerSaveResult = await saveTriggerWebhooksForDeploy({
|
||||
request,
|
||||
workflowId: id,
|
||||
@@ -93,6 +98,8 @@ export async function POST(
|
||||
blocks,
|
||||
requestId,
|
||||
deploymentVersionId: versionRow.id,
|
||||
previousVersionId,
|
||||
forceRecreateSubscriptions: true,
|
||||
})
|
||||
|
||||
if (!triggerSaveResult.success) {
|
||||
@@ -102,11 +109,6 @@ export async function POST(
|
||||
)
|
||||
}
|
||||
|
||||
const scheduleValidation = validateWorkflowSchedules(blocks)
|
||||
if (!scheduleValidation.isValid) {
|
||||
return createErrorResponse(`Invalid schedule configuration: ${scheduleValidation.error}`, 400)
|
||||
}
|
||||
|
||||
const scheduleResult = await createSchedulesForDeploy(id, blocks, db, versionRow.id)
|
||||
|
||||
if (!scheduleResult.success) {
|
||||
@@ -116,6 +118,15 @@ export async function POST(
|
||||
requestId,
|
||||
deploymentVersionId: versionRow.id,
|
||||
})
|
||||
if (previousVersionId) {
|
||||
await restorePreviousVersionWebhooks({
|
||||
request,
|
||||
workflow: workflowData as Record<string, unknown>,
|
||||
userId: actorUserId,
|
||||
previousVersionId,
|
||||
requestId,
|
||||
})
|
||||
}
|
||||
return createErrorResponse(scheduleResult.error || 'Failed to sync schedules', 500)
|
||||
}
|
||||
|
||||
@@ -127,6 +138,15 @@ export async function POST(
|
||||
requestId,
|
||||
deploymentVersionId: versionRow.id,
|
||||
})
|
||||
if (previousVersionId) {
|
||||
await restorePreviousVersionWebhooks({
|
||||
request,
|
||||
workflow: workflowData as Record<string, unknown>,
|
||||
userId: actorUserId,
|
||||
previousVersionId,
|
||||
requestId,
|
||||
})
|
||||
}
|
||||
return createErrorResponse(result.error || 'Failed to activate deployment', 400)
|
||||
}
|
||||
|
||||
@@ -140,6 +160,7 @@ export async function POST(
|
||||
workflow: workflowData as Record<string, unknown>,
|
||||
requestId,
|
||||
deploymentVersionId: previousVersionId,
|
||||
skipExternalCleanup: true,
|
||||
})
|
||||
logger.info(`[${requestId}] Previous version cleanup completed`)
|
||||
} catch (cleanupError) {
|
||||
@@ -157,7 +178,11 @@ export async function POST(
|
||||
context: 'activate',
|
||||
})
|
||||
|
||||
return createSuccessResponse({ success: true, deployedAt: result.deployedAt })
|
||||
return createSuccessResponse({
|
||||
success: true,
|
||||
deployedAt: result.deployedAt,
|
||||
warnings: triggerSaveResult.warnings,
|
||||
})
|
||||
} catch (error: any) {
|
||||
logger.error(`[${requestId}] Error activating deployment for workflow: ${id}`, error)
|
||||
return createErrorResponse(error.message || 'Failed to activate deployment', 500)
|
||||
|
||||
@@ -30,6 +30,7 @@ import { normalizeName } from '@/executor/constants'
|
||||
import { ExecutionSnapshot } from '@/executor/execution/snapshot'
|
||||
import type { ExecutionMetadata, IterationContext } from '@/executor/execution/types'
|
||||
import type { NormalizedBlockOutput, StreamingExecution } from '@/executor/types'
|
||||
import { hasExecutionResult } from '@/executor/utils/errors'
|
||||
import { Serializer } from '@/serializer'
|
||||
import { CORE_TRIGGER_TYPES, type CoreTriggerType } from '@/stores/logs/filters/types'
|
||||
|
||||
@@ -467,17 +468,17 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
|
||||
}
|
||||
|
||||
return NextResponse.json(filteredResult)
|
||||
} catch (error: any) {
|
||||
const errorMessage = error.message || 'Unknown error'
|
||||
} catch (error: unknown) {
|
||||
const errorMessage = error instanceof Error ? error.message : 'Unknown error'
|
||||
logger.error(`[${requestId}] Non-SSE execution failed: ${errorMessage}`)
|
||||
|
||||
const executionResult = error.executionResult
|
||||
const executionResult = hasExecutionResult(error) ? error.executionResult : undefined
|
||||
|
||||
return NextResponse.json(
|
||||
{
|
||||
success: false,
|
||||
output: executionResult?.output,
|
||||
error: executionResult?.error || error.message || 'Execution failed',
|
||||
error: executionResult?.error || errorMessage || 'Execution failed',
|
||||
metadata: executionResult?.metadata
|
||||
? {
|
||||
duration: executionResult.metadata.duration,
|
||||
@@ -788,11 +789,11 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
|
||||
|
||||
// Cleanup base64 cache for this execution
|
||||
await cleanupExecutionBase64Cache(executionId)
|
||||
} catch (error: any) {
|
||||
const errorMessage = error.message || 'Unknown error'
|
||||
} catch (error: unknown) {
|
||||
const errorMessage = error instanceof Error ? error.message : 'Unknown error'
|
||||
logger.error(`[${requestId}] SSE execution failed: ${errorMessage}`)
|
||||
|
||||
const executionResult = error.executionResult
|
||||
const executionResult = hasExecutionResult(error) ? error.executionResult : undefined
|
||||
|
||||
sendEvent({
|
||||
type: 'execution:error',
|
||||
|
||||
@@ -95,6 +95,7 @@ export function DeployModal({
|
||||
const [activeTab, setActiveTab] = useState<TabView>('general')
|
||||
const [chatSubmitting, setChatSubmitting] = useState(false)
|
||||
const [apiDeployError, setApiDeployError] = useState<string | null>(null)
|
||||
const [apiDeployWarnings, setApiDeployWarnings] = useState<string[]>([])
|
||||
const [isChatFormValid, setIsChatFormValid] = useState(false)
|
||||
const [selectedStreamingOutputs, setSelectedStreamingOutputs] = useState<string[]>([])
|
||||
|
||||
@@ -227,6 +228,7 @@ export function DeployModal({
|
||||
if (open && workflowId) {
|
||||
setActiveTab('general')
|
||||
setApiDeployError(null)
|
||||
setApiDeployWarnings([])
|
||||
}
|
||||
}, [open, workflowId])
|
||||
|
||||
@@ -282,9 +284,13 @@ export function DeployModal({
|
||||
if (!workflowId) return
|
||||
|
||||
setApiDeployError(null)
|
||||
setApiDeployWarnings([])
|
||||
|
||||
try {
|
||||
await deployMutation.mutateAsync({ workflowId, deployChatEnabled: false })
|
||||
const result = await deployMutation.mutateAsync({ workflowId, deployChatEnabled: false })
|
||||
if (result.warnings && result.warnings.length > 0) {
|
||||
setApiDeployWarnings(result.warnings)
|
||||
}
|
||||
await refetchDeployedState()
|
||||
} catch (error: unknown) {
|
||||
logger.error('Error deploying workflow:', { error })
|
||||
@@ -297,8 +303,13 @@ export function DeployModal({
|
||||
async (version: number) => {
|
||||
if (!workflowId) return
|
||||
|
||||
setApiDeployWarnings([])
|
||||
|
||||
try {
|
||||
await activateVersionMutation.mutateAsync({ workflowId, version })
|
||||
const result = await activateVersionMutation.mutateAsync({ workflowId, version })
|
||||
if (result.warnings && result.warnings.length > 0) {
|
||||
setApiDeployWarnings(result.warnings)
|
||||
}
|
||||
await refetchDeployedState()
|
||||
} catch (error) {
|
||||
logger.error('Error promoting version:', { error })
|
||||
@@ -324,9 +335,13 @@ export function DeployModal({
|
||||
if (!workflowId) return
|
||||
|
||||
setApiDeployError(null)
|
||||
setApiDeployWarnings([])
|
||||
|
||||
try {
|
||||
await deployMutation.mutateAsync({ workflowId, deployChatEnabled: false })
|
||||
const result = await deployMutation.mutateAsync({ workflowId, deployChatEnabled: false })
|
||||
if (result.warnings && result.warnings.length > 0) {
|
||||
setApiDeployWarnings(result.warnings)
|
||||
}
|
||||
await refetchDeployedState()
|
||||
} catch (error: unknown) {
|
||||
logger.error('Error redeploying workflow:', { error })
|
||||
@@ -338,6 +353,7 @@ export function DeployModal({
|
||||
const handleCloseModal = useCallback(() => {
|
||||
setChatSubmitting(false)
|
||||
setApiDeployError(null)
|
||||
setApiDeployWarnings([])
|
||||
onOpenChange(false)
|
||||
}, [onOpenChange])
|
||||
|
||||
@@ -479,6 +495,14 @@ export function DeployModal({
|
||||
<div>{apiDeployError}</div>
|
||||
</div>
|
||||
)}
|
||||
{apiDeployWarnings.length > 0 && (
|
||||
<div className='mb-3 rounded-[4px] border border-amber-500/30 bg-amber-500/10 p-3 text-amber-700 dark:text-amber-400 text-sm'>
|
||||
<div className='font-semibold'>Deployment Warning</div>
|
||||
{apiDeployWarnings.map((warning, index) => (
|
||||
<div key={index}>{warning}</div>
|
||||
))}
|
||||
</div>
|
||||
)}
|
||||
<ModalTabsContent value='general'>
|
||||
<GeneralDeploy
|
||||
workflowId={workflowId}
|
||||
|
||||
@@ -16,6 +16,7 @@ import {
|
||||
} from '@/lib/workflows/triggers/triggers'
|
||||
import { useCurrentWorkflow } from '@/app/workspace/[workspaceId]/w/[workflowId]/hooks/use-current-workflow'
|
||||
import type { BlockLog, ExecutionResult, StreamingExecution } from '@/executor/types'
|
||||
import { hasExecutionResult } from '@/executor/utils/errors'
|
||||
import { coerceValue } from '@/executor/utils/start-block'
|
||||
import { subscriptionKeys } from '@/hooks/queries/subscription'
|
||||
import { useExecutionStream } from '@/hooks/use-execution-stream'
|
||||
@@ -76,17 +77,6 @@ function normalizeErrorMessage(error: unknown): string {
|
||||
return WORKFLOW_EXECUTION_FAILURE_MESSAGE
|
||||
}
|
||||
|
||||
function isExecutionResult(value: unknown): value is ExecutionResult {
|
||||
if (!isRecord(value)) return false
|
||||
return typeof value.success === 'boolean' && isRecord(value.output)
|
||||
}
|
||||
|
||||
function extractExecutionResult(error: unknown): ExecutionResult | null {
|
||||
if (!isRecord(error)) return null
|
||||
const candidate = error.executionResult
|
||||
return isExecutionResult(candidate) ? candidate : null
|
||||
}
|
||||
|
||||
export function useWorkflowExecution() {
|
||||
const queryClient = useQueryClient()
|
||||
const currentWorkflow = useCurrentWorkflow()
|
||||
@@ -1138,11 +1128,11 @@ export function useWorkflowExecution() {
|
||||
|
||||
const handleExecutionError = (error: unknown, options?: { executionId?: string }) => {
|
||||
const normalizedMessage = normalizeErrorMessage(error)
|
||||
const executionResultFromError = extractExecutionResult(error)
|
||||
|
||||
let errorResult: ExecutionResult
|
||||
|
||||
if (executionResultFromError) {
|
||||
if (hasExecutionResult(error)) {
|
||||
const executionResultFromError = error.executionResult
|
||||
const logs = Array.isArray(executionResultFromError.logs) ? executionResultFromError.logs : []
|
||||
|
||||
errorResult = {
|
||||
|
||||
@@ -21,7 +21,7 @@ import {
|
||||
} from '@/lib/workflows/schedules/utils'
|
||||
import { ExecutionSnapshot } from '@/executor/execution/snapshot'
|
||||
import type { ExecutionMetadata } from '@/executor/execution/types'
|
||||
import type { ExecutionResult } from '@/executor/types'
|
||||
import { hasExecutionResult } from '@/executor/utils/errors'
|
||||
import { MAX_CONSECUTIVE_FAILURES } from '@/triggers/constants'
|
||||
|
||||
const logger = createLogger('TriggerScheduleExecution')
|
||||
@@ -231,8 +231,7 @@ async function runWorkflowExecution({
|
||||
} catch (error: unknown) {
|
||||
logger.error(`[${requestId}] Early failure in scheduled workflow ${payload.workflowId}`, error)
|
||||
|
||||
const errorWithResult = error as { executionResult?: ExecutionResult }
|
||||
const executionResult = errorWithResult?.executionResult
|
||||
const executionResult = hasExecutionResult(error) ? error.executionResult : undefined
|
||||
const { traceSpans } = executionResult ? buildTraceSpans(executionResult) : { traceSpans: [] }
|
||||
|
||||
await loggingSession.safeCompleteWithError({
|
||||
|
||||
@@ -16,7 +16,7 @@ import { loadDeployedWorkflowState } from '@/lib/workflows/persistence/utils'
|
||||
import { getWorkflowById } from '@/lib/workflows/utils'
|
||||
import { ExecutionSnapshot } from '@/executor/execution/snapshot'
|
||||
import type { ExecutionMetadata } from '@/executor/execution/types'
|
||||
import type { ExecutionResult } from '@/executor/types'
|
||||
import { hasExecutionResult } from '@/executor/utils/errors'
|
||||
import { safeAssign } from '@/tools/safe-assign'
|
||||
import { getTrigger, isTriggerValid } from '@/triggers'
|
||||
|
||||
@@ -578,12 +578,13 @@ async function executeWebhookJobInternal(
|
||||
deploymentVersionId,
|
||||
})
|
||||
|
||||
const errorWithResult = error as { executionResult?: ExecutionResult }
|
||||
const executionResult = errorWithResult?.executionResult || {
|
||||
success: false,
|
||||
output: {},
|
||||
logs: [],
|
||||
}
|
||||
const executionResult = hasExecutionResult(error)
|
||||
? error.executionResult
|
||||
: {
|
||||
success: false,
|
||||
output: {},
|
||||
logs: [],
|
||||
}
|
||||
const { traceSpans } = buildTraceSpans(executionResult)
|
||||
|
||||
await loggingSession.safeCompleteWithError({
|
||||
|
||||
@@ -9,7 +9,7 @@ import { PauseResumeManager } from '@/lib/workflows/executor/human-in-the-loop-m
|
||||
import { getWorkflowById } from '@/lib/workflows/utils'
|
||||
import { ExecutionSnapshot } from '@/executor/execution/snapshot'
|
||||
import type { ExecutionMetadata } from '@/executor/execution/types'
|
||||
import type { ExecutionResult } from '@/executor/types'
|
||||
import { hasExecutionResult } from '@/executor/utils/errors'
|
||||
import type { CoreTriggerType } from '@/stores/logs/filters/types'
|
||||
|
||||
const logger = createLogger('TriggerWorkflowExecution')
|
||||
@@ -160,8 +160,7 @@ export async function executeWorkflowJob(payload: WorkflowExecutionPayload) {
|
||||
executionId,
|
||||
})
|
||||
|
||||
const errorWithResult = error as { executionResult?: ExecutionResult }
|
||||
const executionResult = errorWithResult?.executionResult
|
||||
const executionResult = hasExecutionResult(error) ? error.executionResult : undefined
|
||||
const { traceSpans } = executionResult ? buildTraceSpans(executionResult) : { traceSpans: [] }
|
||||
|
||||
await loggingSession.safeCompleteWithError({
|
||||
|
||||
31
apps/sim/executor/errors/child-workflow-error.ts
Normal file
31
apps/sim/executor/errors/child-workflow-error.ts
Normal file
@@ -0,0 +1,31 @@
|
||||
import type { TraceSpan } from '@/lib/logs/types'
|
||||
import type { ExecutionResult } from '@/executor/types'
|
||||
|
||||
interface ChildWorkflowErrorOptions {
|
||||
message: string
|
||||
childWorkflowName: string
|
||||
childTraceSpans?: TraceSpan[]
|
||||
executionResult?: ExecutionResult
|
||||
cause?: Error
|
||||
}
|
||||
|
||||
/**
|
||||
* Error raised when a child workflow execution fails.
|
||||
*/
|
||||
export class ChildWorkflowError extends Error {
|
||||
readonly childTraceSpans: TraceSpan[]
|
||||
readonly childWorkflowName: string
|
||||
readonly executionResult?: ExecutionResult
|
||||
|
||||
constructor(options: ChildWorkflowErrorOptions) {
|
||||
super(options.message, { cause: options.cause })
|
||||
this.name = 'ChildWorkflowError'
|
||||
this.childWorkflowName = options.childWorkflowName
|
||||
this.childTraceSpans = options.childTraceSpans ?? []
|
||||
this.executionResult = options.executionResult
|
||||
}
|
||||
|
||||
static isChildWorkflowError(error: unknown): error is ChildWorkflowError {
|
||||
return error instanceof ChildWorkflowError
|
||||
}
|
||||
}
|
||||
@@ -13,6 +13,7 @@ import {
|
||||
isSentinelBlockType,
|
||||
} from '@/executor/constants'
|
||||
import type { DAGNode } from '@/executor/dag/builder'
|
||||
import { ChildWorkflowError } from '@/executor/errors/child-workflow-error'
|
||||
import type { BlockStateWriter, ContextExtensions } from '@/executor/execution/types'
|
||||
import {
|
||||
generatePauseContextId,
|
||||
@@ -213,24 +214,26 @@ export class BlockExecutor {
|
||||
? resolvedInputs
|
||||
: ((block.config?.params as Record<string, any> | undefined) ?? {})
|
||||
|
||||
const errorOutput: NormalizedBlockOutput = {
|
||||
error: errorMessage,
|
||||
}
|
||||
|
||||
if (ChildWorkflowError.isChildWorkflowError(error)) {
|
||||
errorOutput.childTraceSpans = error.childTraceSpans
|
||||
errorOutput.childWorkflowName = error.childWorkflowName
|
||||
}
|
||||
|
||||
this.state.setBlockOutput(node.id, errorOutput, duration)
|
||||
|
||||
if (blockLog) {
|
||||
blockLog.endedAt = new Date().toISOString()
|
||||
blockLog.durationMs = duration
|
||||
blockLog.success = false
|
||||
blockLog.error = errorMessage
|
||||
blockLog.input = input
|
||||
blockLog.output = this.filterOutputForLog(block, errorOutput)
|
||||
}
|
||||
|
||||
const errorOutput: NormalizedBlockOutput = {
|
||||
error: errorMessage,
|
||||
}
|
||||
|
||||
if (error && typeof error === 'object' && 'childTraceSpans' in error) {
|
||||
errorOutput.childTraceSpans = (error as any).childTraceSpans
|
||||
}
|
||||
|
||||
this.state.setBlockOutput(node.id, errorOutput, duration)
|
||||
|
||||
logger.error(
|
||||
phase === 'input_resolution' ? 'Failed to resolve block inputs' : 'Block execution failed',
|
||||
{
|
||||
|
||||
@@ -13,7 +13,7 @@ import type {
|
||||
PausePoint,
|
||||
ResumeStatus,
|
||||
} from '@/executor/types'
|
||||
import { normalizeError } from '@/executor/utils/errors'
|
||||
import { attachExecutionResult, normalizeError } from '@/executor/utils/errors'
|
||||
|
||||
const logger = createLogger('ExecutionEngine')
|
||||
|
||||
@@ -170,8 +170,8 @@ export class ExecutionEngine {
|
||||
metadata: this.context.metadata,
|
||||
}
|
||||
|
||||
if (error && typeof error === 'object') {
|
||||
;(error as any).executionResult = executionResult
|
||||
if (error instanceof Error) {
|
||||
attachExecutionResult(error, executionResult)
|
||||
}
|
||||
throw error
|
||||
}
|
||||
|
||||
@@ -4,12 +4,14 @@ import type { TraceSpan } from '@/lib/logs/types'
|
||||
import type { BlockOutput } from '@/blocks/types'
|
||||
import { Executor } from '@/executor'
|
||||
import { BlockType, DEFAULTS, HTTP } from '@/executor/constants'
|
||||
import { ChildWorkflowError } from '@/executor/errors/child-workflow-error'
|
||||
import type {
|
||||
BlockHandler,
|
||||
ExecutionContext,
|
||||
ExecutionResult,
|
||||
StreamingExecution,
|
||||
} from '@/executor/types'
|
||||
import { hasExecutionResult } from '@/executor/utils/errors'
|
||||
import { buildAPIUrl, buildAuthHeaders } from '@/executor/utils/http'
|
||||
import { parseJSON } from '@/executor/utils/json'
|
||||
import { lazyCleanupInputMapping } from '@/executor/utils/lazy-cleanup'
|
||||
@@ -137,39 +139,39 @@ export class WorkflowBlockHandler implements BlockHandler {
|
||||
)
|
||||
|
||||
return mappedResult
|
||||
} catch (error: any) {
|
||||
} catch (error: unknown) {
|
||||
logger.error(`Error executing child workflow ${workflowId}:`, error)
|
||||
|
||||
const { workflows } = useWorkflowRegistry.getState()
|
||||
const workflowMetadata = workflows[workflowId]
|
||||
const childWorkflowName = workflowMetadata?.name || workflowId
|
||||
|
||||
const originalError = error.message || 'Unknown error'
|
||||
const wrappedError = new Error(
|
||||
`Error in child workflow "${childWorkflowName}": ${originalError}`
|
||||
)
|
||||
const originalError = error instanceof Error ? error.message : 'Unknown error'
|
||||
let childTraceSpans: WorkflowTraceSpan[] = []
|
||||
let executionResult: ExecutionResult | undefined
|
||||
|
||||
if (error.executionResult?.logs) {
|
||||
const executionResult = error.executionResult as ExecutionResult
|
||||
if (hasExecutionResult(error) && error.executionResult.logs) {
|
||||
executionResult = error.executionResult
|
||||
|
||||
logger.info(`Extracting child trace spans from error.executionResult`, {
|
||||
hasLogs: (executionResult.logs?.length ?? 0) > 0,
|
||||
logCount: executionResult.logs?.length ?? 0,
|
||||
})
|
||||
|
||||
const childTraceSpans = this.captureChildWorkflowLogs(
|
||||
executionResult,
|
||||
childWorkflowName,
|
||||
ctx
|
||||
)
|
||||
childTraceSpans = this.captureChildWorkflowLogs(executionResult, childWorkflowName, ctx)
|
||||
|
||||
logger.info(`Captured ${childTraceSpans.length} child trace spans from failed execution`)
|
||||
;(wrappedError as any).childTraceSpans = childTraceSpans
|
||||
} else if (error.childTraceSpans && Array.isArray(error.childTraceSpans)) {
|
||||
;(wrappedError as any).childTraceSpans = error.childTraceSpans
|
||||
} else if (ChildWorkflowError.isChildWorkflowError(error)) {
|
||||
childTraceSpans = error.childTraceSpans
|
||||
}
|
||||
|
||||
throw wrappedError
|
||||
throw new ChildWorkflowError({
|
||||
message: `Error in child workflow "${childWorkflowName}": ${originalError}`,
|
||||
childWorkflowName,
|
||||
childTraceSpans,
|
||||
executionResult,
|
||||
cause: error instanceof Error ? error : undefined,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -441,11 +443,11 @@ export class WorkflowBlockHandler implements BlockHandler {
|
||||
|
||||
if (!success) {
|
||||
logger.warn(`Child workflow ${childWorkflowName} failed`)
|
||||
const error = new Error(
|
||||
`Error in child workflow "${childWorkflowName}": ${childResult.error || 'Child workflow execution failed'}`
|
||||
)
|
||||
;(error as any).childTraceSpans = childTraceSpans || []
|
||||
throw error
|
||||
throw new ChildWorkflowError({
|
||||
message: `Error in child workflow "${childWorkflowName}": ${childResult.error || 'Child workflow execution failed'}`,
|
||||
childWorkflowName,
|
||||
childTraceSpans: childTraceSpans || [],
|
||||
})
|
||||
}
|
||||
|
||||
return {
|
||||
|
||||
@@ -1,6 +1,39 @@
|
||||
import type { ExecutionContext } from '@/executor/types'
|
||||
import type { ExecutionContext, ExecutionResult } from '@/executor/types'
|
||||
import type { SerializedBlock } from '@/serializer/types'
|
||||
|
||||
/**
|
||||
* Interface for errors that carry an ExecutionResult.
|
||||
* Used when workflow execution fails and we want to preserve partial results.
|
||||
*/
|
||||
export interface ErrorWithExecutionResult extends Error {
|
||||
executionResult: ExecutionResult
|
||||
}
|
||||
|
||||
/**
|
||||
* Type guard to check if an error carries an ExecutionResult.
|
||||
* Validates that executionResult has required fields (success, output).
|
||||
*/
|
||||
export function hasExecutionResult(error: unknown): error is ErrorWithExecutionResult {
|
||||
if (
|
||||
!(error instanceof Error) ||
|
||||
!('executionResult' in error) ||
|
||||
error.executionResult == null ||
|
||||
typeof error.executionResult !== 'object'
|
||||
) {
|
||||
return false
|
||||
}
|
||||
|
||||
const result = error.executionResult as Record<string, unknown>
|
||||
return typeof result.success === 'boolean' && result.output != null
|
||||
}
|
||||
|
||||
/**
|
||||
* Attaches an ExecutionResult to an error for propagation to parent workflows.
|
||||
*/
|
||||
export function attachExecutionResult(error: Error, executionResult: ExecutionResult): void {
|
||||
Object.assign(error, { executionResult })
|
||||
}
|
||||
|
||||
export interface BlockExecutionErrorDetails {
|
||||
block: SerializedBlock
|
||||
error: Error | string
|
||||
|
||||
@@ -237,6 +237,7 @@ interface DeployWorkflowResult {
|
||||
isDeployed: boolean
|
||||
deployedAt?: string
|
||||
apiKey?: string
|
||||
warnings?: string[]
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -272,6 +273,7 @@ export function useDeployWorkflow() {
|
||||
isDeployed: data.isDeployed ?? false,
|
||||
deployedAt: data.deployedAt,
|
||||
apiKey: data.apiKey,
|
||||
warnings: data.warnings,
|
||||
}
|
||||
},
|
||||
onSuccess: (data, variables) => {
|
||||
@@ -360,6 +362,7 @@ interface ActivateVersionVariables {
|
||||
interface ActivateVersionResult {
|
||||
deployedAt?: string
|
||||
apiKey?: string
|
||||
warnings?: string[]
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@@ -100,8 +100,13 @@ export function useExecutionStream() {
|
||||
})
|
||||
|
||||
if (!response.ok) {
|
||||
const error = await response.json()
|
||||
throw new Error(error.error || 'Failed to start execution')
|
||||
const errorResponse = await response.json()
|
||||
const error = new Error(errorResponse.error || 'Failed to start execution')
|
||||
// Attach the execution result from server response for error handling
|
||||
if (errorResponse && typeof errorResponse === 'object') {
|
||||
Object.assign(error, { executionResult: errorResponse })
|
||||
}
|
||||
throw error
|
||||
}
|
||||
|
||||
if (!response.body) {
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
import { useCallback, useEffect, useMemo, useState } from 'react'
|
||||
import { useCallback, useEffect, useMemo } from 'react'
|
||||
import { createLogger } from '@sim/logger'
|
||||
import { useParams } from 'next/navigation'
|
||||
import { getBaseUrl } from '@/lib/core/utils/urls'
|
||||
@@ -6,12 +6,10 @@ import { getBlock } from '@/blocks'
|
||||
import { populateTriggerFieldsFromConfig } from '@/hooks/use-trigger-config-aggregation'
|
||||
import { useSubBlockStore } from '@/stores/workflows/subblock/store'
|
||||
import { useWorkflowStore } from '@/stores/workflows/workflow/store'
|
||||
import { getTrigger, isTriggerValid } from '@/triggers'
|
||||
import { isTriggerValid } from '@/triggers'
|
||||
|
||||
const logger = createLogger('useWebhookManagement')
|
||||
|
||||
const CREDENTIAL_SET_PREFIX = 'credentialSet:'
|
||||
|
||||
interface UseWebhookManagementProps {
|
||||
blockId: string
|
||||
triggerId?: string
|
||||
@@ -24,9 +22,6 @@ interface WebhookManagementState {
|
||||
webhookPath: string
|
||||
webhookId: string | null
|
||||
isLoading: boolean
|
||||
isSaving: boolean
|
||||
saveConfig: () => Promise<boolean>
|
||||
deleteConfig: () => Promise<boolean>
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -83,11 +78,9 @@ function resolveEffectiveTriggerId(
|
||||
}
|
||||
|
||||
/**
|
||||
* Hook to manage webhook lifecycle for trigger blocks
|
||||
* Handles:
|
||||
* - Pre-generating webhook URLs based on blockId (without creating webhook)
|
||||
* - Loading existing webhooks from the API
|
||||
* - Saving and deleting webhook configurations
|
||||
* Hook to load webhook info for trigger blocks.
|
||||
* Used for displaying webhook URLs in the UI.
|
||||
* Webhook creation/updates are handled by the deploy flow.
|
||||
*/
|
||||
export function useWebhookManagement({
|
||||
blockId,
|
||||
@@ -98,8 +91,6 @@ export function useWebhookManagement({
|
||||
const params = useParams()
|
||||
const workflowId = params.workflowId as string
|
||||
|
||||
const triggerDef = triggerId && isTriggerValid(triggerId) ? getTrigger(triggerId) : null
|
||||
|
||||
const webhookId = useSubBlockStore(
|
||||
useCallback((state) => state.getValue(blockId, 'webhookId') as string | null, [blockId])
|
||||
)
|
||||
@@ -107,7 +98,6 @@ export function useWebhookManagement({
|
||||
useCallback((state) => state.getValue(blockId, 'triggerPath') as string | null, [blockId])
|
||||
)
|
||||
const isLoading = useSubBlockStore((state) => state.loadingWebhooks.has(blockId))
|
||||
const isChecked = useSubBlockStore((state) => state.checkedWebhooks.has(blockId))
|
||||
|
||||
const webhookUrl = useMemo(() => {
|
||||
if (!webhookPath) {
|
||||
@@ -118,8 +108,6 @@ export function useWebhookManagement({
|
||||
return `${baseUrl}/api/webhooks/trigger/${webhookPath}`
|
||||
}, [webhookPath, blockId])
|
||||
|
||||
const [isSaving, setIsSaving] = useState(false)
|
||||
|
||||
useEffect(() => {
|
||||
if (triggerId && !isPreview) {
|
||||
const storedTriggerId = useSubBlockStore.getState().getValue(blockId, 'triggerId')
|
||||
@@ -143,7 +131,7 @@ export function useWebhookManagement({
|
||||
return
|
||||
}
|
||||
|
||||
const loadWebhookOrGenerateUrl = async () => {
|
||||
const loadWebhookInfo = async () => {
|
||||
useSubBlockStore.setState((state) => ({
|
||||
loadingWebhooks: new Set([...state.loadingWebhooks, blockId]),
|
||||
}))
|
||||
@@ -171,8 +159,6 @@ export function useWebhookManagement({
|
||||
if (webhook.providerConfig) {
|
||||
const effectiveTriggerId = resolveEffectiveTriggerId(blockId, triggerId, webhook)
|
||||
|
||||
// Filter out runtime/system fields from providerConfig before storing as triggerConfig
|
||||
// These fields are managed by the system and should not be included in change detection
|
||||
const {
|
||||
credentialId: _credId,
|
||||
credentialSetId: _credSetId,
|
||||
@@ -224,202 +210,14 @@ export function useWebhookManagement({
|
||||
}
|
||||
}
|
||||
if (useWebhookUrl) {
|
||||
loadWebhookOrGenerateUrl()
|
||||
loadWebhookInfo()
|
||||
}
|
||||
}, [isPreview, triggerId, workflowId, blockId, useWebhookUrl])
|
||||
|
||||
const createWebhook = async (
|
||||
effectiveTriggerId: string | undefined,
|
||||
selectedCredentialId: string | null
|
||||
): Promise<boolean> => {
|
||||
if (!triggerDef || !effectiveTriggerId) {
|
||||
return false
|
||||
}
|
||||
|
||||
const triggerConfig = useSubBlockStore.getState().getValue(blockId, 'triggerConfig')
|
||||
|
||||
const isCredentialSet = selectedCredentialId?.startsWith(CREDENTIAL_SET_PREFIX)
|
||||
const credentialSetId = isCredentialSet
|
||||
? selectedCredentialId!.slice(CREDENTIAL_SET_PREFIX.length)
|
||||
: undefined
|
||||
const credentialId = isCredentialSet ? undefined : selectedCredentialId
|
||||
|
||||
const webhookConfig = {
|
||||
...(triggerConfig || {}),
|
||||
...(credentialId ? { credentialId } : {}),
|
||||
...(credentialSetId ? { credentialSetId } : {}),
|
||||
triggerId: effectiveTriggerId,
|
||||
}
|
||||
|
||||
const path = blockId
|
||||
|
||||
const response = await fetch('/api/webhooks', {
|
||||
method: 'POST',
|
||||
headers: { 'Content-Type': 'application/json' },
|
||||
body: JSON.stringify({
|
||||
workflowId,
|
||||
blockId,
|
||||
path,
|
||||
provider: triggerDef.provider,
|
||||
providerConfig: webhookConfig,
|
||||
}),
|
||||
})
|
||||
|
||||
if (!response.ok) {
|
||||
let errorMessage = 'Failed to create webhook'
|
||||
try {
|
||||
const errorData = await response.json()
|
||||
errorMessage = errorData.details || errorData.error || errorMessage
|
||||
} catch {
|
||||
// If response is not JSON, use default message
|
||||
}
|
||||
logger.error('Failed to create webhook', { errorMessage })
|
||||
throw new Error(errorMessage)
|
||||
}
|
||||
|
||||
const data = await response.json()
|
||||
const savedWebhookId = data.webhook.id
|
||||
|
||||
useSubBlockStore.getState().setValue(blockId, 'triggerPath', path)
|
||||
useSubBlockStore.getState().setValue(blockId, 'triggerId', effectiveTriggerId)
|
||||
useSubBlockStore.getState().setValue(blockId, 'webhookId', savedWebhookId)
|
||||
useSubBlockStore.setState((state) => ({
|
||||
checkedWebhooks: new Set([...state.checkedWebhooks, blockId]),
|
||||
}))
|
||||
|
||||
logger.info('Trigger webhook created successfully', {
|
||||
webhookId: savedWebhookId,
|
||||
triggerId: effectiveTriggerId,
|
||||
provider: triggerDef.provider,
|
||||
blockId,
|
||||
})
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
const updateWebhook = async (
|
||||
webhookIdToUpdate: string,
|
||||
effectiveTriggerId: string | undefined,
|
||||
selectedCredentialId: string | null
|
||||
): Promise<boolean> => {
|
||||
const triggerConfigRaw = useSubBlockStore.getState().getValue(blockId, 'triggerConfig')
|
||||
const triggerConfig =
|
||||
typeof triggerConfigRaw === 'object' && triggerConfigRaw !== null
|
||||
? (triggerConfigRaw as Record<string, unknown>)
|
||||
: {}
|
||||
|
||||
const isCredentialSet = selectedCredentialId?.startsWith(CREDENTIAL_SET_PREFIX)
|
||||
const credentialSetId = isCredentialSet
|
||||
? selectedCredentialId!.slice(CREDENTIAL_SET_PREFIX.length)
|
||||
: undefined
|
||||
const credentialId = isCredentialSet ? undefined : selectedCredentialId
|
||||
|
||||
const response = await fetch(`/api/webhooks/${webhookIdToUpdate}`, {
|
||||
method: 'PATCH',
|
||||
headers: { 'Content-Type': 'application/json' },
|
||||
body: JSON.stringify({
|
||||
providerConfig: {
|
||||
...triggerConfig,
|
||||
...(credentialId ? { credentialId } : {}),
|
||||
...(credentialSetId ? { credentialSetId } : {}),
|
||||
triggerId: effectiveTriggerId,
|
||||
},
|
||||
}),
|
||||
})
|
||||
|
||||
if (response.status === 404) {
|
||||
logger.warn('Webhook not found while updating, recreating', {
|
||||
blockId,
|
||||
lostWebhookId: webhookIdToUpdate,
|
||||
})
|
||||
useSubBlockStore.getState().setValue(blockId, 'webhookId', null)
|
||||
return createWebhook(effectiveTriggerId, selectedCredentialId)
|
||||
}
|
||||
|
||||
if (!response.ok) {
|
||||
let errorMessage = 'Failed to save trigger configuration'
|
||||
try {
|
||||
const errorData = await response.json()
|
||||
errorMessage = errorData.details || errorData.error || errorMessage
|
||||
} catch {
|
||||
// If response is not JSON, use default message
|
||||
}
|
||||
logger.error('Failed to save trigger config', { errorMessage })
|
||||
throw new Error(errorMessage)
|
||||
}
|
||||
|
||||
logger.info('Trigger config saved successfully', { blockId, webhookId: webhookIdToUpdate })
|
||||
return true
|
||||
}
|
||||
|
||||
const saveConfig = async (): Promise<boolean> => {
|
||||
if (isPreview || !triggerDef) {
|
||||
return false
|
||||
}
|
||||
|
||||
const effectiveTriggerId = resolveEffectiveTriggerId(blockId, triggerId)
|
||||
|
||||
try {
|
||||
setIsSaving(true)
|
||||
|
||||
const triggerCredentials = useSubBlockStore.getState().getValue(blockId, 'triggerCredentials')
|
||||
const selectedCredentialId = (triggerCredentials as string | null) || null
|
||||
|
||||
if (!webhookId) {
|
||||
return createWebhook(effectiveTriggerId, selectedCredentialId)
|
||||
}
|
||||
|
||||
return updateWebhook(webhookId, effectiveTriggerId, selectedCredentialId)
|
||||
} catch (error) {
|
||||
logger.error('Error saving trigger config:', error)
|
||||
throw error
|
||||
} finally {
|
||||
setIsSaving(false)
|
||||
}
|
||||
}
|
||||
|
||||
const deleteConfig = async (): Promise<boolean> => {
|
||||
if (isPreview || !webhookId) {
|
||||
return false
|
||||
}
|
||||
|
||||
try {
|
||||
setIsSaving(true)
|
||||
|
||||
const response = await fetch(`/api/webhooks/${webhookId}`, {
|
||||
method: 'DELETE',
|
||||
})
|
||||
|
||||
if (!response.ok) {
|
||||
logger.error('Failed to delete webhook')
|
||||
return false
|
||||
}
|
||||
|
||||
useSubBlockStore.getState().setValue(blockId, 'triggerPath', '')
|
||||
useSubBlockStore.getState().setValue(blockId, 'webhookId', null)
|
||||
useSubBlockStore.setState((state) => {
|
||||
const newSet = new Set(state.checkedWebhooks)
|
||||
newSet.delete(blockId)
|
||||
return { checkedWebhooks: newSet }
|
||||
})
|
||||
|
||||
logger.info('Webhook deleted successfully')
|
||||
return true
|
||||
} catch (error) {
|
||||
logger.error('Error deleting webhook:', error)
|
||||
return false
|
||||
} finally {
|
||||
setIsSaving(false)
|
||||
}
|
||||
}
|
||||
|
||||
return {
|
||||
webhookUrl,
|
||||
webhookPath: webhookPath || blockId,
|
||||
webhookId,
|
||||
isLoading,
|
||||
isSaving,
|
||||
saveConfig,
|
||||
deleteConfig,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -32,6 +32,12 @@ interface TriggerSaveError {
|
||||
interface TriggerSaveResult {
|
||||
success: boolean
|
||||
error?: TriggerSaveError
|
||||
warnings?: string[]
|
||||
}
|
||||
|
||||
interface CredentialSetSyncResult {
|
||||
error: TriggerSaveError | null
|
||||
warnings: string[]
|
||||
}
|
||||
|
||||
interface SaveTriggerWebhooksInput {
|
||||
@@ -42,6 +48,16 @@ interface SaveTriggerWebhooksInput {
|
||||
blocks: Record<string, BlockState>
|
||||
requestId: string
|
||||
deploymentVersionId?: string
|
||||
/**
|
||||
* The previous active version's ID. Only this version's external subscriptions
|
||||
* will be cleaned up (along with draft webhooks). If not provided, skips cleanup.
|
||||
*/
|
||||
previousVersionId?: string
|
||||
/**
|
||||
* When true, forces recreation of external subscriptions even if webhook config is unchanged.
|
||||
* Used when activating a previous deployment version whose subscriptions were cleaned up.
|
||||
*/
|
||||
forceRecreateSubscriptions?: boolean
|
||||
}
|
||||
|
||||
function getSubBlockValue(block: BlockState, subBlockId: string): unknown {
|
||||
@@ -248,7 +264,7 @@ async function syncCredentialSetWebhooks(params: {
|
||||
providerConfig: Record<string, unknown>
|
||||
requestId: string
|
||||
deploymentVersionId?: string
|
||||
}): Promise<TriggerSaveError | null> {
|
||||
}): Promise<CredentialSetSyncResult> {
|
||||
const {
|
||||
workflowId,
|
||||
blockId,
|
||||
@@ -261,7 +277,7 @@ async function syncCredentialSetWebhooks(params: {
|
||||
|
||||
const credentialSetId = providerConfig.credentialSetId as string | undefined
|
||||
if (!credentialSetId) {
|
||||
return null
|
||||
return { error: null, warnings: [] }
|
||||
}
|
||||
|
||||
const oauthProviderId = getProviderIdFromServiceId(provider)
|
||||
@@ -280,10 +296,23 @@ async function syncCredentialSetWebhooks(params: {
|
||||
deploymentVersionId,
|
||||
})
|
||||
|
||||
const warnings: string[] = []
|
||||
|
||||
if (syncResult.failed.length > 0) {
|
||||
const failedCount = syncResult.failed.length
|
||||
const totalCount = syncResult.webhooks.length + failedCount
|
||||
warnings.push(
|
||||
`${failedCount} of ${totalCount} credentials in the set failed to sync for ${provider}. Some team members may not receive triggers.`
|
||||
)
|
||||
}
|
||||
|
||||
if (syncResult.webhooks.length === 0) {
|
||||
return {
|
||||
message: `No valid credentials found in credential set for ${provider}. Please connect accounts and try again.`,
|
||||
status: 400,
|
||||
error: {
|
||||
message: `No valid credentials found in credential set for ${provider}. Please connect accounts and try again.`,
|
||||
status: 400,
|
||||
},
|
||||
warnings,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -297,8 +326,11 @@ async function syncCredentialSetWebhooks(params: {
|
||||
if (!success) {
|
||||
await db.delete(webhook).where(eq(webhook.id, wh.id))
|
||||
return {
|
||||
message: `Failed to configure ${provider} polling. Please check account permissions.`,
|
||||
status: 500,
|
||||
error: {
|
||||
message: `Failed to configure ${provider} polling. Please check account permissions.`,
|
||||
status: 500,
|
||||
},
|
||||
warnings,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -306,84 +338,7 @@ async function syncCredentialSetWebhooks(params: {
|
||||
}
|
||||
}
|
||||
|
||||
return null
|
||||
}
|
||||
|
||||
async function createWebhookForBlock(params: {
|
||||
request: NextRequest
|
||||
workflowId: string
|
||||
workflow: Record<string, unknown>
|
||||
userId: string
|
||||
block: BlockState
|
||||
provider: string
|
||||
providerConfig: Record<string, unknown>
|
||||
triggerPath: string
|
||||
requestId: string
|
||||
deploymentVersionId?: string
|
||||
}): Promise<TriggerSaveError | null> {
|
||||
const {
|
||||
request,
|
||||
workflowId,
|
||||
workflow,
|
||||
userId,
|
||||
block,
|
||||
provider,
|
||||
providerConfig,
|
||||
triggerPath,
|
||||
requestId,
|
||||
deploymentVersionId,
|
||||
} = params
|
||||
|
||||
const webhookId = nanoid()
|
||||
const createPayload = {
|
||||
id: webhookId,
|
||||
path: triggerPath,
|
||||
provider,
|
||||
providerConfig,
|
||||
}
|
||||
|
||||
const result = await createExternalWebhookSubscription(
|
||||
request,
|
||||
createPayload,
|
||||
workflow,
|
||||
userId,
|
||||
requestId
|
||||
)
|
||||
|
||||
const updatedProviderConfig = result.updatedProviderConfig as Record<string, unknown>
|
||||
let savedWebhook: any
|
||||
|
||||
try {
|
||||
const createdRows = await db
|
||||
.insert(webhook)
|
||||
.values({
|
||||
id: webhookId,
|
||||
workflowId,
|
||||
deploymentVersionId: deploymentVersionId || null,
|
||||
blockId: block.id,
|
||||
path: triggerPath,
|
||||
provider,
|
||||
providerConfig: updatedProviderConfig,
|
||||
credentialSetId: (updatedProviderConfig.credentialSetId as string | undefined) || null,
|
||||
isActive: true,
|
||||
createdAt: new Date(),
|
||||
updatedAt: new Date(),
|
||||
})
|
||||
.returning()
|
||||
savedWebhook = createdRows[0]
|
||||
} catch (error) {
|
||||
if (result.externalSubscriptionCreated) {
|
||||
await cleanupExternalWebhook(createPayload, workflow, requestId)
|
||||
}
|
||||
throw error
|
||||
}
|
||||
|
||||
const pollingError = await configurePollingIfNeeded(provider, savedWebhook, requestId)
|
||||
if (pollingError) {
|
||||
return pollingError
|
||||
}
|
||||
|
||||
return null
|
||||
return { error: null, warnings }
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -398,23 +353,57 @@ export async function saveTriggerWebhooksForDeploy({
|
||||
blocks,
|
||||
requestId,
|
||||
deploymentVersionId,
|
||||
previousVersionId,
|
||||
forceRecreateSubscriptions = false,
|
||||
}: SaveTriggerWebhooksInput): Promise<TriggerSaveResult> {
|
||||
const triggerBlocks = Object.values(blocks || {}).filter(Boolean)
|
||||
const triggerBlocks = Object.values(blocks || {}).filter((b) => b && b.enabled !== false)
|
||||
const currentBlockIds = new Set(triggerBlocks.map((b) => b.id))
|
||||
|
||||
// 1. Get all existing webhooks for this workflow
|
||||
const existingWebhooks = await db
|
||||
// 1. Get ALL webhooks for this workflow (all versions including draft)
|
||||
const allWorkflowWebhooks = await db
|
||||
.select()
|
||||
.from(webhook)
|
||||
.where(
|
||||
deploymentVersionId
|
||||
? and(
|
||||
eq(webhook.workflowId, workflowId),
|
||||
eq(webhook.deploymentVersionId, deploymentVersionId)
|
||||
)
|
||||
: eq(webhook.workflowId, workflowId)
|
||||
.where(eq(webhook.workflowId, workflowId))
|
||||
|
||||
// Separate webhooks by version: current deployment vs others
|
||||
const existingWebhooks: typeof allWorkflowWebhooks = []
|
||||
|
||||
for (const wh of allWorkflowWebhooks) {
|
||||
if (deploymentVersionId && wh.deploymentVersionId === deploymentVersionId) {
|
||||
existingWebhooks.push(wh)
|
||||
}
|
||||
}
|
||||
|
||||
if (previousVersionId) {
|
||||
const webhooksToCleanup = allWorkflowWebhooks.filter(
|
||||
(wh) => wh.deploymentVersionId === previousVersionId
|
||||
)
|
||||
|
||||
if (webhooksToCleanup.length > 0) {
|
||||
logger.info(
|
||||
`[${requestId}] Cleaning up ${webhooksToCleanup.length} external subscription(s) from previous version`
|
||||
)
|
||||
for (const wh of webhooksToCleanup) {
|
||||
try {
|
||||
await cleanupExternalWebhook(wh, workflow, requestId)
|
||||
} catch (cleanupError) {
|
||||
logger.warn(`[${requestId}] Failed to cleanup external webhook ${wh.id}`, cleanupError)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
const restorePreviousSubscriptions = async () => {
|
||||
if (!previousVersionId) return
|
||||
await restorePreviousVersionWebhooks({
|
||||
request,
|
||||
workflow,
|
||||
userId,
|
||||
previousVersionId,
|
||||
requestId,
|
||||
})
|
||||
}
|
||||
|
||||
const webhooksByBlockId = new Map<string, typeof existingWebhooks>()
|
||||
for (const wh of existingWebhooks) {
|
||||
if (!wh.blockId) continue
|
||||
@@ -429,7 +418,14 @@ export async function saveTriggerWebhooksForDeploy({
|
||||
existingWebhookBlockIds: Array.from(webhooksByBlockId.keys()),
|
||||
})
|
||||
|
||||
// 2. Determine which webhooks to delete (orphaned or config changed)
|
||||
type WebhookConfig = {
|
||||
provider: string
|
||||
providerConfig: Record<string, unknown>
|
||||
triggerPath: string
|
||||
triggerDef: ReturnType<typeof getTrigger>
|
||||
}
|
||||
const webhookConfigs = new Map<string, WebhookConfig>()
|
||||
|
||||
const webhooksToDelete: typeof existingWebhooks = []
|
||||
const blocksNeedingWebhook: BlockState[] = []
|
||||
const blocksNeedingCredentialSetSync: BlockState[] = []
|
||||
@@ -447,6 +443,7 @@ export async function saveTriggerWebhooksForDeploy({
|
||||
)
|
||||
|
||||
if (missingFields.length > 0) {
|
||||
await restorePreviousSubscriptions()
|
||||
return {
|
||||
success: false,
|
||||
error: {
|
||||
@@ -455,9 +452,8 @@ export async function saveTriggerWebhooksForDeploy({
|
||||
},
|
||||
}
|
||||
}
|
||||
// Store config for later use
|
||||
|
||||
;(block as any)._webhookConfig = { provider, providerConfig, triggerPath, triggerDef }
|
||||
webhookConfigs.set(block.id, { provider, providerConfig, triggerPath, triggerDef })
|
||||
|
||||
if (providerConfig.credentialSetId) {
|
||||
blocksNeedingCredentialSetSync.push(block)
|
||||
@@ -477,22 +473,29 @@ export async function saveTriggerWebhooksForDeploy({
|
||||
)
|
||||
}
|
||||
|
||||
// Check if config changed
|
||||
// Check if config changed or if we're forcing recreation (e.g., activating old version)
|
||||
const existingConfig = (existingWh.providerConfig as Record<string, unknown>) || {}
|
||||
if (
|
||||
const needsRecreation =
|
||||
forceRecreateSubscriptions ||
|
||||
shouldRecreateExternalWebhookSubscription({
|
||||
previousProvider: existingWh.provider as string,
|
||||
nextProvider: provider,
|
||||
previousConfig: existingConfig,
|
||||
nextConfig: providerConfig,
|
||||
})
|
||||
) {
|
||||
// Config changed - delete and recreate
|
||||
|
||||
if (needsRecreation) {
|
||||
webhooksToDelete.push(existingWh)
|
||||
blocksNeedingWebhook.push(block)
|
||||
logger.info(`[${requestId}] Webhook config changed for block ${block.id}, will recreate`)
|
||||
if (forceRecreateSubscriptions) {
|
||||
logger.info(
|
||||
`[${requestId}] Forcing webhook recreation for block ${block.id} (reactivating version)`
|
||||
)
|
||||
} else {
|
||||
logger.info(`[${requestId}] Webhook config changed for block ${block.id}, will recreate`)
|
||||
}
|
||||
}
|
||||
// else: config unchanged, keep existing webhook
|
||||
// else: config unchanged and not forcing recreation, keep existing webhook
|
||||
}
|
||||
}
|
||||
|
||||
@@ -522,15 +525,16 @@ export async function saveTriggerWebhooksForDeploy({
|
||||
await db.delete(webhook).where(inArray(webhook.id, idsToDelete))
|
||||
}
|
||||
|
||||
// 4. Sync credential set webhooks
|
||||
const collectedWarnings: string[] = []
|
||||
|
||||
for (const block of blocksNeedingCredentialSetSync) {
|
||||
const config = (block as any)._webhookConfig
|
||||
const config = webhookConfigs.get(block.id)
|
||||
if (!config) continue
|
||||
|
||||
const { provider, providerConfig, triggerPath } = config
|
||||
|
||||
try {
|
||||
const credentialSetError = await syncCredentialSetWebhooks({
|
||||
const syncResult = await syncCredentialSetWebhooks({
|
||||
workflowId,
|
||||
blockId: block.id,
|
||||
provider,
|
||||
@@ -540,74 +544,214 @@ export async function saveTriggerWebhooksForDeploy({
|
||||
deploymentVersionId,
|
||||
})
|
||||
|
||||
if (credentialSetError) {
|
||||
return { success: false, error: credentialSetError }
|
||||
if (syncResult.warnings.length > 0) {
|
||||
collectedWarnings.push(...syncResult.warnings)
|
||||
}
|
||||
|
||||
if (syncResult.error) {
|
||||
await restorePreviousSubscriptions()
|
||||
return { success: false, error: syncResult.error, warnings: collectedWarnings }
|
||||
}
|
||||
} catch (error: any) {
|
||||
logger.error(`[${requestId}] Failed to create webhook for ${block.id}`, error)
|
||||
await restorePreviousSubscriptions()
|
||||
return {
|
||||
success: false,
|
||||
error: {
|
||||
message: error?.message || 'Failed to save trigger configuration',
|
||||
status: 500,
|
||||
},
|
||||
warnings: collectedWarnings,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// 5. Create webhooks for blocks that need them
|
||||
// 5. Create webhooks for blocks that need them (two-phase approach for atomicity)
|
||||
const createdSubscriptions: Array<{
|
||||
webhookId: string
|
||||
block: BlockState
|
||||
provider: string
|
||||
triggerPath: string
|
||||
updatedProviderConfig: Record<string, unknown>
|
||||
externalSubscriptionCreated: boolean
|
||||
}> = []
|
||||
|
||||
for (const block of blocksNeedingWebhook) {
|
||||
const config = (block as any)._webhookConfig
|
||||
const config = webhookConfigs.get(block.id)
|
||||
if (!config) continue
|
||||
|
||||
const { provider, providerConfig, triggerPath } = config
|
||||
const webhookId = nanoid()
|
||||
const createPayload = {
|
||||
id: webhookId,
|
||||
path: triggerPath,
|
||||
provider,
|
||||
providerConfig,
|
||||
}
|
||||
|
||||
try {
|
||||
const createError = await createWebhookForBlock({
|
||||
const result = await createExternalWebhookSubscription(
|
||||
request,
|
||||
workflowId,
|
||||
createPayload,
|
||||
workflow,
|
||||
userId,
|
||||
requestId
|
||||
)
|
||||
|
||||
createdSubscriptions.push({
|
||||
webhookId,
|
||||
block,
|
||||
provider,
|
||||
providerConfig,
|
||||
triggerPath,
|
||||
requestId,
|
||||
deploymentVersionId,
|
||||
updatedProviderConfig: result.updatedProviderConfig as Record<string, unknown>,
|
||||
externalSubscriptionCreated: result.externalSubscriptionCreated,
|
||||
})
|
||||
|
||||
if (createError) {
|
||||
return { success: false, error: createError }
|
||||
}
|
||||
} catch (error: any) {
|
||||
logger.error(`[${requestId}] Failed to create webhook for ${block.id}`, error)
|
||||
logger.error(`[${requestId}] Failed to create external subscription for ${block.id}`, error)
|
||||
for (const sub of createdSubscriptions) {
|
||||
if (sub.externalSubscriptionCreated) {
|
||||
try {
|
||||
await cleanupExternalWebhook(
|
||||
{
|
||||
id: sub.webhookId,
|
||||
path: sub.triggerPath,
|
||||
provider: sub.provider,
|
||||
providerConfig: sub.updatedProviderConfig,
|
||||
},
|
||||
workflow,
|
||||
requestId
|
||||
)
|
||||
} catch (cleanupError) {
|
||||
logger.warn(
|
||||
`[${requestId}] Failed to cleanup external subscription for ${sub.block.id}`,
|
||||
cleanupError
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
await restorePreviousSubscriptions()
|
||||
return {
|
||||
success: false,
|
||||
error: {
|
||||
message: error?.message || 'Failed to save trigger configuration',
|
||||
message: error?.message || 'Failed to create external subscription',
|
||||
status: 500,
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Clean up temp config
|
||||
for (const block of triggerBlocks) {
|
||||
;(block as any)._webhookConfig = undefined
|
||||
// Phase 2: Insert all DB records in a transaction
|
||||
try {
|
||||
await db.transaction(async (tx) => {
|
||||
for (const sub of createdSubscriptions) {
|
||||
await tx.insert(webhook).values({
|
||||
id: sub.webhookId,
|
||||
workflowId,
|
||||
deploymentVersionId: deploymentVersionId || null,
|
||||
blockId: sub.block.id,
|
||||
path: sub.triggerPath,
|
||||
provider: sub.provider,
|
||||
providerConfig: sub.updatedProviderConfig,
|
||||
credentialSetId:
|
||||
(sub.updatedProviderConfig.credentialSetId as string | undefined) || null,
|
||||
isActive: true,
|
||||
createdAt: new Date(),
|
||||
updatedAt: new Date(),
|
||||
})
|
||||
}
|
||||
})
|
||||
|
||||
for (const sub of createdSubscriptions) {
|
||||
const pollingError = await configurePollingIfNeeded(
|
||||
sub.provider,
|
||||
{ id: sub.webhookId, path: sub.triggerPath, providerConfig: sub.updatedProviderConfig },
|
||||
requestId
|
||||
)
|
||||
if (pollingError) {
|
||||
logger.error(
|
||||
`[${requestId}] Polling configuration failed for ${sub.block.id}`,
|
||||
pollingError
|
||||
)
|
||||
for (const otherSub of createdSubscriptions) {
|
||||
if (otherSub.webhookId === sub.webhookId) continue
|
||||
if (otherSub.externalSubscriptionCreated) {
|
||||
try {
|
||||
await cleanupExternalWebhook(
|
||||
{
|
||||
id: otherSub.webhookId,
|
||||
path: otherSub.triggerPath,
|
||||
provider: otherSub.provider,
|
||||
providerConfig: otherSub.updatedProviderConfig,
|
||||
},
|
||||
workflow,
|
||||
requestId
|
||||
)
|
||||
} catch (cleanupError) {
|
||||
logger.warn(
|
||||
`[${requestId}] Failed to cleanup external subscription for ${otherSub.block.id}`,
|
||||
cleanupError
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
const otherWebhookIds = createdSubscriptions
|
||||
.filter((s) => s.webhookId !== sub.webhookId)
|
||||
.map((s) => s.webhookId)
|
||||
if (otherWebhookIds.length > 0) {
|
||||
await db.delete(webhook).where(inArray(webhook.id, otherWebhookIds))
|
||||
}
|
||||
await restorePreviousSubscriptions()
|
||||
return { success: false, error: pollingError }
|
||||
}
|
||||
}
|
||||
} catch (error: any) {
|
||||
logger.error(`[${requestId}] Failed to insert webhook records`, error)
|
||||
for (const sub of createdSubscriptions) {
|
||||
if (sub.externalSubscriptionCreated) {
|
||||
try {
|
||||
await cleanupExternalWebhook(
|
||||
{
|
||||
id: sub.webhookId,
|
||||
path: sub.triggerPath,
|
||||
provider: sub.provider,
|
||||
providerConfig: sub.updatedProviderConfig,
|
||||
},
|
||||
workflow,
|
||||
requestId
|
||||
)
|
||||
} catch (cleanupError) {
|
||||
logger.warn(
|
||||
`[${requestId}] Failed to cleanup external subscription for ${sub.block.id}`,
|
||||
cleanupError
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
await restorePreviousSubscriptions()
|
||||
return {
|
||||
success: false,
|
||||
error: {
|
||||
message: error?.message || 'Failed to save webhook records',
|
||||
status: 500,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
return { success: true }
|
||||
return { success: true, warnings: collectedWarnings.length > 0 ? collectedWarnings : undefined }
|
||||
}
|
||||
|
||||
/**
|
||||
* Clean up all webhooks for a workflow during undeploy.
|
||||
* Removes external subscriptions and deletes webhook records from the database.
|
||||
*
|
||||
* @param skipExternalCleanup - If true, skip external subscription cleanup (already done elsewhere)
|
||||
*/
|
||||
export async function cleanupWebhooksForWorkflow(
|
||||
workflowId: string,
|
||||
workflow: Record<string, unknown>,
|
||||
requestId: string,
|
||||
deploymentVersionId?: string
|
||||
deploymentVersionId?: string,
|
||||
skipExternalCleanup = false
|
||||
): Promise<void> {
|
||||
const existingWebhooks = await db
|
||||
.select()
|
||||
@@ -626,23 +770,26 @@ export async function cleanupWebhooksForWorkflow(
|
||||
return
|
||||
}
|
||||
|
||||
logger.info(`[${requestId}] Cleaning up ${existingWebhooks.length} webhook(s) for undeploy`, {
|
||||
workflowId,
|
||||
deploymentVersionId,
|
||||
webhookIds: existingWebhooks.map((wh) => wh.id),
|
||||
})
|
||||
logger.info(
|
||||
`[${requestId}] Cleaning up ${existingWebhooks.length} webhook(s) for ${skipExternalCleanup ? 'DB records only' : 'undeploy'}`,
|
||||
{
|
||||
workflowId,
|
||||
deploymentVersionId,
|
||||
webhookIds: existingWebhooks.map((wh) => wh.id),
|
||||
}
|
||||
)
|
||||
|
||||
// Clean up external subscriptions
|
||||
for (const wh of existingWebhooks) {
|
||||
try {
|
||||
await cleanupExternalWebhook(wh, workflow, requestId)
|
||||
} catch (cleanupError) {
|
||||
logger.warn(`[${requestId}] Failed to cleanup external webhook ${wh.id}`, cleanupError)
|
||||
// Continue with other webhooks even if one fails
|
||||
if (!skipExternalCleanup) {
|
||||
for (const wh of existingWebhooks) {
|
||||
try {
|
||||
await cleanupExternalWebhook(wh, workflow, requestId)
|
||||
} catch (cleanupError) {
|
||||
logger.warn(`[${requestId}] Failed to cleanup external webhook ${wh.id}`, cleanupError)
|
||||
// Continue with other webhooks even if one fails
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Delete all webhook records
|
||||
await db
|
||||
.delete(webhook)
|
||||
.where(
|
||||
@@ -660,3 +807,55 @@ export async function cleanupWebhooksForWorkflow(
|
||||
: `[${requestId}] Cleaned up all webhooks for workflow ${workflowId}`
|
||||
)
|
||||
}
|
||||
|
||||
/**
|
||||
* Restore external subscriptions for a previous deployment version.
|
||||
* Used when activation/deployment fails after webhooks were created,
|
||||
* to restore the previous version's external subscriptions.
|
||||
*/
|
||||
export async function restorePreviousVersionWebhooks(params: {
|
||||
request: NextRequest
|
||||
workflow: Record<string, unknown>
|
||||
userId: string
|
||||
previousVersionId: string
|
||||
requestId: string
|
||||
}): Promise<void> {
|
||||
const { request, workflow, userId, previousVersionId, requestId } = params
|
||||
|
||||
const previousWebhooks = await db
|
||||
.select()
|
||||
.from(webhook)
|
||||
.where(eq(webhook.deploymentVersionId, previousVersionId))
|
||||
|
||||
if (previousWebhooks.length === 0) {
|
||||
logger.debug(`[${requestId}] No previous webhooks to restore for version ${previousVersionId}`)
|
||||
return
|
||||
}
|
||||
|
||||
logger.info(
|
||||
`[${requestId}] Restoring ${previousWebhooks.length} external subscription(s) for previous version ${previousVersionId}`
|
||||
)
|
||||
|
||||
for (const wh of previousWebhooks) {
|
||||
try {
|
||||
await createExternalWebhookSubscription(
|
||||
request,
|
||||
{
|
||||
id: wh.id,
|
||||
path: wh.path,
|
||||
provider: wh.provider,
|
||||
providerConfig: (wh.providerConfig as Record<string, unknown>) || {},
|
||||
},
|
||||
workflow,
|
||||
userId,
|
||||
requestId
|
||||
)
|
||||
logger.info(`[${requestId}] Restored external subscription for webhook ${wh.id}`)
|
||||
} catch (restoreError) {
|
||||
logger.error(
|
||||
`[${requestId}] Failed to restore external subscription for webhook ${wh.id}`,
|
||||
restoreError
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -161,7 +161,7 @@ export async function pollGmailWebhooks() {
|
||||
const metadata = webhookData.providerConfig as any
|
||||
const credentialId: string | undefined = metadata?.credentialId
|
||||
const userId: string | undefined = metadata?.userId
|
||||
const credentialSetId: string | undefined = metadata?.credentialSetId
|
||||
const credentialSetId: string | undefined = webhookData.credentialSetId ?? undefined
|
||||
|
||||
if (!credentialId && !userId) {
|
||||
logger.error(`[${requestId}] Missing credential info for webhook ${webhookId}`)
|
||||
@@ -697,7 +697,6 @@ async function processEmails(
|
||||
method: 'POST',
|
||||
headers: {
|
||||
'Content-Type': 'application/json',
|
||||
'X-Webhook-Secret': webhookData.secret || '',
|
||||
'User-Agent': 'Sim/1.0',
|
||||
},
|
||||
body: JSON.stringify(payload),
|
||||
@@ -766,17 +765,21 @@ async function markEmailAsRead(accessToken: string, messageId: string) {
|
||||
}
|
||||
|
||||
async function updateWebhookLastChecked(webhookId: string, timestamp: string, historyId?: string) {
|
||||
const result = await db.select().from(webhook).where(eq(webhook.id, webhookId))
|
||||
const existingConfig = (result[0]?.providerConfig as Record<string, any>) || {}
|
||||
await db
|
||||
.update(webhook)
|
||||
.set({
|
||||
providerConfig: {
|
||||
...existingConfig,
|
||||
lastCheckedTimestamp: timestamp,
|
||||
...(historyId ? { historyId } : {}),
|
||||
} as any,
|
||||
updatedAt: new Date(),
|
||||
})
|
||||
.where(eq(webhook.id, webhookId))
|
||||
try {
|
||||
const result = await db.select().from(webhook).where(eq(webhook.id, webhookId))
|
||||
const existingConfig = (result[0]?.providerConfig as Record<string, any>) || {}
|
||||
await db
|
||||
.update(webhook)
|
||||
.set({
|
||||
providerConfig: {
|
||||
...existingConfig,
|
||||
lastCheckedTimestamp: timestamp,
|
||||
...(historyId ? { historyId } : {}),
|
||||
} as any,
|
||||
updatedAt: new Date(),
|
||||
})
|
||||
.where(eq(webhook.id, webhookId))
|
||||
} catch (error) {
|
||||
logger.error(`Error updating webhook ${webhookId} last checked timestamp:`, error)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -645,7 +645,6 @@ async function processEmails(
|
||||
method: 'POST',
|
||||
headers: {
|
||||
'Content-Type': 'application/json',
|
||||
'X-Webhook-Secret': '',
|
||||
'User-Agent': 'Sim/1.0',
|
||||
},
|
||||
body: JSON.stringify(payload),
|
||||
|
||||
@@ -210,7 +210,7 @@ export async function pollOutlookWebhooks() {
|
||||
const metadata = webhookData.providerConfig as any
|
||||
const credentialId: string | undefined = metadata?.credentialId
|
||||
const userId: string | undefined = metadata?.userId
|
||||
const credentialSetId: string | undefined = metadata?.credentialSetId
|
||||
const credentialSetId: string | undefined = webhookData.credentialSetId ?? undefined
|
||||
|
||||
if (!credentialId && !userId) {
|
||||
logger.error(`[${requestId}] Missing credentialId and userId for webhook ${webhookId}`)
|
||||
@@ -607,7 +607,6 @@ async function processOutlookEmails(
|
||||
method: 'POST',
|
||||
headers: {
|
||||
'Content-Type': 'application/json',
|
||||
'X-Webhook-Secret': webhookData.secret || '',
|
||||
'User-Agent': 'Sim/1.0',
|
||||
},
|
||||
body: JSON.stringify(payload),
|
||||
|
||||
@@ -7,16 +7,27 @@ import { type NextRequest, NextResponse } from 'next/server'
|
||||
import { v4 as uuidv4 } from 'uuid'
|
||||
import { checkEnterprisePlan, checkTeamPlan } from '@/lib/billing/subscriptions/utils'
|
||||
import { isProd, isTriggerDevEnabled } from '@/lib/core/config/feature-flags'
|
||||
import { getEffectiveDecryptedEnv } from '@/lib/environment/utils'
|
||||
import { preprocessExecution } from '@/lib/execution/preprocessing'
|
||||
import { convertSquareBracketsToTwiML } from '@/lib/webhooks/utils'
|
||||
import {
|
||||
handleSlackChallenge,
|
||||
handleWhatsAppVerification,
|
||||
validateCirclebackSignature,
|
||||
validateFirefliesSignature,
|
||||
validateGitHubSignature,
|
||||
validateJiraSignature,
|
||||
validateLinearSignature,
|
||||
validateMicrosoftTeamsSignature,
|
||||
validateTwilioSignature,
|
||||
validateTypeformSignature,
|
||||
verifyProviderWebhook,
|
||||
} from '@/lib/webhooks/utils.server'
|
||||
import { executeWebhookJob } from '@/background/webhook-execution'
|
||||
import { resolveEnvVarReferences } from '@/executor/utils/reference-validation'
|
||||
import { isGitHubEventMatch } from '@/triggers/github/utils'
|
||||
import { isHubSpotContactEventMatch } from '@/triggers/hubspot/utils'
|
||||
import { isJiraEventMatch } from '@/triggers/jira/utils'
|
||||
|
||||
const logger = createLogger('WebhookProcessor')
|
||||
|
||||
@@ -451,7 +462,6 @@ export async function verifyProviderAuth(
|
||||
// Step 1: Fetch and decrypt environment variables for signature verification
|
||||
let decryptedEnvVars: Record<string, string> = {}
|
||||
try {
|
||||
const { getEffectiveDecryptedEnv } = await import('@/lib/environment/utils')
|
||||
decryptedEnvVars = await getEffectiveDecryptedEnv(
|
||||
foundWorkflow.userId,
|
||||
foundWorkflow.workspaceId
|
||||
@@ -553,9 +563,6 @@ export async function verifyProviderAuth(
|
||||
}
|
||||
|
||||
const fullUrl = getExternalUrl(request)
|
||||
|
||||
const { validateTwilioSignature } = await import('@/lib/webhooks/utils.server')
|
||||
|
||||
const isValidSignature = await validateTwilioSignature(authToken, signature, fullUrl, params)
|
||||
|
||||
if (!isValidSignature) {
|
||||
@@ -583,8 +590,6 @@ export async function verifyProviderAuth(
|
||||
return new NextResponse('Unauthorized - Missing Typeform signature', { status: 401 })
|
||||
}
|
||||
|
||||
const { validateTypeformSignature } = await import('@/lib/webhooks/utils.server')
|
||||
|
||||
const isValidSignature = validateTypeformSignature(secret, signature, rawBody)
|
||||
|
||||
if (!isValidSignature) {
|
||||
@@ -610,8 +615,6 @@ export async function verifyProviderAuth(
|
||||
return new NextResponse('Unauthorized - Missing Linear signature', { status: 401 })
|
||||
}
|
||||
|
||||
const { validateLinearSignature } = await import('@/lib/webhooks/utils.server')
|
||||
|
||||
const isValidSignature = validateLinearSignature(secret, signature, rawBody)
|
||||
|
||||
if (!isValidSignature) {
|
||||
@@ -637,8 +640,6 @@ export async function verifyProviderAuth(
|
||||
return new NextResponse('Unauthorized - Missing Circleback signature', { status: 401 })
|
||||
}
|
||||
|
||||
const { validateCirclebackSignature } = await import('@/lib/webhooks/utils.server')
|
||||
|
||||
const isValidSignature = validateCirclebackSignature(secret, signature, rawBody)
|
||||
|
||||
if (!isValidSignature) {
|
||||
@@ -664,8 +665,6 @@ export async function verifyProviderAuth(
|
||||
return new NextResponse('Unauthorized - Missing Jira signature', { status: 401 })
|
||||
}
|
||||
|
||||
const { validateJiraSignature } = await import('@/lib/webhooks/utils.server')
|
||||
|
||||
const isValidSignature = validateJiraSignature(secret, signature, rawBody)
|
||||
|
||||
if (!isValidSignature) {
|
||||
@@ -694,8 +693,6 @@ export async function verifyProviderAuth(
|
||||
return new NextResponse('Unauthorized - Missing GitHub signature', { status: 401 })
|
||||
}
|
||||
|
||||
const { validateGitHubSignature } = await import('@/lib/webhooks/utils.server')
|
||||
|
||||
const isValidSignature = validateGitHubSignature(secret, signature, rawBody)
|
||||
|
||||
if (!isValidSignature) {
|
||||
@@ -724,8 +721,6 @@ export async function verifyProviderAuth(
|
||||
return new NextResponse('Unauthorized - Missing Fireflies signature', { status: 401 })
|
||||
}
|
||||
|
||||
const { validateFirefliesSignature } = await import('@/lib/webhooks/utils.server')
|
||||
|
||||
const isValidSignature = validateFirefliesSignature(secret, signature, rawBody)
|
||||
|
||||
if (!isValidSignature) {
|
||||
@@ -860,8 +855,6 @@ export async function queueWebhookExecution(
|
||||
const eventType = request.headers.get('x-github-event')
|
||||
const action = body.action
|
||||
|
||||
const { isGitHubEventMatch } = await import('@/triggers/github/utils')
|
||||
|
||||
if (!isGitHubEventMatch(triggerId, eventType || '', action, body)) {
|
||||
logger.debug(
|
||||
`[${options.requestId}] GitHub event mismatch for trigger ${triggerId}. Event: ${eventType}, Action: ${action}. Skipping execution.`,
|
||||
@@ -890,8 +883,6 @@ export async function queueWebhookExecution(
|
||||
if (triggerId && triggerId !== 'jira_webhook') {
|
||||
const webhookEvent = body.webhookEvent as string | undefined
|
||||
|
||||
const { isJiraEventMatch } = await import('@/triggers/jira/utils')
|
||||
|
||||
if (!isJiraEventMatch(triggerId, webhookEvent || '', body)) {
|
||||
logger.debug(
|
||||
`[${options.requestId}] Jira event mismatch for trigger ${triggerId}. Event: ${webhookEvent}. Skipping execution.`,
|
||||
@@ -921,8 +912,6 @@ export async function queueWebhookExecution(
|
||||
|
||||
const subscriptionType = firstEvent?.subscriptionType as string | undefined
|
||||
|
||||
const { isHubSpotContactEventMatch } = await import('@/triggers/hubspot/utils')
|
||||
|
||||
if (!isHubSpotContactEventMatch(triggerId, subscriptionType || '')) {
|
||||
logger.debug(
|
||||
`[${options.requestId}] HubSpot event mismatch for trigger ${triggerId}. Event: ${subscriptionType}. Skipping execution.`,
|
||||
@@ -974,7 +963,8 @@ export async function queueWebhookExecution(
|
||||
// Note: Each webhook now has its own credentialId (credential sets are fanned out at save time)
|
||||
const providerConfig = (foundWebhook.providerConfig as Record<string, any>) || {}
|
||||
const credentialId = providerConfig.credentialId as string | undefined
|
||||
const credentialSetId = providerConfig.credentialSetId as string | undefined
|
||||
// credentialSetId is a direct field on webhook table, not in providerConfig
|
||||
const credentialSetId = foundWebhook.credentialSetId as string | undefined
|
||||
|
||||
// Verify billing for credential sets
|
||||
if (credentialSetId) {
|
||||
|
||||
@@ -30,11 +30,11 @@ export async function createTeamsSubscription(
|
||||
webhook: any,
|
||||
workflow: any,
|
||||
requestId: string
|
||||
): Promise<void> {
|
||||
): Promise<string | undefined> {
|
||||
const config = getProviderConfig(webhook)
|
||||
|
||||
if (config.triggerId !== 'microsoftteams_chat_subscription') {
|
||||
return
|
||||
return undefined
|
||||
}
|
||||
|
||||
const credentialId = config.credentialId as string | undefined
|
||||
@@ -77,7 +77,7 @@ export async function createTeamsSubscription(
|
||||
teamsLogger.info(
|
||||
`[${requestId}] Teams subscription ${existingSubscriptionId} already exists for webhook ${webhook.id}`
|
||||
)
|
||||
return
|
||||
return existingSubscriptionId
|
||||
}
|
||||
} catch {
|
||||
teamsLogger.debug(`[${requestId}] Existing subscription check failed, will create new one`)
|
||||
@@ -140,6 +140,7 @@ export async function createTeamsSubscription(
|
||||
teamsLogger.info(
|
||||
`[${requestId}] Successfully created Teams subscription ${payload.id} for webhook ${webhook.id}`
|
||||
)
|
||||
return payload.id as string
|
||||
} catch (error: any) {
|
||||
if (
|
||||
error instanceof Error &&
|
||||
@@ -1600,9 +1601,11 @@ export async function createExternalWebhookSubscription(
|
||||
externalSubscriptionCreated = true
|
||||
}
|
||||
} else if (provider === 'microsoft-teams') {
|
||||
await createTeamsSubscription(request, webhookData, workflow, requestId)
|
||||
externalSubscriptionCreated =
|
||||
(providerConfig.triggerId as string | undefined) === 'microsoftteams_chat_subscription'
|
||||
const subscriptionId = await createTeamsSubscription(request, webhookData, workflow, requestId)
|
||||
if (subscriptionId) {
|
||||
updatedProviderConfig = { ...updatedProviderConfig, externalSubscriptionId: subscriptionId }
|
||||
externalSubscriptionCreated = true
|
||||
}
|
||||
} else if (provider === 'telegram') {
|
||||
await createTelegramWebhook(request, webhookData, requestId)
|
||||
externalSubscriptionCreated = true
|
||||
|
||||
@@ -379,7 +379,6 @@ async function processRssItems(
|
||||
method: 'POST',
|
||||
headers: {
|
||||
'Content-Type': 'application/json',
|
||||
'X-Webhook-Secret': webhookData.secret || '',
|
||||
'User-Agent': 'Sim/1.0',
|
||||
},
|
||||
body: JSON.stringify(payload),
|
||||
|
||||
@@ -2,6 +2,7 @@ import { db, workflowDeploymentVersion } from '@sim/db'
|
||||
import { account, webhook } from '@sim/db/schema'
|
||||
import { createLogger } from '@sim/logger'
|
||||
import { and, eq, isNull, or } from 'drizzle-orm'
|
||||
import { nanoid } from 'nanoid'
|
||||
import { type NextRequest, NextResponse } from 'next/server'
|
||||
import {
|
||||
type SecureFetchResponse,
|
||||
@@ -9,7 +10,11 @@ import {
|
||||
validateUrlWithDNS,
|
||||
} from '@/lib/core/security/input-validation'
|
||||
import type { DbOrTx } from '@/lib/db/types'
|
||||
import { refreshAccessTokenIfNeeded } from '@/app/api/auth/oauth/utils'
|
||||
import { getProviderIdFromServiceId } from '@/lib/oauth'
|
||||
import {
|
||||
getCredentialsForCredentialSet,
|
||||
refreshAccessTokenIfNeeded,
|
||||
} from '@/app/api/auth/oauth/utils'
|
||||
|
||||
const logger = createLogger('WebhookUtils')
|
||||
|
||||
@@ -1388,21 +1393,6 @@ export function verifyProviderWebhook(
|
||||
case 'stripe':
|
||||
break
|
||||
case 'gmail':
|
||||
if (providerConfig.secret) {
|
||||
const secretHeader = request.headers.get('X-Webhook-Secret')
|
||||
if (!secretHeader || secretHeader.length !== providerConfig.secret.length) {
|
||||
logger.warn(`[${requestId}] Invalid Gmail webhook secret`)
|
||||
return new NextResponse('Unauthorized', { status: 401 })
|
||||
}
|
||||
let result = 0
|
||||
for (let i = 0; i < secretHeader.length; i++) {
|
||||
result |= secretHeader.charCodeAt(i) ^ providerConfig.secret.charCodeAt(i)
|
||||
}
|
||||
if (result !== 0) {
|
||||
logger.warn(`[${requestId}] Invalid Gmail webhook secret`)
|
||||
return new NextResponse('Unauthorized', { status: 401 })
|
||||
}
|
||||
}
|
||||
break
|
||||
case 'telegram': {
|
||||
// Check User-Agent to ensure it's not blocked by middleware
|
||||
@@ -1946,6 +1936,10 @@ export interface CredentialSetWebhookSyncResult {
|
||||
created: number
|
||||
updated: number
|
||||
deleted: number
|
||||
failed: Array<{
|
||||
credentialId: string
|
||||
error: string
|
||||
}>
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -1997,9 +1991,6 @@ export async function syncWebhooksForCredentialSet(params: {
|
||||
`[${requestId}] Syncing webhooks for credential set ${credentialSetId}, provider ${provider}`
|
||||
)
|
||||
|
||||
const { getCredentialsForCredentialSet } = await import('@/app/api/auth/oauth/utils')
|
||||
const { nanoid } = await import('nanoid')
|
||||
|
||||
// Polling providers get unique paths per credential (for independent state)
|
||||
// External webhook providers share the same path (external service sends to one URL)
|
||||
const pollingProviders = ['gmail', 'outlook', 'rss', 'imap']
|
||||
@@ -2011,7 +2002,7 @@ export async function syncWebhooksForCredentialSet(params: {
|
||||
syncLogger.warn(
|
||||
`[${requestId}] No credentials found in credential set ${credentialSetId} for provider ${oauthProviderId}`
|
||||
)
|
||||
return { webhooks: [], created: 0, updated: 0, deleted: 0 }
|
||||
return { webhooks: [], created: 0, updated: 0, deleted: 0, failed: [] }
|
||||
}
|
||||
|
||||
syncLogger.info(
|
||||
@@ -2033,10 +2024,9 @@ export async function syncWebhooksForCredentialSet(params: {
|
||||
)
|
||||
|
||||
// Filter to only webhooks belonging to this credential set
|
||||
const credentialSetWebhooks = existingWebhooks.filter((wh) => {
|
||||
const config = wh.providerConfig as Record<string, any>
|
||||
return config?.credentialSetId === credentialSetId
|
||||
})
|
||||
const credentialSetWebhooks = existingWebhooks.filter(
|
||||
(wh) => wh.credentialSetId === credentialSetId
|
||||
)
|
||||
|
||||
syncLogger.info(
|
||||
`[${requestId}] Found ${credentialSetWebhooks.length} existing webhooks for credential set`
|
||||
@@ -2058,103 +2048,128 @@ export async function syncWebhooksForCredentialSet(params: {
|
||||
created: 0,
|
||||
updated: 0,
|
||||
deleted: 0,
|
||||
failed: [],
|
||||
}
|
||||
|
||||
// Process each credential in the set
|
||||
for (const cred of credentials) {
|
||||
const existingWebhook = existingByCredentialId.get(cred.credentialId)
|
||||
try {
|
||||
const existingWebhook = existingByCredentialId.get(cred.credentialId)
|
||||
|
||||
if (existingWebhook) {
|
||||
// Update existing webhook - preserve state fields
|
||||
const existingConfig = existingWebhook.providerConfig as Record<string, any>
|
||||
if (existingWebhook) {
|
||||
// Update existing webhook - preserve state fields
|
||||
const existingConfig = existingWebhook.providerConfig as Record<string, any>
|
||||
|
||||
const updatedConfig = {
|
||||
...providerConfig,
|
||||
basePath, // Store basePath for reliable reconstruction during membership sync
|
||||
credentialId: cred.credentialId,
|
||||
credentialSetId: credentialSetId,
|
||||
// Preserve state fields from existing config
|
||||
historyId: existingConfig.historyId,
|
||||
lastCheckedTimestamp: existingConfig.lastCheckedTimestamp,
|
||||
setupCompleted: existingConfig.setupCompleted,
|
||||
externalId: existingConfig.externalId,
|
||||
userId: cred.userId,
|
||||
}
|
||||
const updatedConfig = {
|
||||
...providerConfig,
|
||||
basePath, // Store basePath for reliable reconstruction during membership sync
|
||||
credentialId: cred.credentialId,
|
||||
credentialSetId: credentialSetId,
|
||||
// Preserve state fields from existing config
|
||||
historyId: existingConfig?.historyId,
|
||||
lastCheckedTimestamp: existingConfig?.lastCheckedTimestamp,
|
||||
setupCompleted: existingConfig?.setupCompleted,
|
||||
externalId: existingConfig?.externalId,
|
||||
userId: cred.userId,
|
||||
}
|
||||
|
||||
await dbCtx
|
||||
.update(webhook)
|
||||
.set({
|
||||
...(deploymentVersionId ? { deploymentVersionId } : {}),
|
||||
providerConfig: updatedConfig,
|
||||
await dbCtx
|
||||
.update(webhook)
|
||||
.set({
|
||||
...(deploymentVersionId ? { deploymentVersionId } : {}),
|
||||
providerConfig: updatedConfig,
|
||||
isActive: true,
|
||||
updatedAt: new Date(),
|
||||
})
|
||||
.where(eq(webhook.id, existingWebhook.id))
|
||||
|
||||
result.webhooks.push({
|
||||
id: existingWebhook.id,
|
||||
credentialId: cred.credentialId,
|
||||
isNew: false,
|
||||
})
|
||||
result.updated++
|
||||
|
||||
syncLogger.debug(
|
||||
`[${requestId}] Updated webhook ${existingWebhook.id} for credential ${cred.credentialId}`
|
||||
)
|
||||
} else {
|
||||
// Create new webhook for this credential
|
||||
const webhookId = nanoid()
|
||||
const webhookPath = useUniquePaths
|
||||
? `${basePath}-${cred.credentialId.slice(0, 8)}`
|
||||
: basePath
|
||||
|
||||
const newConfig = {
|
||||
...providerConfig,
|
||||
basePath, // Store basePath for reliable reconstruction during membership sync
|
||||
credentialId: cred.credentialId,
|
||||
credentialSetId: credentialSetId,
|
||||
userId: cred.userId,
|
||||
}
|
||||
|
||||
await dbCtx.insert(webhook).values({
|
||||
id: webhookId,
|
||||
workflowId,
|
||||
blockId,
|
||||
path: webhookPath,
|
||||
provider,
|
||||
providerConfig: newConfig,
|
||||
credentialSetId, // Indexed column for efficient credential set queries
|
||||
isActive: true,
|
||||
...(deploymentVersionId ? { deploymentVersionId } : {}),
|
||||
createdAt: new Date(),
|
||||
updatedAt: new Date(),
|
||||
})
|
||||
.where(eq(webhook.id, existingWebhook.id))
|
||||
|
||||
result.webhooks.push({
|
||||
id: existingWebhook.id,
|
||||
credentialId: cred.credentialId,
|
||||
isNew: false,
|
||||
})
|
||||
result.updated++
|
||||
result.webhooks.push({
|
||||
id: webhookId,
|
||||
credentialId: cred.credentialId,
|
||||
isNew: true,
|
||||
})
|
||||
result.created++
|
||||
|
||||
syncLogger.debug(
|
||||
`[${requestId}] Updated webhook ${existingWebhook.id} for credential ${cred.credentialId}`
|
||||
)
|
||||
} else {
|
||||
// Create new webhook for this credential
|
||||
const webhookId = nanoid()
|
||||
const webhookPath = useUniquePaths ? `${basePath}-${cred.credentialId.slice(0, 8)}` : basePath
|
||||
|
||||
const newConfig = {
|
||||
...providerConfig,
|
||||
basePath, // Store basePath for reliable reconstruction during membership sync
|
||||
credentialId: cred.credentialId,
|
||||
credentialSetId: credentialSetId,
|
||||
userId: cred.userId,
|
||||
syncLogger.debug(
|
||||
`[${requestId}] Created webhook ${webhookId} for credential ${cred.credentialId}`
|
||||
)
|
||||
}
|
||||
|
||||
await dbCtx.insert(webhook).values({
|
||||
id: webhookId,
|
||||
workflowId,
|
||||
blockId,
|
||||
path: webhookPath,
|
||||
provider,
|
||||
providerConfig: newConfig,
|
||||
credentialSetId, // Indexed column for efficient credential set queries
|
||||
isActive: true,
|
||||
...(deploymentVersionId ? { deploymentVersionId } : {}),
|
||||
createdAt: new Date(),
|
||||
updatedAt: new Date(),
|
||||
})
|
||||
|
||||
result.webhooks.push({
|
||||
id: webhookId,
|
||||
credentialId: cred.credentialId,
|
||||
isNew: true,
|
||||
})
|
||||
result.created++
|
||||
|
||||
syncLogger.debug(
|
||||
`[${requestId}] Created webhook ${webhookId} for credential ${cred.credentialId}`
|
||||
} catch (error) {
|
||||
const errorMessage = error instanceof Error ? error.message : 'Unknown error'
|
||||
syncLogger.error(
|
||||
`[${requestId}] Failed to sync webhook for credential ${cred.credentialId}: ${errorMessage}`
|
||||
)
|
||||
result.failed.push({
|
||||
credentialId: cred.credentialId,
|
||||
error: errorMessage,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// Delete webhooks for credentials no longer in the set
|
||||
for (const [credentialId, existingWebhook] of existingByCredentialId) {
|
||||
if (!credentialIdsInSet.has(credentialId)) {
|
||||
await dbCtx.delete(webhook).where(eq(webhook.id, existingWebhook.id))
|
||||
result.deleted++
|
||||
try {
|
||||
await dbCtx.delete(webhook).where(eq(webhook.id, existingWebhook.id))
|
||||
result.deleted++
|
||||
|
||||
syncLogger.debug(
|
||||
`[${requestId}] Deleted webhook ${existingWebhook.id} for removed credential ${credentialId}`
|
||||
)
|
||||
syncLogger.debug(
|
||||
`[${requestId}] Deleted webhook ${existingWebhook.id} for removed credential ${credentialId}`
|
||||
)
|
||||
} catch (error) {
|
||||
const errorMessage = error instanceof Error ? error.message : 'Unknown error'
|
||||
syncLogger.error(
|
||||
`[${requestId}] Failed to delete webhook ${existingWebhook.id} for credential ${credentialId}: ${errorMessage}`
|
||||
)
|
||||
result.failed.push({
|
||||
credentialId,
|
||||
error: `Failed to delete: ${errorMessage}`,
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
syncLogger.info(
|
||||
`[${requestId}] Credential set webhook sync complete: ${result.created} created, ${result.updated} updated, ${result.deleted} deleted`
|
||||
`[${requestId}] Credential set webhook sync complete: ${result.created} created, ${result.updated} updated, ${result.deleted} deleted, ${result.failed.length} failed`
|
||||
)
|
||||
|
||||
return result
|
||||
@@ -2175,8 +2190,6 @@ export async function syncAllWebhooksForCredentialSet(
|
||||
const syncLogger = createLogger('CredentialSetMembershipSync')
|
||||
syncLogger.info(`[${requestId}] Syncing all webhooks for credential set ${credentialSetId}`)
|
||||
|
||||
const { getProviderIdFromServiceId } = await import('@/lib/oauth')
|
||||
|
||||
// Find all webhooks that use this credential set using the indexed column
|
||||
const webhooksForSet = await dbCtx
|
||||
.select({ webhook })
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
/**
|
||||
* Pure utility functions for TwiML processing
|
||||
* Pure utility functions for webhook processing
|
||||
* This file has NO server-side dependencies to ensure it can be safely imported in client-side code
|
||||
*/
|
||||
|
||||
@@ -18,3 +18,19 @@ export function convertSquareBracketsToTwiML(twiml: string | undefined): string
|
||||
// Replace [Tag] with <Tag> and [/Tag] with </Tag>
|
||||
return twiml.replace(/\[(\/?[^\]]+)\]/g, '<$1>')
|
||||
}
|
||||
|
||||
/**
|
||||
* Merges fields from source into target, but only if they don't exist in the base config.
|
||||
* Used to preserve system-managed fields while respecting user-provided values.
|
||||
*/
|
||||
export function mergeNonUserFields(
|
||||
target: Record<string, unknown>,
|
||||
source: Record<string, unknown>,
|
||||
userProvided: Record<string, unknown>
|
||||
): void {
|
||||
for (const [key, value] of Object.entries(source)) {
|
||||
if (!(key in userProvided)) {
|
||||
target[key] = value
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -24,6 +24,7 @@ import type {
|
||||
IterationContext,
|
||||
} from '@/executor/execution/types'
|
||||
import type { ExecutionResult, NormalizedBlockOutput } from '@/executor/types'
|
||||
import { hasExecutionResult } from '@/executor/utils/errors'
|
||||
import { Serializer } from '@/serializer'
|
||||
import { mergeSubblockState } from '@/stores/workflows/server-utils'
|
||||
|
||||
@@ -383,20 +384,15 @@ export async function executeWorkflowCore(
|
||||
} catch (error: unknown) {
|
||||
logger.error(`[${requestId}] Execution failed:`, error)
|
||||
|
||||
const errorWithResult = error as {
|
||||
executionResult?: ExecutionResult
|
||||
message?: string
|
||||
stack?: string
|
||||
}
|
||||
const executionResult = errorWithResult?.executionResult
|
||||
const executionResult = hasExecutionResult(error) ? error.executionResult : undefined
|
||||
const { traceSpans } = executionResult ? buildTraceSpans(executionResult) : { traceSpans: [] }
|
||||
|
||||
await loggingSession.safeCompleteWithError({
|
||||
endedAt: new Date().toISOString(),
|
||||
totalDurationMs: executionResult?.metadata?.duration || 0,
|
||||
error: {
|
||||
message: errorWithResult?.message || 'Execution failed',
|
||||
stackTrace: errorWithResult?.stack,
|
||||
message: error instanceof Error ? error.message : 'Execution failed',
|
||||
stackTrace: error instanceof Error ? error.stack : undefined,
|
||||
},
|
||||
traceSpans,
|
||||
})
|
||||
|
||||
@@ -17,6 +17,9 @@ const {
|
||||
mockValidateCronExpression,
|
||||
mockGetScheduleTimeValues,
|
||||
mockRandomUUID,
|
||||
mockTransaction,
|
||||
mockSelect,
|
||||
mockFrom,
|
||||
} = vi.hoisted(() => ({
|
||||
mockInsert: vi.fn(),
|
||||
mockDelete: vi.fn(),
|
||||
@@ -28,20 +31,27 @@ const {
|
||||
mockValidateCronExpression: vi.fn(),
|
||||
mockGetScheduleTimeValues: vi.fn(),
|
||||
mockRandomUUID: vi.fn(),
|
||||
mockTransaction: vi.fn(),
|
||||
mockSelect: vi.fn(),
|
||||
mockFrom: vi.fn(),
|
||||
}))
|
||||
|
||||
vi.mock('@sim/db', () => ({
|
||||
db: {},
|
||||
db: {
|
||||
transaction: mockTransaction,
|
||||
},
|
||||
workflowSchedule: {
|
||||
workflowId: 'workflow_id',
|
||||
blockId: 'block_id',
|
||||
deploymentVersionId: 'deployment_version_id',
|
||||
id: 'id',
|
||||
},
|
||||
}))
|
||||
|
||||
vi.mock('drizzle-orm', () => ({
|
||||
eq: vi.fn((...args) => ({ type: 'eq', args })),
|
||||
and: vi.fn((...args) => ({ type: 'and', args })),
|
||||
inArray: vi.fn((...args) => ({ type: 'inArray', args })),
|
||||
sql: vi.fn((strings, ...values) => ({ type: 'sql', strings, values })),
|
||||
}))
|
||||
|
||||
@@ -100,6 +110,20 @@ describe('Schedule Deploy Utilities', () => {
|
||||
// Setup mock chain for delete
|
||||
mockWhere.mockResolvedValue({})
|
||||
mockDelete.mockReturnValue({ where: mockWhere })
|
||||
|
||||
// Setup mock chain for select
|
||||
mockFrom.mockReturnValue({ where: vi.fn().mockResolvedValue([]) })
|
||||
mockSelect.mockReturnValue({ from: mockFrom })
|
||||
|
||||
// Setup transaction mock to execute callback with mock tx
|
||||
mockTransaction.mockImplementation(async (callback) => {
|
||||
const mockTx = {
|
||||
insert: mockInsert,
|
||||
delete: mockDelete,
|
||||
select: mockSelect,
|
||||
}
|
||||
return callback(mockTx)
|
||||
})
|
||||
})
|
||||
|
||||
afterEach(() => {
|
||||
@@ -135,6 +159,19 @@ describe('Schedule Deploy Utilities', () => {
|
||||
const result = findScheduleBlocks({})
|
||||
expect(result).toHaveLength(0)
|
||||
})
|
||||
|
||||
it('should exclude disabled schedule blocks', () => {
|
||||
const blocks: Record<string, BlockState> = {
|
||||
'block-1': { id: 'block-1', type: 'schedule', enabled: true, subBlocks: {} } as BlockState,
|
||||
'block-2': { id: 'block-2', type: 'schedule', enabled: false, subBlocks: {} } as BlockState,
|
||||
'block-3': { id: 'block-3', type: 'schedule', subBlocks: {} } as BlockState, // enabled undefined = enabled
|
||||
}
|
||||
|
||||
const result = findScheduleBlocks(blocks)
|
||||
|
||||
expect(result).toHaveLength(2)
|
||||
expect(result.map((b) => b.id)).toEqual(['block-1', 'block-3'])
|
||||
})
|
||||
})
|
||||
|
||||
describe('validateScheduleBlock', () => {
|
||||
@@ -671,20 +708,24 @@ describe('Schedule Deploy Utilities', () => {
|
||||
})
|
||||
|
||||
describe('createSchedulesForDeploy', () => {
|
||||
const setupMockTransaction = (
|
||||
existingSchedules: Array<{ id: string; blockId: string }> = []
|
||||
) => {
|
||||
mockFrom.mockReturnValue({ where: vi.fn().mockResolvedValue(existingSchedules) })
|
||||
mockSelect.mockReturnValue({ from: mockFrom })
|
||||
}
|
||||
|
||||
it('should return success with no schedule blocks', async () => {
|
||||
const blocks: Record<string, BlockState> = {
|
||||
'block-1': { id: 'block-1', type: 'agent', subBlocks: {} } as BlockState,
|
||||
}
|
||||
|
||||
const mockTx = {
|
||||
insert: mockInsert,
|
||||
delete: mockDelete,
|
||||
}
|
||||
setupMockTransaction()
|
||||
|
||||
const result = await createSchedulesForDeploy('workflow-1', blocks, mockTx as any)
|
||||
const result = await createSchedulesForDeploy('workflow-1', blocks, {} as any)
|
||||
|
||||
expect(result.success).toBe(true)
|
||||
expect(mockInsert).not.toHaveBeenCalled()
|
||||
expect(mockTransaction).not.toHaveBeenCalled()
|
||||
})
|
||||
|
||||
it('should create schedule for valid schedule block', async () => {
|
||||
@@ -700,17 +741,15 @@ describe('Schedule Deploy Utilities', () => {
|
||||
} as BlockState,
|
||||
}
|
||||
|
||||
const mockTx = {
|
||||
insert: mockInsert,
|
||||
delete: mockDelete,
|
||||
}
|
||||
setupMockTransaction()
|
||||
|
||||
const result = await createSchedulesForDeploy('workflow-1', blocks, mockTx as any)
|
||||
const result = await createSchedulesForDeploy('workflow-1', blocks, {} as any)
|
||||
|
||||
expect(result.success).toBe(true)
|
||||
expect(result.scheduleId).toBe('test-uuid')
|
||||
expect(result.cronExpression).toBe('0 9 * * *')
|
||||
expect(result.nextRunAt).toEqual(new Date('2025-04-15T09:00:00Z'))
|
||||
expect(mockTransaction).toHaveBeenCalled()
|
||||
expect(mockInsert).toHaveBeenCalled()
|
||||
expect(mockOnConflictDoUpdate).toHaveBeenCalled()
|
||||
})
|
||||
@@ -727,16 +766,13 @@ describe('Schedule Deploy Utilities', () => {
|
||||
} as BlockState,
|
||||
}
|
||||
|
||||
const mockTx = {
|
||||
insert: mockInsert,
|
||||
delete: mockDelete,
|
||||
}
|
||||
setupMockTransaction()
|
||||
|
||||
const result = await createSchedulesForDeploy('workflow-1', blocks, mockTx as any)
|
||||
const result = await createSchedulesForDeploy('workflow-1', blocks, {} as any)
|
||||
|
||||
expect(result.success).toBe(false)
|
||||
expect(result.error).toBe('Time is required for daily schedules')
|
||||
expect(mockInsert).not.toHaveBeenCalled()
|
||||
expect(mockTransaction).not.toHaveBeenCalled()
|
||||
})
|
||||
|
||||
it('should use onConflictDoUpdate for existing schedules', async () => {
|
||||
@@ -752,12 +788,9 @@ describe('Schedule Deploy Utilities', () => {
|
||||
} as BlockState,
|
||||
}
|
||||
|
||||
const mockTx = {
|
||||
insert: mockInsert,
|
||||
delete: mockDelete,
|
||||
}
|
||||
setupMockTransaction()
|
||||
|
||||
await createSchedulesForDeploy('workflow-1', blocks, mockTx as any)
|
||||
await createSchedulesForDeploy('workflow-1', blocks, {} as any)
|
||||
|
||||
expect(mockOnConflictDoUpdate).toHaveBeenCalledWith({
|
||||
target: expect.any(Array),
|
||||
@@ -769,6 +802,27 @@ describe('Schedule Deploy Utilities', () => {
|
||||
}),
|
||||
})
|
||||
})
|
||||
|
||||
it('should rollback on database error', async () => {
|
||||
const blocks: Record<string, BlockState> = {
|
||||
'block-1': {
|
||||
id: 'block-1',
|
||||
type: 'schedule',
|
||||
subBlocks: {
|
||||
scheduleType: { value: 'daily' },
|
||||
dailyTime: { value: '09:00' },
|
||||
timezone: { value: 'UTC' },
|
||||
},
|
||||
} as BlockState,
|
||||
}
|
||||
|
||||
mockTransaction.mockRejectedValueOnce(new Error('Database error'))
|
||||
|
||||
const result = await createSchedulesForDeploy('workflow-1', blocks, {} as any)
|
||||
|
||||
expect(result.success).toBe(false)
|
||||
expect(result.error).toBe('Database error')
|
||||
})
|
||||
})
|
||||
|
||||
describe('deleteSchedulesForWorkflow', () => {
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
import { db, workflowSchedule } from '@sim/db'
|
||||
import { createLogger } from '@sim/logger'
|
||||
import { and, eq } from 'drizzle-orm'
|
||||
import { and, eq, inArray } from 'drizzle-orm'
|
||||
import type { DbOrTx } from '@/lib/db/types'
|
||||
import { cleanupWebhooksForWorkflow } from '@/lib/webhooks/deploy'
|
||||
import type { BlockState } from '@/lib/workflows/schedules/utils'
|
||||
@@ -21,13 +21,13 @@ export interface ScheduleDeployResult {
|
||||
}
|
||||
|
||||
/**
|
||||
* Create or update schedule records for a workflow during deployment
|
||||
* This should be called within a database transaction
|
||||
* Create or update schedule records for a workflow during deployment.
|
||||
* Uses a transaction to ensure atomicity - all schedules are created or none are.
|
||||
*/
|
||||
export async function createSchedulesForDeploy(
|
||||
workflowId: string,
|
||||
blocks: Record<string, BlockState>,
|
||||
tx: DbOrTx,
|
||||
_tx: DbOrTx,
|
||||
deploymentVersionId?: string
|
||||
): Promise<ScheduleDeployResult> {
|
||||
const scheduleBlocks = findScheduleBlocks(blocks)
|
||||
@@ -37,16 +37,16 @@ export async function createSchedulesForDeploy(
|
||||
return { success: true }
|
||||
}
|
||||
|
||||
let lastScheduleInfo: {
|
||||
scheduleId: string
|
||||
cronExpression?: string
|
||||
nextRunAt?: Date
|
||||
timezone?: string
|
||||
} | null = null
|
||||
// Phase 1: Validate ALL blocks before making any DB changes
|
||||
const validatedBlocks: Array<{
|
||||
blockId: string
|
||||
cronExpression: string
|
||||
nextRunAt: Date
|
||||
timezone: string
|
||||
}> = []
|
||||
|
||||
for (const block of scheduleBlocks) {
|
||||
const blockId = block.id as string
|
||||
|
||||
const validation = validateScheduleBlock(block)
|
||||
if (!validation.isValid) {
|
||||
return {
|
||||
@@ -54,62 +54,112 @@ export async function createSchedulesForDeploy(
|
||||
error: validation.error,
|
||||
}
|
||||
}
|
||||
|
||||
const { cronExpression, nextRunAt, timezone } = validation
|
||||
|
||||
const scheduleId = crypto.randomUUID()
|
||||
const now = new Date()
|
||||
|
||||
const values = {
|
||||
id: scheduleId,
|
||||
workflowId,
|
||||
deploymentVersionId: deploymentVersionId || null,
|
||||
validatedBlocks.push({
|
||||
blockId,
|
||||
cronExpression: cronExpression!,
|
||||
triggerType: 'schedule',
|
||||
createdAt: now,
|
||||
updatedAt: now,
|
||||
nextRunAt: nextRunAt!,
|
||||
timezone: timezone!,
|
||||
status: 'active',
|
||||
failedCount: 0,
|
||||
}
|
||||
|
||||
const setValues = {
|
||||
blockId,
|
||||
cronExpression: cronExpression!,
|
||||
...(deploymentVersionId ? { deploymentVersionId } : {}),
|
||||
updatedAt: now,
|
||||
nextRunAt: nextRunAt!,
|
||||
timezone: timezone!,
|
||||
status: 'active',
|
||||
failedCount: 0,
|
||||
}
|
||||
|
||||
await tx
|
||||
.insert(workflowSchedule)
|
||||
.values(values)
|
||||
.onConflictDoUpdate({
|
||||
target: [
|
||||
workflowSchedule.workflowId,
|
||||
workflowSchedule.blockId,
|
||||
workflowSchedule.deploymentVersionId,
|
||||
],
|
||||
set: setValues,
|
||||
})
|
||||
|
||||
logger.info(`Schedule created/updated for workflow ${workflowId}, block ${blockId}`, {
|
||||
scheduleId: values.id,
|
||||
cronExpression,
|
||||
nextRunAt: nextRunAt?.toISOString(),
|
||||
cronExpression: validation.cronExpression!,
|
||||
nextRunAt: validation.nextRunAt!,
|
||||
timezone: validation.timezone!,
|
||||
})
|
||||
}
|
||||
|
||||
lastScheduleInfo = { scheduleId: values.id, cronExpression, nextRunAt, timezone }
|
||||
// Phase 2: All validations passed - now do DB operations in a transaction
|
||||
let lastScheduleInfo: {
|
||||
scheduleId: string
|
||||
cronExpression?: string
|
||||
nextRunAt?: Date
|
||||
timezone?: string
|
||||
} | null = null
|
||||
|
||||
try {
|
||||
await db.transaction(async (tx) => {
|
||||
const currentBlockIds = new Set(validatedBlocks.map((b) => b.blockId))
|
||||
|
||||
const existingSchedules = await tx
|
||||
.select({ id: workflowSchedule.id, blockId: workflowSchedule.blockId })
|
||||
.from(workflowSchedule)
|
||||
.where(
|
||||
deploymentVersionId
|
||||
? and(
|
||||
eq(workflowSchedule.workflowId, workflowId),
|
||||
eq(workflowSchedule.deploymentVersionId, deploymentVersionId)
|
||||
)
|
||||
: eq(workflowSchedule.workflowId, workflowId)
|
||||
)
|
||||
|
||||
const orphanedScheduleIds = existingSchedules
|
||||
.filter((s) => s.blockId && !currentBlockIds.has(s.blockId))
|
||||
.map((s) => s.id)
|
||||
|
||||
if (orphanedScheduleIds.length > 0) {
|
||||
logger.info(
|
||||
`Deleting ${orphanedScheduleIds.length} orphaned schedule(s) for workflow ${workflowId}`
|
||||
)
|
||||
await tx.delete(workflowSchedule).where(inArray(workflowSchedule.id, orphanedScheduleIds))
|
||||
}
|
||||
|
||||
for (const validated of validatedBlocks) {
|
||||
const { blockId, cronExpression, nextRunAt, timezone } = validated
|
||||
const scheduleId = crypto.randomUUID()
|
||||
const now = new Date()
|
||||
|
||||
const values = {
|
||||
id: scheduleId,
|
||||
workflowId,
|
||||
deploymentVersionId: deploymentVersionId || null,
|
||||
blockId,
|
||||
cronExpression,
|
||||
triggerType: 'schedule',
|
||||
createdAt: now,
|
||||
updatedAt: now,
|
||||
nextRunAt,
|
||||
timezone,
|
||||
status: 'active',
|
||||
failedCount: 0,
|
||||
}
|
||||
|
||||
const setValues = {
|
||||
blockId,
|
||||
cronExpression,
|
||||
...(deploymentVersionId ? { deploymentVersionId } : {}),
|
||||
updatedAt: now,
|
||||
nextRunAt,
|
||||
timezone,
|
||||
status: 'active',
|
||||
failedCount: 0,
|
||||
}
|
||||
|
||||
await tx
|
||||
.insert(workflowSchedule)
|
||||
.values(values)
|
||||
.onConflictDoUpdate({
|
||||
target: [
|
||||
workflowSchedule.workflowId,
|
||||
workflowSchedule.blockId,
|
||||
workflowSchedule.deploymentVersionId,
|
||||
],
|
||||
set: setValues,
|
||||
})
|
||||
|
||||
logger.info(`Schedule created/updated for workflow ${workflowId}, block ${blockId}`, {
|
||||
scheduleId: values.id,
|
||||
cronExpression,
|
||||
nextRunAt: nextRunAt?.toISOString(),
|
||||
})
|
||||
|
||||
lastScheduleInfo = { scheduleId: values.id, cronExpression, nextRunAt, timezone }
|
||||
}
|
||||
})
|
||||
} catch (error) {
|
||||
logger.error(`Failed to create schedules for workflow ${workflowId}`, error)
|
||||
return {
|
||||
success: false,
|
||||
error: error instanceof Error ? error.message : 'Failed to create schedules',
|
||||
}
|
||||
}
|
||||
|
||||
return {
|
||||
success: true,
|
||||
...lastScheduleInfo,
|
||||
...(lastScheduleInfo ?? {}),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -145,8 +195,25 @@ export async function cleanupDeploymentVersion(params: {
|
||||
workflow: Record<string, unknown>
|
||||
requestId: string
|
||||
deploymentVersionId: string
|
||||
/**
|
||||
* If true, skip external subscription cleanup (already done by saveTriggerWebhooksForDeploy).
|
||||
* Only deletes DB records.
|
||||
*/
|
||||
skipExternalCleanup?: boolean
|
||||
}): Promise<void> {
|
||||
const { workflowId, workflow, requestId, deploymentVersionId } = params
|
||||
await cleanupWebhooksForWorkflow(workflowId, workflow, requestId, deploymentVersionId)
|
||||
const {
|
||||
workflowId,
|
||||
workflow,
|
||||
requestId,
|
||||
deploymentVersionId,
|
||||
skipExternalCleanup = false,
|
||||
} = params
|
||||
await cleanupWebhooksForWorkflow(
|
||||
workflowId,
|
||||
workflow,
|
||||
requestId,
|
||||
deploymentVersionId,
|
||||
skipExternalCleanup
|
||||
)
|
||||
await deleteSchedulesForWorkflow(workflowId, db, deploymentVersionId)
|
||||
}
|
||||
|
||||
@@ -98,9 +98,12 @@ function getMissingConfigError(scheduleType: string): string {
|
||||
|
||||
/**
|
||||
* Find schedule blocks in a workflow's blocks
|
||||
* Only returns enabled schedule blocks (disabled blocks are skipped)
|
||||
*/
|
||||
export function findScheduleBlocks(blocks: Record<string, BlockState>): BlockState[] {
|
||||
return Object.values(blocks).filter((block) => block.type === 'schedule')
|
||||
return Object.values(blocks).filter(
|
||||
(block) => block.type === 'schedule' && block.enabled !== false
|
||||
)
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@@ -4,7 +4,6 @@ import { create } from 'zustand'
|
||||
import { devtools } from 'zustand/middleware'
|
||||
import { DEFAULT_DUPLICATE_OFFSET } from '@/lib/workflows/autolayout/constants'
|
||||
import { getBlockOutputs } from '@/lib/workflows/blocks/block-outputs'
|
||||
import { TriggerUtils } from '@/lib/workflows/triggers/triggers'
|
||||
import { getBlock } from '@/blocks'
|
||||
import type { SubBlockConfig } from '@/blocks/types'
|
||||
import { normalizeName, RESERVED_BLOCK_NAMES } from '@/executor/constants'
|
||||
@@ -1171,93 +1170,6 @@ export const useWorkflowStore = create<WorkflowStore>()(
|
||||
// Note: Socket.IO handles real-time sync automatically
|
||||
},
|
||||
|
||||
toggleBlockTriggerMode: (id: string) => {
|
||||
const block = get().blocks[id]
|
||||
if (!block) return
|
||||
|
||||
const newTriggerMode = !block.triggerMode
|
||||
|
||||
// When switching TO trigger mode, check if block is inside a subflow
|
||||
if (newTriggerMode && TriggerUtils.isBlockInSubflow(id, get().blocks)) {
|
||||
logger.warn('Cannot enable trigger mode for block inside loop or parallel subflow', {
|
||||
blockId: id,
|
||||
blockType: block.type,
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
// When switching TO trigger mode, remove all incoming connections
|
||||
let filteredEdges = [...get().edges]
|
||||
if (newTriggerMode) {
|
||||
// Remove edges where this block is the target
|
||||
filteredEdges = filteredEdges.filter((edge) => edge.target !== id)
|
||||
logger.info(
|
||||
`Removed ${get().edges.length - filteredEdges.length} incoming connections for trigger mode`,
|
||||
{
|
||||
blockId: id,
|
||||
blockType: block.type,
|
||||
}
|
||||
)
|
||||
}
|
||||
|
||||
const newState = {
|
||||
blocks: {
|
||||
...get().blocks,
|
||||
[id]: {
|
||||
...block,
|
||||
triggerMode: newTriggerMode,
|
||||
},
|
||||
},
|
||||
edges: filteredEdges,
|
||||
loops: { ...get().loops },
|
||||
parallels: { ...get().parallels },
|
||||
}
|
||||
|
||||
set(newState)
|
||||
get().updateLastSaved()
|
||||
|
||||
// Handle webhook enable/disable when toggling trigger mode
|
||||
const handleWebhookToggle = async () => {
|
||||
try {
|
||||
const activeWorkflowId = useWorkflowRegistry.getState().activeWorkflowId
|
||||
if (!activeWorkflowId) return
|
||||
|
||||
// Check if there's a webhook for this block
|
||||
const response = await fetch(
|
||||
`/api/webhooks?workflowId=${activeWorkflowId}&blockId=${id}`
|
||||
)
|
||||
if (response.ok) {
|
||||
const data = await response.json()
|
||||
if (data.webhooks && data.webhooks.length > 0) {
|
||||
const webhook = data.webhooks[0].webhook
|
||||
|
||||
// Update webhook's isActive status based on trigger mode
|
||||
const updateResponse = await fetch(`/api/webhooks/${webhook.id}`, {
|
||||
method: 'PATCH',
|
||||
headers: {
|
||||
'Content-Type': 'application/json',
|
||||
},
|
||||
body: JSON.stringify({
|
||||
isActive: newTriggerMode,
|
||||
}),
|
||||
})
|
||||
|
||||
if (!updateResponse.ok) {
|
||||
logger.error('Failed to update webhook status')
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (error) {
|
||||
logger.error('Error toggling webhook status:', error)
|
||||
}
|
||||
}
|
||||
|
||||
// Handle webhook toggle asynchronously
|
||||
handleWebhookToggle()
|
||||
|
||||
// Note: Socket.IO handles real-time sync automatically
|
||||
},
|
||||
|
||||
// Parallel block methods implementation
|
||||
updateParallelCount: (parallelId: string, count: number) => {
|
||||
const block = get().blocks[parallelId]
|
||||
|
||||
@@ -241,7 +241,6 @@ export interface WorkflowActions {
|
||||
setNeedsRedeploymentFlag: (needsRedeployment: boolean) => void
|
||||
revertToDeployedState: (deployedState: WorkflowState) => void
|
||||
toggleBlockAdvancedMode: (id: string) => void
|
||||
toggleBlockTriggerMode: (id: string) => void
|
||||
setDragStartPosition: (position: DragStartPosition | null) => void
|
||||
getDragStartPosition: () => DragStartPosition | null
|
||||
getWorkflowState: () => WorkflowState
|
||||
|
||||
Reference in New Issue
Block a user