fix(webhooks/schedules): deployment version friendly

This commit is contained in:
Vikhyath Mondreti
2026-01-16 14:12:00 -08:00
parent d11317597a
commit 8e6ea1145c
20 changed files with 10971 additions and 92 deletions

View File

@@ -1,7 +1,7 @@
import { db, workflowSchedule } from '@sim/db'
import { db, workflowDeploymentVersion, workflowSchedule } from '@sim/db'
import { createLogger } from '@sim/logger'
import { tasks } from '@trigger.dev/sdk'
import { and, eq, isNull, lt, lte, not, or } from 'drizzle-orm'
import { and, eq, isNull, lt, lte, not, or, sql } from 'drizzle-orm'
import { type NextRequest, NextResponse } from 'next/server'
import { verifyCronAuth } from '@/lib/auth/internal'
import { isTriggerDevEnabled } from '@/lib/core/config/feature-flags'
@@ -37,7 +37,8 @@ export async function GET(request: NextRequest) {
or(
isNull(workflowSchedule.lastQueuedAt),
lt(workflowSchedule.lastQueuedAt, workflowSchedule.nextRunAt)
)
),
sql`${workflowSchedule.deploymentVersionId} = (select ${workflowDeploymentVersion.id} from ${workflowDeploymentVersion} where ${workflowDeploymentVersion.workflowId} = ${workflowSchedule.workflowId} and ${workflowDeploymentVersion.isActive} = true)`
)
)
.returning({

View File

@@ -1,7 +1,7 @@
import { db } from '@sim/db'
import { workflow, workflowSchedule } from '@sim/db/schema'
import { workflow, workflowDeploymentVersion, workflowSchedule } from '@sim/db/schema'
import { createLogger } from '@sim/logger'
import { and, eq } from 'drizzle-orm'
import { and, eq, isNull, or } from 'drizzle-orm'
import { type NextRequest, NextResponse } from 'next/server'
import { getSession } from '@/lib/auth'
import { generateRequestId } from '@/lib/core/utils/request'
@@ -62,9 +62,24 @@ export async function GET(req: NextRequest) {
}
const schedule = await db
.select()
.select({ schedule: workflowSchedule })
.from(workflowSchedule)
.where(conditions.length > 1 ? and(...conditions) : conditions[0])
.leftJoin(
workflowDeploymentVersion,
and(
eq(workflowDeploymentVersion.workflowId, workflowSchedule.workflowId),
eq(workflowDeploymentVersion.isActive, true)
)
)
.where(
and(
...conditions,
or(
eq(workflowSchedule.deploymentVersionId, workflowDeploymentVersion.id),
and(isNull(workflowDeploymentVersion.id), isNull(workflowSchedule.deploymentVersionId))
)
)
)
.limit(1)
const headers = new Headers()
@@ -74,7 +89,7 @@ export async function GET(req: NextRequest) {
return NextResponse.json({ schedule: null }, { headers })
}
const scheduleData = schedule[0]
const scheduleData = schedule[0].schedule
const isDisabled = scheduleData.status === 'disabled'
const hasFailures = scheduleData.failedCount > 0

View File

@@ -60,7 +60,17 @@ export const POST = withAdminAuthParams<RouteParams>(async (request, context) =>
return internalErrorResponse(deployResult.error || 'Failed to deploy workflow')
}
const scheduleResult = await createSchedulesForDeploy(workflowId, normalizedData.blocks, db)
if (!deployResult.deploymentVersionId) {
await undeployWorkflow({ workflowId })
return internalErrorResponse('Failed to resolve deployment version')
}
const scheduleResult = await createSchedulesForDeploy(
workflowId,
normalizedData.blocks,
db,
deployResult.deploymentVersionId
)
if (!scheduleResult.success) {
logger.warn(`Schedule creation failed for workflow ${workflowId}: ${scheduleResult.error}`)
}

View File

@@ -1,7 +1,7 @@
import { db } from '@sim/db'
import { webhook, workflow } from '@sim/db/schema'
import { webhook, workflow, workflowDeploymentVersion } from '@sim/db/schema'
import { createLogger } from '@sim/logger'
import { and, desc, eq } from 'drizzle-orm'
import { and, desc, eq, isNull, or } from 'drizzle-orm'
import { nanoid } from 'nanoid'
import { type NextRequest, NextResponse } from 'next/server'
import { getSession } from '@/lib/auth'
@@ -71,7 +71,23 @@ export async function GET(request: NextRequest) {
})
.from(webhook)
.innerJoin(workflow, eq(webhook.workflowId, workflow.id))
.where(and(eq(webhook.workflowId, workflowId), eq(webhook.blockId, blockId)))
.leftJoin(
workflowDeploymentVersion,
and(
eq(workflowDeploymentVersion.workflowId, workflow.id),
eq(workflowDeploymentVersion.isActive, true)
)
)
.where(
and(
eq(webhook.workflowId, workflowId),
eq(webhook.blockId, blockId),
or(
eq(webhook.deploymentVersionId, workflowDeploymentVersion.id),
and(isNull(workflowDeploymentVersion.id), isNull(webhook.deploymentVersionId))
)
)
)
.orderBy(desc(webhook.updatedAt))
logger.info(
@@ -149,7 +165,23 @@ export async function POST(request: NextRequest) {
const existingForBlock = await db
.select({ id: webhook.id, path: webhook.path })
.from(webhook)
.where(and(eq(webhook.workflowId, workflowId), eq(webhook.blockId, blockId)))
.leftJoin(
workflowDeploymentVersion,
and(
eq(workflowDeploymentVersion.workflowId, workflowId),
eq(workflowDeploymentVersion.isActive, true)
)
)
.where(
and(
eq(webhook.workflowId, workflowId),
eq(webhook.blockId, blockId),
or(
eq(webhook.deploymentVersionId, workflowDeploymentVersion.id),
and(isNull(workflowDeploymentVersion.id), isNull(webhook.deploymentVersionId))
)
)
)
.limit(1)
if (existingForBlock.length > 0) {
@@ -225,7 +257,23 @@ export async function POST(request: NextRequest) {
const existingForBlock = await db
.select({ id: webhook.id })
.from(webhook)
.where(and(eq(webhook.workflowId, workflowId), eq(webhook.blockId, blockId)))
.leftJoin(
workflowDeploymentVersion,
and(
eq(workflowDeploymentVersion.workflowId, workflowId),
eq(workflowDeploymentVersion.isActive, true)
)
)
.where(
and(
eq(webhook.workflowId, workflowId),
eq(webhook.blockId, blockId),
or(
eq(webhook.deploymentVersionId, workflowDeploymentVersion.id),
and(isNull(workflowDeploymentVersion.id), isNull(webhook.deploymentVersionId))
)
)
)
.limit(1)
if (existingForBlock.length > 0) {
targetWebhookId = existingForBlock[0].id

View File

@@ -10,7 +10,11 @@ import {
loadWorkflowFromNormalizedTables,
undeployWorkflow,
} from '@/lib/workflows/persistence/utils'
import { createSchedulesForDeploy, validateWorkflowSchedules } from '@/lib/workflows/schedules'
import {
cleanupDeploymentVersion,
createSchedulesForDeploy,
validateWorkflowSchedules,
} from '@/lib/workflows/schedules'
import { validateWorkflowPermissions } from '@/lib/workflows/utils'
import { createErrorResponse, createSuccessResponse } from '@/app/api/workflows/utils'
@@ -131,22 +135,6 @@ export async function POST(request: NextRequest, { params }: { params: Promise<{
return createErrorResponse(`Invalid schedule configuration: ${scheduleValidation.error}`, 400)
}
const triggerSaveResult = await saveTriggerWebhooksForDeploy({
request,
workflowId: id,
workflow: workflowData,
userId: actorUserId,
blocks: normalizedData.blocks,
requestId,
})
if (!triggerSaveResult.success) {
return createErrorResponse(
triggerSaveResult.error?.message || 'Failed to save trigger configuration',
triggerSaveResult.error?.status || 500
)
}
const deployResult = await deployWorkflow({
workflowId: id,
deployedBy: actorUserId,
@@ -158,14 +146,58 @@ export async function POST(request: NextRequest, { params }: { params: Promise<{
}
const deployedAt = deployResult.deployedAt!
const deploymentVersionId = deployResult.deploymentVersionId
if (!deploymentVersionId) {
await undeployWorkflow({ workflowId: id })
return createErrorResponse('Failed to resolve deployment version', 500)
}
const triggerSaveResult = await saveTriggerWebhooksForDeploy({
request,
workflowId: id,
workflow: workflowData,
userId: actorUserId,
blocks: normalizedData.blocks,
requestId,
deploymentVersionId,
})
if (!triggerSaveResult.success) {
await cleanupDeploymentVersion({
workflowId: id,
workflow: workflowData as Record<string, unknown>,
requestId,
deploymentVersionId,
})
await undeployWorkflow({ workflowId: id })
return createErrorResponse(
triggerSaveResult.error?.message || 'Failed to save trigger configuration',
triggerSaveResult.error?.status || 500
)
}
let scheduleInfo: { scheduleId?: string; cronExpression?: string; nextRunAt?: Date } = {}
const scheduleResult = await createSchedulesForDeploy(id, normalizedData.blocks, db)
const scheduleResult = await createSchedulesForDeploy(
id,
normalizedData.blocks,
db,
deploymentVersionId
)
if (!scheduleResult.success) {
logger.error(
`[${requestId}] Failed to create schedule for workflow ${id}: ${scheduleResult.error}`
)
} else if (scheduleResult.scheduleId) {
await cleanupDeploymentVersion({
workflowId: id,
workflow: workflowData as Record<string, unknown>,
requestId,
deploymentVersionId,
})
await undeployWorkflow({ workflowId: id })
return createErrorResponse(scheduleResult.error || 'Failed to create schedule', 500)
}
if (scheduleResult.scheduleId) {
scheduleInfo = {
scheduleId: scheduleResult.scheduleId,
cronExpression: scheduleResult.cronExpression,

View File

@@ -1,10 +1,19 @@
import { db, workflowDeploymentVersion } from '@sim/db'
import { createLogger } from '@sim/logger'
import { and, eq } from 'drizzle-orm'
import type { NextRequest } from 'next/server'
import { generateRequestId } from '@/lib/core/utils/request'
import { syncMcpToolsForWorkflow } from '@/lib/mcp/workflow-mcp-sync'
import { saveTriggerWebhooksForDeploy } from '@/lib/webhooks/deploy'
import { activateWorkflowVersion } from '@/lib/workflows/persistence/utils'
import {
cleanupDeploymentVersion,
createSchedulesForDeploy,
validateWorkflowSchedules,
} from '@/lib/workflows/schedules'
import { validateWorkflowPermissions } from '@/lib/workflows/utils'
import { createErrorResponse, createSuccessResponse } from '@/app/api/workflows/utils'
import type { BlockState } from '@/stores/workflows/workflow/types'
const logger = createLogger('WorkflowActivateDeploymentAPI')
@@ -19,30 +28,135 @@ export async function POST(
const { id, version } = await params
try {
const { error } = await validateWorkflowPermissions(id, requestId, 'admin')
const {
error,
session,
workflow: workflowData,
} = await validateWorkflowPermissions(id, requestId, 'admin')
if (error) {
return createErrorResponse(error.message, error.status)
}
const actorUserId = session?.user?.id
if (!actorUserId) {
logger.warn(`[${requestId}] Unable to resolve actor user for deployment activation: ${id}`)
return createErrorResponse('Unable to determine activating user', 400)
}
const versionNum = Number(version)
if (!Number.isFinite(versionNum)) {
return createErrorResponse('Invalid version number', 400)
}
const [versionRow] = await db
.select({
id: workflowDeploymentVersion.id,
state: workflowDeploymentVersion.state,
})
.from(workflowDeploymentVersion)
.where(
and(
eq(workflowDeploymentVersion.workflowId, id),
eq(workflowDeploymentVersion.version, versionNum)
)
)
.limit(1)
if (!versionRow?.state) {
return createErrorResponse('Deployment version not found', 404)
}
const [currentActiveVersion] = await db
.select({ id: workflowDeploymentVersion.id })
.from(workflowDeploymentVersion)
.where(
and(
eq(workflowDeploymentVersion.workflowId, id),
eq(workflowDeploymentVersion.isActive, true)
)
)
.limit(1)
const previousVersionId = currentActiveVersion?.id
const deployedState = versionRow.state as { blocks?: Record<string, BlockState> }
const blocks = deployedState.blocks
if (!blocks || typeof blocks !== 'object') {
return createErrorResponse('Invalid deployed state structure', 500)
}
const triggerSaveResult = await saveTriggerWebhooksForDeploy({
request,
workflowId: id,
workflow: workflowData as Record<string, unknown>,
userId: actorUserId,
blocks,
requestId,
deploymentVersionId: versionRow.id,
})
if (!triggerSaveResult.success) {
return createErrorResponse(
triggerSaveResult.error?.message || 'Failed to sync trigger configuration',
triggerSaveResult.error?.status || 500
)
}
const scheduleValidation = validateWorkflowSchedules(blocks)
if (!scheduleValidation.isValid) {
return createErrorResponse(`Invalid schedule configuration: ${scheduleValidation.error}`, 400)
}
const scheduleResult = await createSchedulesForDeploy(id, blocks, db, versionRow.id)
if (!scheduleResult.success) {
await cleanupDeploymentVersion({
workflowId: id,
workflow: workflowData as Record<string, unknown>,
requestId,
deploymentVersionId: versionRow.id,
})
return createErrorResponse(scheduleResult.error || 'Failed to sync schedules', 500)
}
const result = await activateWorkflowVersion({ workflowId: id, version: versionNum })
if (!result.success) {
await cleanupDeploymentVersion({
workflowId: id,
workflow: workflowData as Record<string, unknown>,
requestId,
deploymentVersionId: versionRow.id,
})
return createErrorResponse(result.error || 'Failed to activate deployment', 400)
}
if (result.state) {
await syncMcpToolsForWorkflow({
workflowId: id,
requestId,
state: result.state,
context: 'activate',
})
if (previousVersionId && previousVersionId !== versionRow.id) {
try {
logger.info(
`[${requestId}] Cleaning up previous version ${previousVersionId} webhooks/schedules`
)
await cleanupDeploymentVersion({
workflowId: id,
workflow: workflowData as Record<string, unknown>,
requestId,
deploymentVersionId: previousVersionId,
})
logger.info(`[${requestId}] Previous version cleanup completed`)
} catch (cleanupError) {
logger.error(
`[${requestId}] Failed to clean up previous version ${previousVersionId}`,
cleanupError
)
}
}
await syncMcpToolsForWorkflow({
workflowId: id,
requestId,
state: versionRow.state,
context: 'activate',
})
return createSuccessResponse({ success: true, deployedAt: result.deployedAt })
} catch (error: any) {
logger.error(`[${requestId}] Error activating deployment for workflow: ${id}`, error)

View File

@@ -1,7 +1,7 @@
import { db } from '@sim/db'
import { webhook } from '@sim/db/schema'
import { createLogger } from '@sim/logger'
import { eq, inArray } from 'drizzle-orm'
import { and, eq, inArray } from 'drizzle-orm'
import { nanoid } from 'nanoid'
import type { NextRequest } from 'next/server'
import { getProviderIdFromServiceId } from '@/lib/oauth'
@@ -41,6 +41,7 @@ interface SaveTriggerWebhooksInput {
userId: string
blocks: Record<string, BlockState>
requestId: string
deploymentVersionId?: string
}
function getSubBlockValue(block: BlockState, subBlockId: string): unknown {
@@ -246,8 +247,17 @@ async function syncCredentialSetWebhooks(params: {
triggerPath: string
providerConfig: Record<string, unknown>
requestId: string
deploymentVersionId?: string
}): Promise<TriggerSaveError | null> {
const { workflowId, blockId, provider, triggerPath, providerConfig, requestId } = params
const {
workflowId,
blockId,
provider,
triggerPath,
providerConfig,
requestId,
deploymentVersionId,
} = params
const credentialSetId = providerConfig.credentialSetId as string | undefined
if (!credentialSetId) {
@@ -267,6 +277,7 @@ async function syncCredentialSetWebhooks(params: {
oauthProviderId,
providerConfig: baseConfig as Record<string, any>,
requestId,
deploymentVersionId,
})
if (syncResult.webhooks.length === 0) {
@@ -308,6 +319,7 @@ async function createWebhookForBlock(params: {
providerConfig: Record<string, unknown>
triggerPath: string
requestId: string
deploymentVersionId?: string
}): Promise<TriggerSaveError | null> {
const {
request,
@@ -319,6 +331,7 @@ async function createWebhookForBlock(params: {
providerConfig,
triggerPath,
requestId,
deploymentVersionId,
} = params
const webhookId = nanoid()
@@ -346,6 +359,7 @@ async function createWebhookForBlock(params: {
.values({
id: webhookId,
workflowId,
deploymentVersionId: deploymentVersionId || null,
blockId: block.id,
path: triggerPath,
provider,
@@ -383,6 +397,7 @@ export async function saveTriggerWebhooksForDeploy({
userId,
blocks,
requestId,
deploymentVersionId,
}: SaveTriggerWebhooksInput): Promise<TriggerSaveResult> {
const triggerBlocks = Object.values(blocks || {}).filter(Boolean)
const currentBlockIds = new Set(triggerBlocks.map((b) => b.id))
@@ -495,6 +510,7 @@ export async function saveTriggerWebhooksForDeploy({
triggerPath,
providerConfig,
requestId,
deploymentVersionId,
})
if (credentialSetError) {
@@ -515,6 +531,7 @@ export async function saveTriggerWebhooksForDeploy({
providerConfig,
triggerPath,
requestId,
deploymentVersionId,
})
if (createError) {
@@ -537,6 +554,13 @@ export async function saveTriggerWebhooksForDeploy({
;(block as any)._webhookConfig = undefined
}
if (deploymentVersionId) {
await db
.update(webhook)
.set({ deploymentVersionId, updatedAt: new Date() })
.where(eq(webhook.workflowId, workflowId))
}
return { success: true }
}
@@ -547,9 +571,20 @@ export async function saveTriggerWebhooksForDeploy({
export async function cleanupWebhooksForWorkflow(
workflowId: string,
workflow: Record<string, unknown>,
requestId: string
requestId: string,
deploymentVersionId?: string
): Promise<void> {
const existingWebhooks = await db.select().from(webhook).where(eq(webhook.workflowId, workflowId))
const existingWebhooks = await db
.select()
.from(webhook)
.where(
deploymentVersionId
? and(
eq(webhook.workflowId, workflowId),
eq(webhook.deploymentVersionId, deploymentVersionId)
)
: eq(webhook.workflowId, workflowId)
)
if (existingWebhooks.length === 0) {
logger.debug(`[${requestId}] No webhooks to clean up for workflow ${workflowId}`)
@@ -558,6 +593,7 @@ export async function cleanupWebhooksForWorkflow(
logger.info(`[${requestId}] Cleaning up ${existingWebhooks.length} webhook(s) for undeploy`, {
workflowId,
deploymentVersionId,
webhookIds: existingWebhooks.map((wh) => wh.id),
})
@@ -572,7 +608,20 @@ export async function cleanupWebhooksForWorkflow(
}
// Delete all webhook records
await db.delete(webhook).where(eq(webhook.workflowId, workflowId))
await db
.delete(webhook)
.where(
deploymentVersionId
? and(
eq(webhook.workflowId, workflowId),
eq(webhook.deploymentVersionId, deploymentVersionId)
)
: eq(webhook.workflowId, workflowId)
)
logger.info(`[${requestId}] Cleaned up all webhooks for workflow ${workflowId}`)
logger.info(
deploymentVersionId
? `[${requestId}] Cleaned up webhooks for workflow ${workflowId} deployment ${deploymentVersionId}`
: `[${requestId}] Cleaned up all webhooks for workflow ${workflowId}`
)
}

View File

@@ -1,7 +1,13 @@
import { db } from '@sim/db'
import { account, credentialSet, webhook, workflow } from '@sim/db/schema'
import {
account,
credentialSet,
webhook,
workflow,
workflowDeploymentVersion,
} from '@sim/db/schema'
import { createLogger } from '@sim/logger'
import { and, eq, sql } from 'drizzle-orm'
import { and, eq, isNull, or, sql } from 'drizzle-orm'
import { nanoid } from 'nanoid'
import { isOrganizationOnTeamOrEnterprisePlan } from '@/lib/billing'
import { pollingIdempotency } from '@/lib/core/idempotency/service'
@@ -111,11 +117,22 @@ export async function pollGmailWebhooks() {
.select({ webhook })
.from(webhook)
.innerJoin(workflow, eq(webhook.workflowId, workflow.id))
.leftJoin(
workflowDeploymentVersion,
and(
eq(workflowDeploymentVersion.workflowId, workflow.id),
eq(workflowDeploymentVersion.isActive, true)
)
)
.where(
and(
eq(webhook.provider, 'gmail'),
eq(webhook.isActive, true),
eq(workflow.isDeployed, true)
eq(workflow.isDeployed, true),
or(
eq(webhook.deploymentVersionId, workflowDeploymentVersion.id),
and(isNull(workflowDeploymentVersion.id), isNull(webhook.deploymentVersionId))
)
)
)

View File

@@ -1,8 +1,8 @@
import { db } from '@sim/db'
import { webhook, workflow } from '@sim/db/schema'
import { webhook, workflow, workflowDeploymentVersion } from '@sim/db/schema'
import { createLogger } from '@sim/logger'
import type { InferSelectModel } from 'drizzle-orm'
import { and, eq, sql } from 'drizzle-orm'
import { and, eq, isNull, or, sql } from 'drizzle-orm'
import type { FetchMessageObject, MailboxLockObject } from 'imapflow'
import { ImapFlow } from 'imapflow'
import { nanoid } from 'nanoid'
@@ -113,8 +113,23 @@ export async function pollImapWebhooks() {
.select({ webhook })
.from(webhook)
.innerJoin(workflow, eq(webhook.workflowId, workflow.id))
.leftJoin(
workflowDeploymentVersion,
and(
eq(workflowDeploymentVersion.workflowId, workflow.id),
eq(workflowDeploymentVersion.isActive, true)
)
)
.where(
and(eq(webhook.provider, 'imap'), eq(webhook.isActive, true), eq(workflow.isDeployed, true))
and(
eq(webhook.provider, 'imap'),
eq(webhook.isActive, true),
eq(workflow.isDeployed, true),
or(
eq(webhook.deploymentVersionId, workflowDeploymentVersion.id),
and(isNull(workflowDeploymentVersion.id), isNull(webhook.deploymentVersionId))
)
)
)
const activeWebhooks = activeWebhooksResult.map((r) => r.webhook)

View File

@@ -1,7 +1,13 @@
import { db } from '@sim/db'
import { account, credentialSet, webhook, workflow } from '@sim/db/schema'
import {
account,
credentialSet,
webhook,
workflow,
workflowDeploymentVersion,
} from '@sim/db/schema'
import { createLogger } from '@sim/logger'
import { and, eq, sql } from 'drizzle-orm'
import { and, eq, isNull, or, sql } from 'drizzle-orm'
import { htmlToText } from 'html-to-text'
import { nanoid } from 'nanoid'
import { isOrganizationOnTeamOrEnterprisePlan } from '@/lib/billing'
@@ -161,11 +167,22 @@ export async function pollOutlookWebhooks() {
.select({ webhook })
.from(webhook)
.innerJoin(workflow, eq(webhook.workflowId, workflow.id))
.leftJoin(
workflowDeploymentVersion,
and(
eq(workflowDeploymentVersion.workflowId, workflow.id),
eq(workflowDeploymentVersion.isActive, true)
)
)
.where(
and(
eq(webhook.provider, 'outlook'),
eq(webhook.isActive, true),
eq(workflow.isDeployed, true)
eq(workflow.isDeployed, true),
or(
eq(webhook.deploymentVersionId, workflowDeploymentVersion.id),
and(isNull(workflowDeploymentVersion.id), isNull(webhook.deploymentVersionId))
)
)
)

View File

@@ -1,8 +1,8 @@
import { db, webhook, workflow } from '@sim/db'
import { db, webhook, workflow, workflowDeploymentVersion } from '@sim/db'
import { credentialSet, subscription } from '@sim/db/schema'
import { createLogger } from '@sim/logger'
import { tasks } from '@trigger.dev/sdk'
import { and, eq } from 'drizzle-orm'
import { and, eq, isNull, or } from 'drizzle-orm'
import { type NextRequest, NextResponse } from 'next/server'
import { v4 as uuidv4 } from 'uuid'
import { checkEnterprisePlan, checkTeamPlan } from '@/lib/billing/subscriptions/utils'
@@ -280,7 +280,23 @@ export async function findWebhookAndWorkflow(
})
.from(webhook)
.innerJoin(workflow, eq(webhook.workflowId, workflow.id))
.where(and(eq(webhook.id, options.webhookId), eq(webhook.isActive, true)))
.leftJoin(
workflowDeploymentVersion,
and(
eq(workflowDeploymentVersion.workflowId, workflow.id),
eq(workflowDeploymentVersion.isActive, true)
)
)
.where(
and(
eq(webhook.id, options.webhookId),
eq(webhook.isActive, true),
or(
eq(webhook.deploymentVersionId, workflowDeploymentVersion.id),
and(isNull(workflowDeploymentVersion.id), isNull(webhook.deploymentVersionId))
)
)
)
.limit(1)
if (results.length === 0) {
@@ -299,7 +315,23 @@ export async function findWebhookAndWorkflow(
})
.from(webhook)
.innerJoin(workflow, eq(webhook.workflowId, workflow.id))
.where(and(eq(webhook.path, options.path), eq(webhook.isActive, true)))
.leftJoin(
workflowDeploymentVersion,
and(
eq(workflowDeploymentVersion.workflowId, workflow.id),
eq(workflowDeploymentVersion.isActive, true)
)
)
.where(
and(
eq(webhook.path, options.path),
eq(webhook.isActive, true),
or(
eq(webhook.deploymentVersionId, workflowDeploymentVersion.id),
and(isNull(workflowDeploymentVersion.id), isNull(webhook.deploymentVersionId))
)
)
)
.limit(1)
if (results.length === 0) {
@@ -331,7 +363,23 @@ export async function findAllWebhooksForPath(
})
.from(webhook)
.innerJoin(workflow, eq(webhook.workflowId, workflow.id))
.where(and(eq(webhook.path, options.path), eq(webhook.isActive, true)))
.leftJoin(
workflowDeploymentVersion,
and(
eq(workflowDeploymentVersion.workflowId, workflow.id),
eq(workflowDeploymentVersion.isActive, true)
)
)
.where(
and(
eq(webhook.path, options.path),
eq(webhook.isActive, true),
or(
eq(webhook.deploymentVersionId, workflowDeploymentVersion.id),
and(isNull(workflowDeploymentVersion.id), isNull(webhook.deploymentVersionId))
)
)
)
if (results.length === 0) {
logger.warn(`[${options.requestId}] No active webhooks found for path: ${options.path}`)

View File

@@ -1,7 +1,7 @@
import { db } from '@sim/db'
import { webhook, workflow } from '@sim/db/schema'
import { webhook, workflow, workflowDeploymentVersion } from '@sim/db/schema'
import { createLogger } from '@sim/logger'
import { and, eq, sql } from 'drizzle-orm'
import { and, eq, isNull, or, sql } from 'drizzle-orm'
import { nanoid } from 'nanoid'
import Parser from 'rss-parser'
import { pollingIdempotency } from '@/lib/core/idempotency/service'
@@ -119,8 +119,23 @@ export async function pollRssWebhooks() {
.select({ webhook })
.from(webhook)
.innerJoin(workflow, eq(webhook.workflowId, workflow.id))
.leftJoin(
workflowDeploymentVersion,
and(
eq(workflowDeploymentVersion.workflowId, workflow.id),
eq(workflowDeploymentVersion.isActive, true)
)
)
.where(
and(eq(webhook.provider, 'rss'), eq(webhook.isActive, true), eq(workflow.isDeployed, true))
and(
eq(webhook.provider, 'rss'),
eq(webhook.isActive, true),
eq(workflow.isDeployed, true),
or(
eq(webhook.deploymentVersionId, workflowDeploymentVersion.id),
and(isNull(workflowDeploymentVersion.id), isNull(webhook.deploymentVersionId))
)
)
)
const activeWebhooks = activeWebhooksResult.map((r) => r.webhook)

View File

@@ -1,7 +1,7 @@
import { db } from '@sim/db'
import { db, workflowDeploymentVersion } from '@sim/db'
import { account, webhook } from '@sim/db/schema'
import { createLogger } from '@sim/logger'
import { and, eq } from 'drizzle-orm'
import { and, eq, isNull, or } from 'drizzle-orm'
import { type NextRequest, NextResponse } from 'next/server'
import { createPinnedUrl, validateUrlWithDNS } from '@/lib/core/security/input-validation'
import type { DbOrTx } from '@/lib/db/types'
@@ -28,11 +28,28 @@ export async function handleWhatsAppVerification(
}
const webhooks = await db
.select()
.select({ webhook })
.from(webhook)
.where(and(eq(webhook.provider, 'whatsapp'), eq(webhook.isActive, true)))
.leftJoin(
workflowDeploymentVersion,
and(
eq(workflowDeploymentVersion.workflowId, webhook.workflowId),
eq(workflowDeploymentVersion.isActive, true)
)
)
.where(
and(
eq(webhook.provider, 'whatsapp'),
eq(webhook.isActive, true),
or(
eq(webhook.deploymentVersionId, workflowDeploymentVersion.id),
and(isNull(workflowDeploymentVersion.id), isNull(webhook.deploymentVersionId))
)
)
)
for (const wh of webhooks) {
for (const row of webhooks) {
const wh = row.webhook
const providerConfig = (wh.providerConfig as Record<string, any>) || {}
const verificationToken = providerConfig.verificationToken
@@ -1921,6 +1938,7 @@ export async function syncWebhooksForCredentialSet(params: {
providerConfig: Record<string, any>
requestId: string
tx?: DbOrTx
deploymentVersionId?: string
}): Promise<CredentialSetWebhookSyncResult> {
const {
workflowId,
@@ -1932,6 +1950,7 @@ export async function syncWebhooksForCredentialSet(params: {
providerConfig,
requestId,
tx,
deploymentVersionId,
} = params
const dbCtx = tx ?? db
@@ -1966,7 +1985,15 @@ export async function syncWebhooksForCredentialSet(params: {
const existingWebhooks = await dbCtx
.select()
.from(webhook)
.where(and(eq(webhook.workflowId, workflowId), eq(webhook.blockId, blockId)))
.where(
deploymentVersionId
? and(
eq(webhook.workflowId, workflowId),
eq(webhook.blockId, blockId),
eq(webhook.deploymentVersionId, deploymentVersionId)
)
: and(eq(webhook.workflowId, workflowId), eq(webhook.blockId, blockId))
)
// Filter to only webhooks belonging to this credential set
const credentialSetWebhooks = existingWebhooks.filter((wh) => {
@@ -2020,6 +2047,7 @@ export async function syncWebhooksForCredentialSet(params: {
await dbCtx
.update(webhook)
.set({
...(deploymentVersionId ? { deploymentVersionId } : {}),
providerConfig: updatedConfig,
isActive: true,
updatedAt: new Date(),
@@ -2058,6 +2086,7 @@ export async function syncWebhooksForCredentialSet(params: {
providerConfig: newConfig,
credentialSetId, // Indexed column for efficient credential set queries
isActive: true,
...(deploymentVersionId ? { deploymentVersionId } : {}),
createdAt: new Date(),
updatedAt: new Date(),
})
@@ -2113,9 +2142,24 @@ export async function syncAllWebhooksForCredentialSet(
// Find all webhooks that use this credential set using the indexed column
const webhooksForSet = await dbCtx
.select()
.select({ webhook })
.from(webhook)
.where(eq(webhook.credentialSetId, credentialSetId))
.leftJoin(
workflowDeploymentVersion,
and(
eq(workflowDeploymentVersion.workflowId, webhook.workflowId),
eq(workflowDeploymentVersion.isActive, true)
)
)
.where(
and(
eq(webhook.credentialSetId, credentialSetId),
or(
eq(webhook.deploymentVersionId, workflowDeploymentVersion.id),
and(isNull(workflowDeploymentVersion.id), isNull(webhook.deploymentVersionId))
)
)
)
if (webhooksForSet.length === 0) {
syncLogger.info(`[${requestId}] No webhooks found using credential set ${credentialSetId}`)
@@ -2123,8 +2167,9 @@ export async function syncAllWebhooksForCredentialSet(
}
// Group webhooks by workflow+block to find unique triggers
const triggerGroups = new Map<string, (typeof webhooksForSet)[number]>()
for (const wh of webhooksForSet) {
const triggerGroups = new Map<string, (typeof webhooksForSet)[number]['webhook']>()
for (const row of webhooksForSet) {
const wh = row.webhook
const key = `${wh.workflowId}:${wh.blockId}`
// Keep the first webhook as representative (they all have same config)
if (!triggerGroups.has(key)) {

View File

@@ -495,6 +495,7 @@ export async function deployWorkflow(params: {
}): Promise<{
success: boolean
version?: number
deploymentVersionId?: string
deployedAt?: Date
currentState?: any
error?: string
@@ -533,6 +534,7 @@ export async function deployWorkflow(params: {
.where(eq(workflowDeploymentVersion.workflowId, workflowId))
const nextVersion = Number(maxVersion) + 1
const deploymentVersionId = uuidv4()
// Deactivate all existing versions
await tx
@@ -542,7 +544,7 @@ export async function deployWorkflow(params: {
// Create new deployment version
await tx.insert(workflowDeploymentVersion).values({
id: uuidv4(),
id: deploymentVersionId,
workflowId,
version: nextVersion,
state: currentState,
@@ -562,10 +564,10 @@ export async function deployWorkflow(params: {
// Note: Templates are NOT automatically updated on deployment
// Template updates must be done explicitly through the "Update Template" button
return nextVersion
return { version: nextVersion, deploymentVersionId }
})
logger.info(`Deployed workflow ${workflowId} as v${deployedVersion}`)
logger.info(`Deployed workflow ${workflowId} as v${deployedVersion.version}`)
if (workflowName) {
try {
@@ -582,7 +584,7 @@ export async function deployWorkflow(params: {
workflowName,
blocksCount: Object.keys(currentState.blocks).length,
edgesCount: currentState.edges.length,
version: deployedVersion,
version: deployedVersion.version,
loopsCount: Object.keys(currentState.loops).length,
parallelsCount: Object.keys(currentState.parallels).length,
blockTypes: JSON.stringify(blockTypeCounts),
@@ -594,7 +596,8 @@ export async function deployWorkflow(params: {
return {
success: true,
version: deployedVersion,
version: deployedVersion.version,
deploymentVersionId: deployedVersion.deploymentVersionId,
deployedAt: now,
currentState,
}

View File

@@ -1,7 +1,8 @@
import { workflowSchedule } from '@sim/db'
import { db, workflowSchedule } from '@sim/db'
import { createLogger } from '@sim/logger'
import { eq } from 'drizzle-orm'
import { and, eq } from 'drizzle-orm'
import type { DbOrTx } from '@/lib/db/types'
import { cleanupWebhooksForWorkflow } from '@/lib/webhooks/deploy'
import type { BlockState } from '@/lib/workflows/schedules/utils'
import { findScheduleBlocks, validateScheduleBlock } from '@/lib/workflows/schedules/validation'
@@ -26,7 +27,8 @@ export interface ScheduleDeployResult {
export async function createSchedulesForDeploy(
workflowId: string,
blocks: Record<string, BlockState>,
tx: DbOrTx
tx: DbOrTx,
deploymentVersionId?: string
): Promise<ScheduleDeployResult> {
const scheduleBlocks = findScheduleBlocks(blocks)
@@ -61,6 +63,7 @@ export async function createSchedulesForDeploy(
const values = {
id: scheduleId,
workflowId,
deploymentVersionId: deploymentVersionId || null,
blockId,
cronExpression: cronExpression!,
triggerType: 'schedule',
@@ -75,6 +78,7 @@ export async function createSchedulesForDeploy(
const setValues = {
blockId,
cronExpression: cronExpression!,
...(deploymentVersionId ? { deploymentVersionId } : {}),
updatedAt: now,
nextRunAt: nextRunAt!,
timezone: timezone!,
@@ -86,7 +90,11 @@ export async function createSchedulesForDeploy(
.insert(workflowSchedule)
.values(values)
.onConflictDoUpdate({
target: [workflowSchedule.workflowId, workflowSchedule.blockId],
target: [
workflowSchedule.workflowId,
workflowSchedule.blockId,
workflowSchedule.deploymentVersionId,
],
set: setValues,
})
@@ -109,8 +117,36 @@ export async function createSchedulesForDeploy(
* Delete all schedules for a workflow
* This should be called within a database transaction during undeploy
*/
export async function deleteSchedulesForWorkflow(workflowId: string, tx: DbOrTx): Promise<void> {
await tx.delete(workflowSchedule).where(eq(workflowSchedule.workflowId, workflowId))
export async function deleteSchedulesForWorkflow(
workflowId: string,
tx: DbOrTx,
deploymentVersionId?: string
): Promise<void> {
await tx
.delete(workflowSchedule)
.where(
deploymentVersionId
? and(
eq(workflowSchedule.workflowId, workflowId),
eq(workflowSchedule.deploymentVersionId, deploymentVersionId)
)
: eq(workflowSchedule.workflowId, workflowId)
)
logger.info(`Deleted all schedules for workflow ${workflowId}`)
logger.info(
deploymentVersionId
? `Deleted schedules for workflow ${workflowId} deployment ${deploymentVersionId}`
: `Deleted all schedules for workflow ${workflowId}`
)
}
export async function cleanupDeploymentVersion(params: {
workflowId: string
workflow: Record<string, unknown>
requestId: string
deploymentVersionId: string
}): Promise<void> {
const { workflowId, workflow, requestId, deploymentVersionId } = params
await cleanupWebhooksForWorkflow(workflowId, workflow, requestId, deploymentVersionId)
await deleteSchedulesForWorkflow(workflowId, db, deploymentVersionId)
}

View File

@@ -1,4 +1,5 @@
export {
cleanupDeploymentVersion,
createSchedulesForDeploy,
deleteSchedulesForWorkflow,
type ScheduleDeployResult,

View File

@@ -0,0 +1,26 @@
ALTER TABLE "webhook" DROP CONSTRAINT "webhook_block_id_workflow_blocks_id_fk";
--> statement-breakpoint
ALTER TABLE "workflow_schedule" DROP CONSTRAINT "workflow_schedule_block_id_workflow_blocks_id_fk";
--> statement-breakpoint
DROP INDEX "path_idx";--> statement-breakpoint
DROP INDEX "workflow_schedule_workflow_block_unique";--> statement-breakpoint
ALTER TABLE "webhook" ADD COLUMN "deployment_version_id" text;--> statement-breakpoint
ALTER TABLE "workflow_schedule" ADD COLUMN "deployment_version_id" text;--> statement-breakpoint
ALTER TABLE "webhook" ADD CONSTRAINT "webhook_deployment_version_id_workflow_deployment_version_id_fk" FOREIGN KEY ("deployment_version_id") REFERENCES "public"."workflow_deployment_version"("id") ON DELETE cascade ON UPDATE no action;--> statement-breakpoint
ALTER TABLE "workflow_schedule" ADD CONSTRAINT "workflow_schedule_deployment_version_id_workflow_deployment_version_id_fk" FOREIGN KEY ("deployment_version_id") REFERENCES "public"."workflow_deployment_version"("id") ON DELETE cascade ON UPDATE no action;--> statement-breakpoint
CREATE UNIQUE INDEX "path_deployment_unique" ON "webhook" USING btree ("path","deployment_version_id");--> statement-breakpoint
CREATE INDEX "webhook_workflow_deployment_idx" ON "webhook" USING btree ("workflow_id","deployment_version_id");--> statement-breakpoint
CREATE UNIQUE INDEX "workflow_schedule_workflow_block_deployment_unique" ON "workflow_schedule" USING btree ("workflow_id","block_id","deployment_version_id");--> statement-breakpoint
CREATE INDEX "workflow_schedule_workflow_deployment_idx" ON "workflow_schedule" USING btree ("workflow_id","deployment_version_id");--> statement-breakpoint
UPDATE "webhook" AS w
SET "deployment_version_id" = dv."id"
FROM "workflow_deployment_version" AS dv
WHERE dv."workflow_id" = w."workflow_id"
AND dv."is_active" = true
AND w."deployment_version_id" IS NULL;--> statement-breakpoint
UPDATE "workflow_schedule" AS ws
SET "deployment_version_id" = dv."id"
FROM "workflow_deployment_version" AS dv
WHERE dv."workflow_id" = ws."workflow_id"
AND dv."is_active" = true
AND ws."deployment_version_id" IS NULL;

File diff suppressed because it is too large Load Diff

View File

@@ -1002,6 +1002,13 @@
"when": 1768518143986,
"tag": "0143_puzzling_xorn",
"breakpoints": true
},
{
"idx": 144,
"version": "7",
"when": 1768601382697,
"tag": "0144_mysterious_wendell_vaughn",
"breakpoints": true
}
]
}

View File

@@ -492,7 +492,11 @@ export const workflowSchedule = pgTable(
workflowId: text('workflow_id')
.notNull()
.references(() => workflow.id, { onDelete: 'cascade' }),
blockId: text('block_id').references(() => workflowBlocks.id, { onDelete: 'cascade' }),
deploymentVersionId: text('deployment_version_id').references(
() => workflowDeploymentVersion.id,
{ onDelete: 'cascade' }
),
blockId: text('block_id'),
cronExpression: text('cron_expression'),
nextRunAt: timestamp('next_run_at'),
lastRanAt: timestamp('last_ran_at'),
@@ -507,9 +511,14 @@ export const workflowSchedule = pgTable(
},
(table) => {
return {
workflowBlockUnique: uniqueIndex('workflow_schedule_workflow_block_unique').on(
workflowBlockUnique: uniqueIndex('workflow_schedule_workflow_block_deployment_unique').on(
table.workflowId,
table.blockId
table.blockId,
table.deploymentVersionId
),
workflowDeploymentIdx: index('workflow_schedule_workflow_deployment_idx').on(
table.workflowId,
table.deploymentVersionId
),
}
}
@@ -522,7 +531,11 @@ export const webhook = pgTable(
workflowId: text('workflow_id')
.notNull()
.references(() => workflow.id, { onDelete: 'cascade' }),
blockId: text('block_id').references(() => workflowBlocks.id, { onDelete: 'cascade' }), // ID of the webhook trigger block (nullable for legacy starter block webhooks)
deploymentVersionId: text('deployment_version_id').references(
() => workflowDeploymentVersion.id,
{ onDelete: 'cascade' }
),
blockId: text('block_id'),
path: text('path').notNull(),
provider: text('provider'), // e.g., "whatsapp", "github", etc.
providerConfig: json('provider_config'), // Store provider-specific configuration
@@ -537,13 +550,17 @@ export const webhook = pgTable(
},
(table) => {
return {
// Ensure webhook paths are unique
pathIdx: uniqueIndex('path_idx').on(table.path),
// Ensure webhook paths are unique per deployment version
pathIdx: uniqueIndex('path_deployment_unique').on(table.path, table.deploymentVersionId),
// Optimize queries for webhooks by workflow and block
workflowBlockIdx: index('idx_webhook_on_workflow_id_block_id').on(
table.workflowId,
table.blockId
),
workflowDeploymentIdx: index('webhook_workflow_deployment_idx').on(
table.workflowId,
table.deploymentVersionId
),
// Optimize queries for credential set webhooks
credentialSetIdIdx: index('webhook_credential_set_id_idx').on(table.credentialSetId),
}