mirror of
https://github.com/simstudioai/sim.git
synced 2026-04-06 03:00:16 -04:00
* improvement(processing): reduce redundant DB queries in execution preprocessing * improvement(processing): add defensive ID check for prefetched workflow record * improvement(processing): fix type safety in execution error logging Replace `as any` cast in non-SSE error path with proper `buildTraceSpans()` transformation, matching the SSE error path. Remove redundant `as any` cast in preprocessing.ts where the types already align. * improvement(processing): replace `as any` casts with proper types in logging - logger.ts: cast JSONB cost column to `WorkflowExecutionLog['cost']` instead of `any` in both `completeWorkflowExecution` and `getWorkflowExecution` - logger.ts: replace `(orgUsageBefore as any)?.toString?.()` with `String()` since COALESCE guarantees a non-null SQL aggregate value - logging-session.ts: cast JSONB cost to `AccumulatedCost` (the local interface) instead of `any` in `loadExistingCost` * improvement(processing): use exported HighestPrioritySubscription type in usage.ts Replace inline `Awaited<ReturnType<typeof getHighestPrioritySubscription>>` with the already-exported `HighestPrioritySubscription` type alias. * improvement(processing): replace remaining `as any` casts with proper types - preprocessing.ts: use exported `HighestPrioritySubscription` type instead of redeclaring via `Awaited<ReturnType<...>>` - deploy/route.ts, status/route.ts: cast `hasWorkflowChanged` args to `WorkflowState` instead of `any` (JSONB + object literal narrowing) - state/route.ts: type block sanitization and save with `BlockState` and `WorkflowState` instead of `any` - search-suggestions.ts: remove 8 unnecessary `as any` casts on `'date'` literal that already satisfies the `Suggestion['category']` union * fix(processing): prevent double-billing race in LoggingSession completion When executeWorkflowCore throws, its catch block fire-and-forgets safeCompleteWithError, then re-throws. The caller's catch block also fire-and-forgets safeCompleteWithError on the same LoggingSession. Both check this.completed (still false) before either's async DB write resolves, so both proceed to completeWorkflowExecution which uses additive SQL for billing — doubling the charged cost on every failed execution. Fix: add a synchronous `completing` flag set immediately before the async work begins. This blocks concurrent callers at the guard check. On failure, the flag is reset so the safe* fallback path (completeWithCostOnlyLog) can still attempt recovery. * fix(processing): unblock error responses and isolate run-count failures Remove unnecessary `await waitForCompletion()` from non-SSE and SSE error paths where no `markAsFailed()` follows — these were blocking error responses on log persistence for no reason. Wrap `updateWorkflowRunCounts` in its own try/catch so a run-count DB failure cannot prevent session completion, billing, and trace span persistence. * improvement(processing): remove dead setupExecutor method The method body was just a debug log with an `any` parameter — logging now works entirely through trace spans with no executor integration. * remove logger.debug * fix(processing): guard completionPromise as write-once (singleton promise) Prevent concurrent safeComplete* calls from overwriting completionPromise with a no-op. The guard now lives at the assignment site — if a completion is already in-flight, return its promise instead of starting a new one. This ensures waitForCompletion() always awaits the real work. * improvement(processing): remove empty else/catch blocks left by debug log cleanup * fix(processing): enforce waitForCompletion inside markAsFailed to prevent completion races Move waitForCompletion() into markAsFailed() so every call site is automatically safe against in-flight fire-and-forget completions. Remove the now-redundant external waitForCompletion() calls in route.ts. * fix(processing): reset completing flag on fallback failure, clean up empty catch - completeWithCostOnlyLog now resets this.completing = false when the fallback itself fails, preventing a permanently stuck session - Use _disconnectError in MCP test-connection to signal intentional ignore * fix(processing): restore disconnect error logging in MCP test-connection Revert unrelated debug log removal — this file isn't part of the processing improvements and the log aids connection leak detection. * fix(processing): address audit findings across branch - preprocessing.ts: use undefined (not null) for failed subscription fetch so getUserUsageLimit does a fresh lookup instead of silently falling back to free-tier limits - deployed/route.ts: log warning on loadDeployedWorkflowState failure instead of silently swallowing the error - schedule-execution.ts: remove dead successLog parameter and all call-site arguments left over from logger.debug cleanup - mcp/middleware.ts: drop unused error binding in empty catch - audit/log.ts, wand.ts: promote logger.debug to logger.warn in catch blocks where these are the only failure signal * revert: undo unnecessary subscription null→undefined change getHighestPrioritySubscription never throws (it catches internally and returns null), so the catch block in preprocessExecution is dead code. The null vs undefined distinction doesn't matter and the coercions added unnecessary complexity. * improvement(processing): remove dead try/catch around getHighestPrioritySubscription getHighestPrioritySubscription catches internally and returns null on error, so the wrapping try/catch was unreachable dead code. * improvement(processing): remove dead getSnapshotByHash method No longer called after createSnapshotWithDeduplication was refactored to use a single upsert instead of select-then-insert. ---------
708 lines
25 KiB
TypeScript
708 lines
25 KiB
TypeScript
import { db } from '@sim/db'
|
|
import { permissions, webhook, workflow, workflowDeploymentVersion } from '@sim/db/schema'
|
|
import { createLogger } from '@sim/logger'
|
|
import { and, desc, eq, inArray, isNull, or } from 'drizzle-orm'
|
|
import { nanoid } from 'nanoid'
|
|
import { type NextRequest, NextResponse } from 'next/server'
|
|
import { AuditAction, AuditResourceType, recordAudit } from '@/lib/audit/log'
|
|
import { getSession } from '@/lib/auth'
|
|
import { PlatformEvents } from '@/lib/core/telemetry'
|
|
import { generateRequestId } from '@/lib/core/utils/request'
|
|
import { getProviderIdFromServiceId } from '@/lib/oauth'
|
|
import { resolveEnvVarsInObject } from '@/lib/webhooks/env-resolver'
|
|
import {
|
|
cleanupExternalWebhook,
|
|
createExternalWebhookSubscription,
|
|
} from '@/lib/webhooks/provider-subscriptions'
|
|
import { mergeNonUserFields } from '@/lib/webhooks/utils'
|
|
import {
|
|
configureGmailPolling,
|
|
configureOutlookPolling,
|
|
configureRssPolling,
|
|
syncWebhooksForCredentialSet,
|
|
} from '@/lib/webhooks/utils.server'
|
|
import { authorizeWorkflowByWorkspacePermission } from '@/lib/workflows/utils'
|
|
import { extractCredentialSetId, isCredentialSetValue } from '@/executor/constants'
|
|
|
|
const logger = createLogger('WebhooksAPI')
|
|
|
|
export const dynamic = 'force-dynamic'
|
|
|
|
// Get all webhooks for the current user
|
|
export async function GET(request: NextRequest) {
|
|
const requestId = generateRequestId()
|
|
|
|
try {
|
|
const session = await getSession()
|
|
if (!session?.user?.id) {
|
|
logger.warn(`[${requestId}] Unauthorized webhooks access attempt`)
|
|
return NextResponse.json({ error: 'Unauthorized' }, { status: 401 })
|
|
}
|
|
|
|
const { searchParams } = new URL(request.url)
|
|
const workflowId = searchParams.get('workflowId')
|
|
const blockId = searchParams.get('blockId')
|
|
|
|
if (workflowId && blockId) {
|
|
// Collaborative-aware path: allow collaborators with read access to view webhooks
|
|
// Fetch workflow to verify access
|
|
const wf = await db
|
|
.select({ id: workflow.id, userId: workflow.userId, workspaceId: workflow.workspaceId })
|
|
.from(workflow)
|
|
.where(eq(workflow.id, workflowId))
|
|
.limit(1)
|
|
|
|
if (!wf.length) {
|
|
logger.warn(`[${requestId}] Workflow not found: ${workflowId}`)
|
|
return NextResponse.json({ error: 'Workflow not found' }, { status: 404 })
|
|
}
|
|
|
|
const wfRecord = wf[0]
|
|
const authorization = await authorizeWorkflowByWorkspacePermission({
|
|
workflowId: wfRecord.id,
|
|
userId: session.user.id,
|
|
action: 'read',
|
|
})
|
|
const canRead = authorization.allowed
|
|
|
|
if (!canRead) {
|
|
logger.warn(
|
|
`[${requestId}] User ${session.user.id} denied permission to read webhooks for workflow ${workflowId}`
|
|
)
|
|
return NextResponse.json({ webhooks: [] }, { status: 200 })
|
|
}
|
|
|
|
const webhooks = await db
|
|
.select({
|
|
webhook: webhook,
|
|
workflow: {
|
|
id: workflow.id,
|
|
name: workflow.name,
|
|
},
|
|
})
|
|
.from(webhook)
|
|
.innerJoin(workflow, eq(webhook.workflowId, workflow.id))
|
|
.leftJoin(
|
|
workflowDeploymentVersion,
|
|
and(
|
|
eq(workflowDeploymentVersion.workflowId, workflow.id),
|
|
eq(workflowDeploymentVersion.isActive, true)
|
|
)
|
|
)
|
|
.where(
|
|
and(
|
|
eq(webhook.workflowId, workflowId),
|
|
eq(webhook.blockId, blockId),
|
|
or(
|
|
eq(webhook.deploymentVersionId, workflowDeploymentVersion.id),
|
|
and(isNull(workflowDeploymentVersion.id), isNull(webhook.deploymentVersionId))
|
|
)
|
|
)
|
|
)
|
|
.orderBy(desc(webhook.updatedAt))
|
|
|
|
logger.info(
|
|
`[${requestId}] Retrieved ${webhooks.length} webhooks for workflow ${workflowId} block ${blockId}`
|
|
)
|
|
return NextResponse.json({ webhooks }, { status: 200 })
|
|
}
|
|
|
|
if (workflowId && !blockId) {
|
|
// For now, allow the call but return empty results to avoid breaking the UI
|
|
return NextResponse.json({ webhooks: [] }, { status: 200 })
|
|
}
|
|
|
|
const workspacePermissionRows = await db
|
|
.select({ workspaceId: permissions.entityId })
|
|
.from(permissions)
|
|
.where(and(eq(permissions.userId, session.user.id), eq(permissions.entityType, 'workspace')))
|
|
|
|
const workspaceIds = workspacePermissionRows.map((row) => row.workspaceId)
|
|
if (workspaceIds.length === 0) {
|
|
return NextResponse.json({ webhooks: [] }, { status: 200 })
|
|
}
|
|
|
|
const webhooks = await db
|
|
.select({
|
|
webhook: webhook,
|
|
workflow: {
|
|
id: workflow.id,
|
|
name: workflow.name,
|
|
},
|
|
})
|
|
.from(webhook)
|
|
.innerJoin(workflow, eq(webhook.workflowId, workflow.id))
|
|
.where(inArray(workflow.workspaceId, workspaceIds))
|
|
|
|
logger.info(`[${requestId}] Retrieved ${webhooks.length} workspace-accessible webhooks`)
|
|
return NextResponse.json({ webhooks }, { status: 200 })
|
|
} catch (error) {
|
|
logger.error(`[${requestId}] Error fetching webhooks`, error)
|
|
return NextResponse.json({ error: 'Internal server error' }, { status: 500 })
|
|
}
|
|
}
|
|
|
|
// Create or Update a webhook
|
|
export async function POST(request: NextRequest) {
|
|
const requestId = generateRequestId()
|
|
const session = await getSession()
|
|
const userId = session?.user?.id
|
|
|
|
if (!userId) {
|
|
logger.warn(`[${requestId}] Unauthorized webhook creation attempt`)
|
|
return NextResponse.json({ error: 'Unauthorized' }, { status: 401 })
|
|
}
|
|
|
|
try {
|
|
const body = await request.json()
|
|
const { workflowId, path, provider, providerConfig, blockId } = body
|
|
|
|
// Validate input
|
|
if (!workflowId) {
|
|
logger.warn(`[${requestId}] Missing required fields for webhook creation`, {
|
|
hasWorkflowId: !!workflowId,
|
|
hasPath: !!path,
|
|
})
|
|
return NextResponse.json({ error: 'Missing required fields' }, { status: 400 })
|
|
}
|
|
|
|
// Determine final path with special handling for credential-based providers
|
|
// to avoid generating a new path on every save.
|
|
let finalPath = path
|
|
const credentialBasedProviders = ['gmail', 'outlook']
|
|
const isCredentialBased = credentialBasedProviders.includes(provider)
|
|
// Treat Microsoft Teams chat subscription as credential-based for path generation purposes
|
|
const isMicrosoftTeamsChatSubscription =
|
|
provider === 'microsoft-teams' &&
|
|
typeof providerConfig === 'object' &&
|
|
providerConfig?.triggerId === 'microsoftteams_chat_subscription'
|
|
|
|
// If path is missing
|
|
if (!finalPath || finalPath.trim() === '') {
|
|
if (isCredentialBased || isMicrosoftTeamsChatSubscription) {
|
|
// Try to reuse existing path for this workflow+block if one exists
|
|
if (blockId) {
|
|
const existingForBlock = await db
|
|
.select({ id: webhook.id, path: webhook.path })
|
|
.from(webhook)
|
|
.leftJoin(
|
|
workflowDeploymentVersion,
|
|
and(
|
|
eq(workflowDeploymentVersion.workflowId, workflowId),
|
|
eq(workflowDeploymentVersion.isActive, true)
|
|
)
|
|
)
|
|
.where(
|
|
and(
|
|
eq(webhook.workflowId, workflowId),
|
|
eq(webhook.blockId, blockId),
|
|
or(
|
|
eq(webhook.deploymentVersionId, workflowDeploymentVersion.id),
|
|
and(isNull(workflowDeploymentVersion.id), isNull(webhook.deploymentVersionId))
|
|
)
|
|
)
|
|
)
|
|
.limit(1)
|
|
|
|
if (existingForBlock.length > 0) {
|
|
finalPath = existingForBlock[0].path
|
|
logger.info(
|
|
`[${requestId}] Reusing existing generated path for ${provider} trigger: ${finalPath}`
|
|
)
|
|
}
|
|
}
|
|
|
|
// If still no path, generate a new dummy path (first-time save)
|
|
if (!finalPath || finalPath.trim() === '') {
|
|
finalPath = `${provider}-${crypto.randomUUID()}`
|
|
logger.info(`[${requestId}] Generated webhook path for ${provider} trigger: ${finalPath}`)
|
|
}
|
|
} else {
|
|
logger.warn(`[${requestId}] Missing path for webhook creation`, {
|
|
hasWorkflowId: !!workflowId,
|
|
hasPath: !!path,
|
|
})
|
|
return NextResponse.json({ error: 'Missing required path' }, { status: 400 })
|
|
}
|
|
}
|
|
|
|
// Check if the workflow exists and user has permission to modify it
|
|
const workflowData = await db
|
|
.select({
|
|
id: workflow.id,
|
|
userId: workflow.userId,
|
|
workspaceId: workflow.workspaceId,
|
|
})
|
|
.from(workflow)
|
|
.where(eq(workflow.id, workflowId))
|
|
.limit(1)
|
|
|
|
if (workflowData.length === 0) {
|
|
logger.warn(`[${requestId}] Workflow not found: ${workflowId}`)
|
|
return NextResponse.json({ error: 'Workflow not found' }, { status: 404 })
|
|
}
|
|
|
|
const workflowRecord = workflowData[0]
|
|
|
|
const authorization = await authorizeWorkflowByWorkspacePermission({
|
|
workflowId,
|
|
userId,
|
|
action: 'write',
|
|
})
|
|
const canModify = authorization.allowed
|
|
|
|
if (!canModify) {
|
|
logger.warn(
|
|
`[${requestId}] User ${userId} denied permission to modify webhook for workflow ${workflowId}`
|
|
)
|
|
return NextResponse.json({ error: 'Access denied' }, { status: 403 })
|
|
}
|
|
|
|
// Determine existing webhook to update (prefer by workflow+block for credential-based providers)
|
|
let targetWebhookId: string | null = null
|
|
if (isCredentialBased && blockId) {
|
|
const existingForBlock = await db
|
|
.select({ id: webhook.id })
|
|
.from(webhook)
|
|
.leftJoin(
|
|
workflowDeploymentVersion,
|
|
and(
|
|
eq(workflowDeploymentVersion.workflowId, workflowId),
|
|
eq(workflowDeploymentVersion.isActive, true)
|
|
)
|
|
)
|
|
.where(
|
|
and(
|
|
eq(webhook.workflowId, workflowId),
|
|
eq(webhook.blockId, blockId),
|
|
or(
|
|
eq(webhook.deploymentVersionId, workflowDeploymentVersion.id),
|
|
and(isNull(workflowDeploymentVersion.id), isNull(webhook.deploymentVersionId))
|
|
)
|
|
)
|
|
)
|
|
.limit(1)
|
|
if (existingForBlock.length > 0) {
|
|
targetWebhookId = existingForBlock[0].id
|
|
}
|
|
}
|
|
if (!targetWebhookId) {
|
|
const existingByPath = await db
|
|
.select({ id: webhook.id, workflowId: webhook.workflowId })
|
|
.from(webhook)
|
|
.where(eq(webhook.path, finalPath))
|
|
.limit(1)
|
|
if (existingByPath.length > 0) {
|
|
// If a webhook with the same path exists but belongs to a different workflow, return an error
|
|
if (existingByPath[0].workflowId !== workflowId) {
|
|
logger.warn(`[${requestId}] Webhook path conflict: ${finalPath}`)
|
|
return NextResponse.json(
|
|
{ error: 'Webhook path already exists.', code: 'PATH_EXISTS' },
|
|
{ status: 409 }
|
|
)
|
|
}
|
|
targetWebhookId = existingByPath[0].id
|
|
}
|
|
}
|
|
|
|
let savedWebhook: any = null
|
|
const originalProviderConfig = providerConfig || {}
|
|
let resolvedProviderConfig = await resolveEnvVarsInObject(
|
|
originalProviderConfig,
|
|
userId,
|
|
workflowRecord.workspaceId || undefined
|
|
)
|
|
|
|
// --- Credential Set Handling ---
|
|
// For credential sets, we fan out to create one webhook per credential at save time.
|
|
// This applies to all OAuth-based triggers, not just polling ones.
|
|
// Check for credentialSetId directly (frontend may already extract it) or credential set value in credential fields
|
|
const rawCredentialId = (resolvedProviderConfig?.credentialId ||
|
|
resolvedProviderConfig?.triggerCredentials) as string | undefined
|
|
const directCredentialSetId = resolvedProviderConfig?.credentialSetId as string | undefined
|
|
|
|
if (directCredentialSetId || rawCredentialId) {
|
|
const credentialSetId =
|
|
directCredentialSetId ||
|
|
(rawCredentialId && isCredentialSetValue(rawCredentialId)
|
|
? extractCredentialSetId(rawCredentialId)
|
|
: null)
|
|
|
|
if (credentialSetId) {
|
|
logger.info(
|
|
`[${requestId}] Credential set detected for ${provider} trigger. Syncing webhooks for set ${credentialSetId}`
|
|
)
|
|
|
|
const oauthProviderId = getProviderIdFromServiceId(provider)
|
|
|
|
const {
|
|
credentialId: _cId,
|
|
triggerCredentials: _tCred,
|
|
credentialSetId: _csId,
|
|
...baseProviderConfig
|
|
} = resolvedProviderConfig
|
|
|
|
try {
|
|
const syncResult = await syncWebhooksForCredentialSet({
|
|
workflowId,
|
|
blockId,
|
|
provider,
|
|
basePath: finalPath,
|
|
credentialSetId,
|
|
oauthProviderId,
|
|
providerConfig: baseProviderConfig,
|
|
requestId,
|
|
})
|
|
|
|
if (syncResult.webhooks.length === 0) {
|
|
logger.error(
|
|
`[${requestId}] No webhooks created for credential set - no valid credentials found`
|
|
)
|
|
return NextResponse.json(
|
|
{
|
|
error: `No valid credentials found in credential set for ${provider}`,
|
|
details: 'Please ensure team members have connected their accounts',
|
|
},
|
|
{ status: 400 }
|
|
)
|
|
}
|
|
|
|
// Configure each new webhook (for providers that need configuration)
|
|
const pollingProviders = ['gmail', 'outlook']
|
|
const needsConfiguration = pollingProviders.includes(provider)
|
|
|
|
if (needsConfiguration) {
|
|
const configureFunc =
|
|
provider === 'gmail' ? configureGmailPolling : configureOutlookPolling
|
|
const configureErrors: string[] = []
|
|
|
|
for (const wh of syncResult.webhooks) {
|
|
if (wh.isNew) {
|
|
// Fetch the webhook data for configuration
|
|
const webhookRows = await db
|
|
.select()
|
|
.from(webhook)
|
|
.where(eq(webhook.id, wh.id))
|
|
.limit(1)
|
|
|
|
if (webhookRows.length > 0) {
|
|
const success = await configureFunc(webhookRows[0], requestId)
|
|
if (!success) {
|
|
configureErrors.push(
|
|
`Failed to configure webhook for credential ${wh.credentialId}`
|
|
)
|
|
logger.warn(
|
|
`[${requestId}] Failed to configure ${provider} polling for webhook ${wh.id}`
|
|
)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
if (
|
|
configureErrors.length > 0 &&
|
|
configureErrors.length === syncResult.webhooks.length
|
|
) {
|
|
// All configurations failed - roll back
|
|
logger.error(`[${requestId}] All webhook configurations failed, rolling back`)
|
|
for (const wh of syncResult.webhooks) {
|
|
await db.delete(webhook).where(eq(webhook.id, wh.id))
|
|
}
|
|
return NextResponse.json(
|
|
{
|
|
error: `Failed to configure ${provider} polling`,
|
|
details: 'Please check account permissions and try again',
|
|
},
|
|
{ status: 500 }
|
|
)
|
|
}
|
|
}
|
|
|
|
logger.info(
|
|
`[${requestId}] Successfully synced ${syncResult.webhooks.length} webhooks for credential set ${credentialSetId}`
|
|
)
|
|
|
|
// Return the first webhook as the "primary" for the UI
|
|
// The UI will query by credentialSetId to get all of them
|
|
const primaryWebhookRows = await db
|
|
.select()
|
|
.from(webhook)
|
|
.where(eq(webhook.id, syncResult.webhooks[0].id))
|
|
.limit(1)
|
|
|
|
return NextResponse.json(
|
|
{
|
|
webhook: primaryWebhookRows[0],
|
|
credentialSetInfo: {
|
|
credentialSetId,
|
|
totalWebhooks: syncResult.webhooks.length,
|
|
created: syncResult.created,
|
|
updated: syncResult.updated,
|
|
deleted: syncResult.deleted,
|
|
},
|
|
},
|
|
{ status: syncResult.created > 0 ? 201 : 200 }
|
|
)
|
|
} catch (err) {
|
|
logger.error(`[${requestId}] Error syncing webhooks for credential set`, err)
|
|
return NextResponse.json(
|
|
{
|
|
error: `Failed to configure ${provider} webhook`,
|
|
details: err instanceof Error ? err.message : 'Unknown error',
|
|
},
|
|
{ status: 500 }
|
|
)
|
|
}
|
|
}
|
|
}
|
|
// --- End Credential Set Handling ---
|
|
|
|
let externalSubscriptionCreated = false
|
|
const createTempWebhookData = (providerConfigOverride = resolvedProviderConfig) => ({
|
|
id: targetWebhookId || nanoid(),
|
|
path: finalPath,
|
|
provider,
|
|
providerConfig: providerConfigOverride,
|
|
})
|
|
|
|
const userProvided = originalProviderConfig as Record<string, unknown>
|
|
const configToSave: Record<string, unknown> = { ...userProvided }
|
|
|
|
try {
|
|
const result = await createExternalWebhookSubscription(
|
|
request,
|
|
createTempWebhookData(),
|
|
workflowRecord,
|
|
userId,
|
|
requestId
|
|
)
|
|
const updatedConfig = result.updatedProviderConfig as Record<string, unknown>
|
|
mergeNonUserFields(configToSave, updatedConfig, userProvided)
|
|
resolvedProviderConfig = updatedConfig
|
|
externalSubscriptionCreated = result.externalSubscriptionCreated
|
|
} catch (err) {
|
|
logger.error(`[${requestId}] Error creating external webhook subscription`, err)
|
|
return NextResponse.json(
|
|
{
|
|
error: 'Failed to create external webhook subscription',
|
|
details: err instanceof Error ? err.message : 'Unknown error',
|
|
},
|
|
{ status: 500 }
|
|
)
|
|
}
|
|
|
|
try {
|
|
if (targetWebhookId) {
|
|
logger.info(`[${requestId}] Updating existing webhook for path: ${finalPath}`, {
|
|
webhookId: targetWebhookId,
|
|
provider,
|
|
hasCredentialId: !!(configToSave as any)?.credentialId,
|
|
credentialId: (configToSave as any)?.credentialId,
|
|
})
|
|
const updatedResult = await db
|
|
.update(webhook)
|
|
.set({
|
|
blockId,
|
|
provider,
|
|
providerConfig: configToSave,
|
|
credentialSetId:
|
|
((configToSave as Record<string, unknown>)?.credentialSetId as string | null) || null,
|
|
isActive: true,
|
|
updatedAt: new Date(),
|
|
})
|
|
.where(eq(webhook.id, targetWebhookId))
|
|
.returning()
|
|
savedWebhook = updatedResult[0]
|
|
logger.info(`[${requestId}] Webhook updated successfully`, {
|
|
webhookId: savedWebhook.id,
|
|
savedProviderConfig: savedWebhook.providerConfig,
|
|
})
|
|
} else {
|
|
// Create a new webhook
|
|
const webhookId = nanoid()
|
|
logger.info(`[${requestId}] Creating new webhook with ID: ${webhookId}`)
|
|
const newResult = await db
|
|
.insert(webhook)
|
|
.values({
|
|
id: webhookId,
|
|
workflowId,
|
|
blockId,
|
|
path: finalPath,
|
|
provider,
|
|
providerConfig: configToSave,
|
|
credentialSetId:
|
|
((configToSave as Record<string, unknown>)?.credentialSetId as string | null) || null,
|
|
isActive: true,
|
|
createdAt: new Date(),
|
|
updatedAt: new Date(),
|
|
})
|
|
.returning()
|
|
savedWebhook = newResult[0]
|
|
}
|
|
} catch (dbError) {
|
|
if (externalSubscriptionCreated) {
|
|
logger.error(`[${requestId}] DB save failed, cleaning up external subscription`, dbError)
|
|
try {
|
|
await cleanupExternalWebhook(
|
|
createTempWebhookData(configToSave),
|
|
workflowRecord,
|
|
requestId
|
|
)
|
|
} catch (cleanupError) {
|
|
logger.error(
|
|
`[${requestId}] Failed to cleanup external subscription after DB save failure`,
|
|
cleanupError
|
|
)
|
|
}
|
|
}
|
|
throw dbError
|
|
}
|
|
|
|
// --- Gmail/Outlook webhook setup (these don't require external subscriptions, configure after DB save) ---
|
|
if (savedWebhook && provider === 'gmail') {
|
|
logger.info(`[${requestId}] Gmail provider detected. Setting up Gmail webhook configuration.`)
|
|
try {
|
|
const success = await configureGmailPolling(savedWebhook, requestId)
|
|
|
|
if (!success) {
|
|
logger.error(`[${requestId}] Failed to configure Gmail polling, rolling back webhook`)
|
|
await db.delete(webhook).where(eq(webhook.id, savedWebhook.id))
|
|
return NextResponse.json(
|
|
{
|
|
error: 'Failed to configure Gmail polling',
|
|
details: 'Please check your Gmail account permissions and try again',
|
|
},
|
|
{ status: 500 }
|
|
)
|
|
}
|
|
|
|
logger.info(`[${requestId}] Successfully configured Gmail polling`)
|
|
} catch (err) {
|
|
logger.error(
|
|
`[${requestId}] Error setting up Gmail webhook configuration, rolling back webhook`,
|
|
err
|
|
)
|
|
await db.delete(webhook).where(eq(webhook.id, savedWebhook.id))
|
|
return NextResponse.json(
|
|
{
|
|
error: 'Failed to configure Gmail webhook',
|
|
details: err instanceof Error ? err.message : 'Unknown error',
|
|
},
|
|
{ status: 500 }
|
|
)
|
|
}
|
|
}
|
|
// --- End Gmail specific logic ---
|
|
|
|
// --- Outlook webhook setup ---
|
|
if (savedWebhook && provider === 'outlook') {
|
|
logger.info(
|
|
`[${requestId}] Outlook provider detected. Setting up Outlook webhook configuration.`
|
|
)
|
|
try {
|
|
const success = await configureOutlookPolling(savedWebhook, requestId)
|
|
|
|
if (!success) {
|
|
logger.error(`[${requestId}] Failed to configure Outlook polling, rolling back webhook`)
|
|
await db.delete(webhook).where(eq(webhook.id, savedWebhook.id))
|
|
return NextResponse.json(
|
|
{
|
|
error: 'Failed to configure Outlook polling',
|
|
details: 'Please check your Outlook account permissions and try again',
|
|
},
|
|
{ status: 500 }
|
|
)
|
|
}
|
|
|
|
logger.info(`[${requestId}] Successfully configured Outlook polling`)
|
|
} catch (err) {
|
|
logger.error(
|
|
`[${requestId}] Error setting up Outlook webhook configuration, rolling back webhook`,
|
|
err
|
|
)
|
|
await db.delete(webhook).where(eq(webhook.id, savedWebhook.id))
|
|
return NextResponse.json(
|
|
{
|
|
error: 'Failed to configure Outlook webhook',
|
|
details: err instanceof Error ? err.message : 'Unknown error',
|
|
},
|
|
{ status: 500 }
|
|
)
|
|
}
|
|
}
|
|
// --- End Outlook specific logic ---
|
|
|
|
// --- RSS webhook setup ---
|
|
if (savedWebhook && provider === 'rss') {
|
|
logger.info(`[${requestId}] RSS provider detected. Setting up RSS webhook configuration.`)
|
|
try {
|
|
const success = await configureRssPolling(savedWebhook, requestId)
|
|
|
|
if (!success) {
|
|
logger.error(`[${requestId}] Failed to configure RSS polling, rolling back webhook`)
|
|
await db.delete(webhook).where(eq(webhook.id, savedWebhook.id))
|
|
return NextResponse.json(
|
|
{
|
|
error: 'Failed to configure RSS polling',
|
|
details: 'Please try again',
|
|
},
|
|
{ status: 500 }
|
|
)
|
|
}
|
|
|
|
logger.info(`[${requestId}] Successfully configured RSS polling`)
|
|
} catch (err) {
|
|
logger.error(
|
|
`[${requestId}] Error setting up RSS webhook configuration, rolling back webhook`,
|
|
err
|
|
)
|
|
await db.delete(webhook).where(eq(webhook.id, savedWebhook.id))
|
|
return NextResponse.json(
|
|
{
|
|
error: 'Failed to configure RSS webhook',
|
|
details: err instanceof Error ? err.message : 'Unknown error',
|
|
},
|
|
{ status: 500 }
|
|
)
|
|
}
|
|
}
|
|
// --- End RSS specific logic ---
|
|
|
|
if (!targetWebhookId && savedWebhook) {
|
|
try {
|
|
PlatformEvents.webhookCreated({
|
|
webhookId: savedWebhook.id,
|
|
workflowId: workflowId,
|
|
provider: provider || 'generic',
|
|
workspaceId: workflowRecord.workspaceId || undefined,
|
|
})
|
|
} catch {
|
|
// Telemetry should not fail the operation
|
|
}
|
|
|
|
recordAudit({
|
|
workspaceId: workflowRecord.workspaceId || null,
|
|
actorId: userId,
|
|
actorName: session?.user?.name ?? undefined,
|
|
actorEmail: session?.user?.email ?? undefined,
|
|
action: AuditAction.WEBHOOK_CREATED,
|
|
resourceType: AuditResourceType.WEBHOOK,
|
|
resourceId: savedWebhook.id,
|
|
resourceName: provider || 'generic',
|
|
description: `Created ${provider || 'generic'} webhook`,
|
|
metadata: { provider, workflowId },
|
|
request,
|
|
})
|
|
}
|
|
|
|
const status = targetWebhookId ? 200 : 201
|
|
return NextResponse.json({ webhook: savedWebhook }, { status })
|
|
} catch (error: any) {
|
|
logger.error(`[${requestId}] Error creating/updating webhook`, {
|
|
message: error.message,
|
|
stack: error.stack,
|
|
})
|
|
return NextResponse.json({ error: 'Internal server error' }, { status: 500 })
|
|
}
|
|
}
|