Compare commits

..

1 Commits

Author SHA1 Message Date
Waleed
7f1efcc798 fix(blocks): resolve Ollama models incorrectly requiring API key in Docker (#3976)
* fix(blocks): resolve Ollama models incorrectly requiring API key in Docker

Server-side validation failed for Ollama models like mistral:latest because
the Zustand providers store is empty on the server and getProviderFromModel
misidentified them via regex pattern matching (e.g. mistral:latest matched
Mistral AI's /^mistral/ pattern).

Replace the hardcoded CLOUD_PROVIDER_PREFIXES list with existing data sources:
- Provider store (definitive on client, checks all provider buckets)
- getBaseModelProviders() from PROVIDER_DEFINITIONS (server-side static cloud model lookup)
- Slash convention for dynamic cloud providers (fireworks/, openrouter/, etc.)
- isOllamaConfigured feature flag using existing OLLAMA_URL env var

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* refactor: remove getProviderFromModel regex fallback from API key validation

The fallback was the last piece of regex-based matching in the function and
only ran for self-hosted without OLLAMA_URL on the server — a path where
Ollama models cannot appear in the dropdown anyway.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* lint

* fix: handle vLLM models in store provider check

vLLM is a local model server like Ollama and should not require an API key.
Add vllm to the store provider check as a safety net for models that may
not have the vllm/ prefix.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
2026-04-05 12:14:52 -07:00
50 changed files with 6715 additions and 6668 deletions

View File

@@ -275,15 +275,13 @@ export const {Service}Block: BlockConfig = {
If the service's API supports programmatic webhook creation, implement automatic webhook registration instead of requiring users to manually configure webhooks. This provides a much better user experience.
All subscription lifecycle logic lives on the provider handler — **no code touches `route.ts` or `provider-subscriptions.ts`**.
### When to Use Automatic Registration
Check the service's API documentation for endpoints like:
- `POST /webhooks` or `POST /hooks` - Create webhook
- `DELETE /webhooks/{id}` - Delete webhook
Services that support this pattern include: Grain, Lemlist, Calendly, Airtable, Webflow, Typeform, Ashby, Attio, etc.
Services that support this pattern include: Grain, Lemlist, Calendly, Airtable, Webflow, Typeform, etc.
### Implementation Steps
@@ -339,145 +337,188 @@ export function {service}SetupInstructions(eventType: string): string {
}
```
#### 3. Add `createSubscription` and `deleteSubscription` to the Provider Handler
#### 3. Add Webhook Creation to API Route
In `apps/sim/lib/webhooks/providers/{service}.ts`, add both lifecycle methods to your handler. The orchestration layer (`provider-subscriptions.ts`, `deploy.ts`, `route.ts`) calls these automatically — you never touch those files.
In `apps/sim/app/api/webhooks/route.ts`, add provider-specific logic after the database save:
```typescript
import { createLogger } from '@sim/logger'
import { getNotificationUrl, getProviderConfig } from '@/lib/webhooks/providers/subscription-utils'
import type {
DeleteSubscriptionContext,
SubscriptionContext,
SubscriptionResult,
WebhookProviderHandler,
} from '@/lib/webhooks/providers/types'
// --- {Service} specific logic ---
if (savedWebhook && provider === '{service}') {
logger.info(`[${requestId}] {Service} provider detected. Creating webhook subscription.`)
try {
const result = await create{Service}WebhookSubscription(
{
id: savedWebhook.id,
path: savedWebhook.path,
providerConfig: savedWebhook.providerConfig,
},
requestId
)
const logger = createLogger('WebhookProvider:{Service}')
export const {service}Handler: WebhookProviderHandler = {
// ... other methods (verifyAuth, formatInput, etc.) ...
async createSubscription(ctx: SubscriptionContext): Promise<SubscriptionResult | undefined> {
try {
const providerConfig = getProviderConfig(ctx.webhook)
const apiKey = providerConfig.apiKey as string | undefined
const triggerId = providerConfig.triggerId as string | undefined
if (!apiKey) {
throw new Error('{Service} API Key is required.')
if (result) {
// Update the webhook record with the external webhook ID
const updatedConfig = {
...(savedWebhook.providerConfig as Record<string, any>),
externalId: result.id,
}
await db
.update(webhook)
.set({
providerConfig: updatedConfig,
updatedAt: new Date(),
})
.where(eq(webhook.id, savedWebhook.id))
// Map trigger IDs to service event types
const eventTypeMap: Record<string, string | undefined> = {
{service}_event_a: 'eventA',
{service}_event_b: 'eventB',
{service}_webhook: undefined, // Generic - no filter
}
const eventType = eventTypeMap[triggerId ?? '']
const notificationUrl = getNotificationUrl(ctx.webhook)
const requestBody: Record<string, unknown> = {
url: notificationUrl,
}
if (eventType) {
requestBody.eventType = eventType
}
const response = await fetch('https://api.{service}.com/webhooks', {
method: 'POST',
headers: {
Authorization: `Bearer ${apiKey}`,
'Content-Type': 'application/json',
},
body: JSON.stringify(requestBody),
savedWebhook.providerConfig = updatedConfig
logger.info(`[${requestId}] Successfully created {Service} webhook`, {
externalHookId: result.id,
webhookId: savedWebhook.id,
})
const responseBody = (await response.json()) as Record<string, unknown>
if (!response.ok) {
const errorMessage = (responseBody.message as string) || 'Unknown API error'
let userFriendlyMessage = 'Failed to create webhook in {Service}'
if (response.status === 401) {
userFriendlyMessage = 'Invalid API Key. Please verify and try again.'
} else if (errorMessage) {
userFriendlyMessage = `{Service} error: ${errorMessage}`
}
throw new Error(userFriendlyMessage)
}
const externalId = responseBody.id as string | undefined
if (!externalId) {
throw new Error('{Service} webhook created but no ID was returned.')
}
logger.info(`[${ctx.requestId}] Created {Service} webhook ${externalId}`)
return { providerConfigUpdates: { externalId } }
} catch (error: unknown) {
const err = error as Error
logger.error(`[${ctx.requestId}] {Service} webhook creation failed`, {
message: err.message,
})
throw error
}
},
} catch (err) {
logger.error(
`[${requestId}] Error creating {Service} webhook subscription, rolling back webhook`,
err
)
await db.delete(webhook).where(eq(webhook.id, savedWebhook.id))
return NextResponse.json(
{
error: 'Failed to create webhook in {Service}',
details: err instanceof Error ? err.message : 'Unknown error',
},
{ status: 500 }
)
}
}
// --- End {Service} specific logic ---
```
async deleteSubscription(ctx: DeleteSubscriptionContext): Promise<void> {
try {
const config = getProviderConfig(ctx.webhook)
const apiKey = config.apiKey as string | undefined
const externalId = config.externalId as string | undefined
Then add the helper function at the end of the file:
if (!apiKey || !externalId) {
logger.warn(`[${ctx.requestId}] Missing apiKey or externalId, skipping cleanup`)
return
}
```typescript
async function create{Service}WebhookSubscription(
webhookData: any,
requestId: string
): Promise<{ id: string } | undefined> {
try {
const { path, providerConfig } = webhookData
const { apiKey, triggerId, projectId } = providerConfig || {}
const response = await fetch(`https://api.{service}.com/webhooks/${externalId}`, {
method: 'DELETE',
headers: { Authorization: `Bearer ${apiKey}` },
})
if (!response.ok && response.status !== 404) {
logger.warn(
`[${ctx.requestId}] Failed to delete {Service} webhook (non-fatal): ${response.status}`
)
} else {
logger.info(`[${ctx.requestId}] Successfully deleted {Service} webhook ${externalId}`)
}
} catch (error) {
logger.warn(`[${ctx.requestId}] Error deleting {Service} webhook (non-fatal)`, error)
if (!apiKey) {
throw new Error('{Service} API Key is required.')
}
},
// Map trigger IDs to service event types
const eventTypeMap: Record<string, string | undefined> = {
{service}_event_a: 'eventA',
{service}_event_b: 'eventB',
{service}_webhook: undefined, // Generic - no filter
}
const eventType = eventTypeMap[triggerId]
const notificationUrl = `${getBaseUrl()}/api/webhooks/trigger/${path}`
const requestBody: Record<string, any> = {
url: notificationUrl,
}
if (eventType) {
requestBody.eventType = eventType
}
if (projectId) {
requestBody.projectId = projectId
}
const response = await fetch('https://api.{service}.com/webhooks', {
method: 'POST',
headers: {
Authorization: `Bearer ${apiKey}`,
'Content-Type': 'application/json',
},
body: JSON.stringify(requestBody),
})
const responseBody = await response.json()
if (!response.ok) {
const errorMessage = responseBody.message || 'Unknown API error'
let userFriendlyMessage = 'Failed to create webhook in {Service}'
if (response.status === 401) {
userFriendlyMessage = 'Invalid API Key. Please verify and try again.'
} else if (errorMessage) {
userFriendlyMessage = `{Service} error: ${errorMessage}`
}
throw new Error(userFriendlyMessage)
}
return { id: responseBody.id }
} catch (error: any) {
logger.error(`Exception during {Service} webhook creation`, { error: error.message })
throw error
}
}
```
#### How It Works
#### 4. Add Webhook Deletion to Provider Subscriptions
The orchestration layer handles everything automatically:
In `apps/sim/lib/webhooks/provider-subscriptions.ts`:
1. **Creation**: `provider-subscriptions.ts``createExternalWebhookSubscription()` calls `handler.createSubscription()` → merges `providerConfigUpdates` into the saved webhook record.
2. **Deletion**: `provider-subscriptions.ts``cleanupExternalWebhook()` calls `handler.deleteSubscription()` → errors are caught and logged non-fatally.
3. **Polling config**: `deploy.ts``configurePollingIfNeeded()` calls `handler.configurePolling()` for credential-based providers (Gmail, Outlook, RSS, IMAP).
1. Add a logger:
```typescript
const {service}Logger = createLogger('{Service}Webhook')
```
You do NOT need to modify any orchestration files. Just implement the methods on your handler.
2. Add the delete function:
```typescript
export async function delete{Service}Webhook(webhook: any, requestId: string): Promise<void> {
try {
const config = getProviderConfig(webhook)
const apiKey = config.apiKey as string | undefined
const externalId = config.externalId as string | undefined
#### Shared Utilities for Subscriptions
if (!apiKey || !externalId) {
{service}Logger.warn(`[${requestId}] Missing apiKey or externalId, skipping cleanup`)
return
}
Import from `@/lib/webhooks/providers/subscription-utils`:
const response = await fetch(`https://api.{service}.com/webhooks/${externalId}`, {
method: 'DELETE',
headers: {
Authorization: `Bearer ${apiKey}`,
},
})
- `getProviderConfig(webhook)` — safely extract `providerConfig` as `Record<string, unknown>`
- `getNotificationUrl(webhook)` — build the full callback URL: `{baseUrl}/api/webhooks/trigger/{path}`
- `getCredentialOwner(credentialId, requestId)` — resolve OAuth credential to `{ userId, accountId }` (for OAuth-based providers like Airtable, Attio)
if (!response.ok && response.status !== 404) {
{service}Logger.warn(`[${requestId}] Failed to delete webhook (non-fatal): ${response.status}`)
} else {
{service}Logger.info(`[${requestId}] Successfully deleted webhook ${externalId}`)
}
} catch (error) {
{service}Logger.warn(`[${requestId}] Error deleting webhook (non-fatal)`, error)
}
}
```
3. Add to `cleanupExternalWebhook`:
```typescript
export async function cleanupExternalWebhook(...): Promise<void> {
// ... existing providers ...
} else if (webhook.provider === '{service}') {
await delete{Service}Webhook(webhook, requestId)
}
}
```
### Key Points for Automatic Registration
- **API Key visibility**: Always use `password: true` for API key fields
- **Error handling**: Throw from `createSubscription` — the orchestration layer catches it, rolls back the DB webhook, and returns a 500
- **External ID storage**: Return `{ providerConfigUpdates: { externalId } }` — the orchestration layer merges it into `providerConfig`
- **Graceful cleanup**: In `deleteSubscription`, catch errors and log non-fatally (never throw)
- **User-friendly errors**: Map HTTP status codes to helpful error messages in `createSubscription`
- **Error handling**: Roll back the database webhook if external creation fails
- **External ID storage**: Save the external webhook ID in `providerConfig.externalId`
- **Graceful cleanup**: Don't fail webhook deletion if cleanup fails (use non-fatal logging)
- **User-friendly errors**: Map HTTP status codes to helpful error messages
## The buildTriggerSubBlocks Helper
@@ -511,148 +552,6 @@ All fields automatically have:
- `mode: 'trigger'` - Only shown in trigger mode
- `condition: { field: 'selectedTriggerId', value: triggerId }` - Only shown when this trigger is selected
## Webhook Provider Handler (Optional)
If the service requires **custom webhook auth** (HMAC signatures, token validation), **event matching** (filtering by trigger type), **idempotency dedup**, **custom input formatting**, or **subscription lifecycle** — all of this lives in a single provider handler file.
### Directory
```
apps/sim/lib/webhooks/providers/
├── types.ts # WebhookProviderHandler interface (16 optional methods)
├── utils.ts # Shared helpers (createHmacVerifier, verifyTokenAuth, skipByEventTypes)
├── subscription-utils.ts # Shared subscription helpers (getProviderConfig, getNotificationUrl, getCredentialOwner)
├── registry.ts # Handler map + default handler
├── index.ts # Barrel export
└── {service}.ts # Your provider handler (ALL provider-specific logic here)
```
### When to Create a Handler
| Behavior | Method to implement | Example providers |
|---|---|---|
| HMAC signature auth | `verifyAuth` via `createHmacVerifier` | Ashby, Jira, Linear, Typeform |
| Custom token auth | `verifyAuth` via `verifyTokenAuth` | Generic, Google Forms |
| Event type filtering | `matchEvent` | GitHub, Jira, Confluence, Attio, HubSpot |
| Event skip by type list | `shouldSkipEvent` via `skipByEventTypes` | Stripe, Grain |
| Idempotency dedup | `extractIdempotencyId` | Slack, Stripe, Linear, Jira |
| Custom success response | `formatSuccessResponse` | Slack, Twilio Voice, Microsoft Teams |
| Custom error format | `formatErrorResponse` | Microsoft Teams |
| Custom input formatting | `formatInput` | Slack, Teams, Attio, Ashby, Gmail, Outlook |
| Auto webhook creation | `createSubscription` | Ashby, Grain, Calendly, Airtable, Typeform |
| Auto webhook deletion | `deleteSubscription` | Ashby, Grain, Calendly, Airtable, Typeform |
| Polling setup | `configurePolling` | Gmail, Outlook, RSS, IMAP |
| Challenge/verification | `handleChallenge` | Slack, WhatsApp, Microsoft Teams |
If none of these apply, you do NOT need a handler file. The default handler provides bearer token auth for providers that set `providerConfig.token`.
### Simple Example: HMAC Auth Only
Signature validators are defined as private functions **inside the handler file** (not in a shared utils file):
```typescript
import crypto from 'crypto'
import { createLogger } from '@sim/logger'
import { safeCompare } from '@/lib/core/security/encryption'
import type { WebhookProviderHandler } from '@/lib/webhooks/providers/types'
import { createHmacVerifier } from '@/lib/webhooks/providers/utils'
const logger = createLogger('WebhookProvider:{Service}')
function validate{Service}Signature(secret: string, signature: string, body: string): boolean {
try {
if (!secret || !signature || !body) return false
if (!signature.startsWith('sha256=')) return false
const provided = signature.substring(7)
const computed = crypto.createHmac('sha256', secret).update(body, 'utf8').digest('hex')
return safeCompare(computed, provided)
} catch (error) {
logger.error('Error validating {Service} signature:', error)
return false
}
}
export const {service}Handler: WebhookProviderHandler = {
verifyAuth: createHmacVerifier({
configKey: 'webhookSecret',
headerName: 'X-{Service}-Signature',
validateFn: validate{Service}Signature,
providerLabel: '{Service}',
}),
}
```
### Example: Auth + Event Matching + Idempotency
```typescript
import crypto from 'crypto'
import { createLogger } from '@sim/logger'
import { safeCompare } from '@/lib/core/security/encryption'
import type { EventMatchContext, WebhookProviderHandler } from '@/lib/webhooks/providers/types'
import { createHmacVerifier } from '@/lib/webhooks/providers/utils'
const logger = createLogger('WebhookProvider:{Service}')
function validate{Service}Signature(secret: string, signature: string, body: string): boolean {
try {
if (!secret || !signature || !body) return false
const computed = crypto.createHmac('sha256', secret).update(body, 'utf8').digest('hex')
return safeCompare(computed, signature)
} catch (error) {
logger.error('Error validating {Service} signature:', error)
return false
}
}
export const {service}Handler: WebhookProviderHandler = {
verifyAuth: createHmacVerifier({
configKey: 'webhookSecret',
headerName: 'X-{Service}-Signature',
validateFn: validate{Service}Signature,
providerLabel: '{Service}',
}),
async matchEvent({ webhook, workflow, body, requestId, providerConfig }: EventMatchContext) {
const triggerId = providerConfig.triggerId as string | undefined
const obj = body as Record<string, unknown>
if (triggerId && triggerId !== '{service}_webhook') {
const { is{Service}EventMatch } = await import('@/triggers/{service}/utils')
if (!is{Service}EventMatch(triggerId, obj)) {
logger.debug(
`[${requestId}] {Service} event mismatch for trigger ${triggerId}. Skipping.`,
{ webhookId: webhook.id, workflowId: workflow.id, triggerId }
)
return false
}
}
return true
},
extractIdempotencyId(body: unknown) {
const obj = body as Record<string, unknown>
if (obj.id && obj.type) {
return `${obj.type}:${obj.id}`
}
return null
},
}
```
### Registering the Handler
In `apps/sim/lib/webhooks/providers/registry.ts`:
```typescript
import { {service}Handler } from '@/lib/webhooks/providers/{service}'
const PROVIDER_HANDLERS: Record<string, WebhookProviderHandler> = {
// ... existing providers (alphabetical) ...
{service}: {service}Handler,
}
```
## Trigger Outputs & Webhook Input Formatting
### Important: Two Sources of Truth
@@ -660,48 +559,35 @@ const PROVIDER_HANDLERS: Record<string, WebhookProviderHandler> = {
There are two related but separate concerns:
1. **Trigger `outputs`** - Schema/contract defining what fields SHOULD be available. Used by UI for tag dropdown.
2. **`formatInput` on the handler** - Implementation that transforms raw webhook payload into actual data. Defined in `apps/sim/lib/webhooks/providers/{service}.ts`.
2. **`formatWebhookInput`** - Implementation that transforms raw webhook payload into actual data. Located in `apps/sim/lib/webhooks/utils.server.ts`.
**These MUST be aligned.** The fields returned by `formatInput` should match what's defined in trigger `outputs`. If they differ:
**These MUST be aligned.** The fields returned by `formatWebhookInput` should match what's defined in trigger `outputs`. If they differ:
- Tag dropdown shows fields that don't exist (broken variable resolution)
- Or actual data has fields not shown in dropdown (users can't discover them)
### When to Add `formatInput`
### When to Add a formatWebhookInput Handler
- **Simple providers**: If the raw webhook payload structure already matches your outputs, you don't need it. The fallback passes through the raw body directly.
- **Complex providers**: If you need to transform, flatten, extract nested data, compute fields, or handle conditional logic, add `formatInput` to your handler.
- **Simple providers**: If the raw webhook payload structure already matches your outputs, you don't need a handler. The generic fallback returns `body` directly.
- **Complex providers**: If you need to transform, flatten, extract nested data, compute fields, or handle conditional logic, add a handler.
### Adding `formatInput` to Your Handler
### Adding a Handler
In `apps/sim/lib/webhooks/providers/{service}.ts`:
In `apps/sim/lib/webhooks/utils.server.ts`, add a handler block:
```typescript
import type {
FormatInputContext,
FormatInputResult,
WebhookProviderHandler,
} from '@/lib/webhooks/providers/types'
export const {service}Handler: WebhookProviderHandler = {
// ... other methods ...
async formatInput({ body }: FormatInputContext): Promise<FormatInputResult> {
const b = body as Record<string, unknown>
return {
input: {
eventType: b.type,
resourceId: (b.data as Record<string, unknown>)?.id || '',
timestamp: b.created_at,
resource: b.data,
},
}
},
if (foundWebhook.provider === '{service}') {
// Transform raw webhook body to match trigger outputs
return {
eventType: body.type,
resourceId: body.data?.id || '',
timestamp: body.created_at,
resource: body.data,
}
}
```
**Key rules:**
- Return `{ input: { ... } }` where the inner object matches your trigger `outputs` definition exactly
- Return `{ input: ..., skip: { message: '...' } }` to skip execution for this event
- Return fields that match your trigger `outputs` definition exactly
- No wrapper objects like `webhook: { data: ... }` or `{service}: { ... }`
- No duplication (don't spread body AND add individual fields)
- Use `null` for missing optional data, not empty objects with empty strings
@@ -802,25 +688,21 @@ export const {service}WebhookTrigger: TriggerConfig = {
- [ ] Block has all trigger IDs in `triggers.available`
- [ ] Block spreads all trigger subBlocks: `...getTrigger('id').subBlocks`
### Webhook Provider Handler (`providers/{service}.ts`)
- [ ] Created handler file in `apps/sim/lib/webhooks/providers/{service}.ts`
- [ ] Registered handler in `apps/sim/lib/webhooks/providers/registry.ts` (alphabetical)
- [ ] Signature validator defined as private function inside handler file (not in a shared file)
- [ ] Used `createHmacVerifier` from `providers/utils` for HMAC-based auth
- [ ] Used `verifyTokenAuth` from `providers/utils` for token-based auth
- [ ] Event matching uses dynamic `await import()` for trigger utils
- [ ] Added `formatInput` if webhook payload needs transformation (returns `{ input: ... }`)
### Automatic Webhook Registration (if supported)
- [ ] Added API key field to `build{Service}ExtraFields` with `password: true`
- [ ] Updated setup instructions for automatic webhook creation
- [ ] Added `createSubscription` method to handler (uses `getNotificationUrl`, `getProviderConfig` from `subscription-utils`)
- [ ] Added `deleteSubscription` method to handler (catches errors, logs non-fatally)
- [ ] NO changes needed to `route.ts`, `provider-subscriptions.ts`, or `deploy.ts`
- [ ] Added provider-specific logic to `apps/sim/app/api/webhooks/route.ts`
- [ ] Added `create{Service}WebhookSubscription` helper function
- [ ] Added `delete{Service}Webhook` function to `provider-subscriptions.ts`
- [ ] Added provider to `cleanupExternalWebhook` function
### Webhook Input Formatting
- [ ] Added handler in `apps/sim/lib/webhooks/utils.server.ts` (if custom formatting needed)
- [ ] Handler returns fields matching trigger `outputs` exactly
- [ ] Run `bunx scripts/check-trigger-alignment.ts {service}` to verify alignment
### Testing
- [ ] Run `bun run type-check` to verify no TypeScript errors
- [ ] Run `bunx scripts/check-trigger-alignment.ts {service}` to verify output alignment
- [ ] Restart dev server to pick up new triggers
- [ ] Test trigger UI shows correctly in the block
- [ ] Test automatic webhook creation works (if applicable)

View File

@@ -16,9 +16,13 @@ import {
createExternalWebhookSubscription,
shouldRecreateExternalWebhookSubscription,
} from '@/lib/webhooks/provider-subscriptions'
import { getProviderHandler } from '@/lib/webhooks/providers'
import { mergeNonUserFields } from '@/lib/webhooks/utils'
import { syncWebhooksForCredentialSet } from '@/lib/webhooks/utils.server'
import {
configureGmailPolling,
configureOutlookPolling,
configureRssPolling,
syncWebhooksForCredentialSet,
} from '@/lib/webhooks/utils.server'
import { authorizeWorkflowByWorkspacePermission } from '@/lib/workflows/utils'
import { extractCredentialSetId, isCredentialSetValue } from '@/executor/constants'
@@ -344,6 +348,7 @@ export async function POST(request: NextRequest) {
workflowRecord.workspaceId || undefined
)
// --- Credential Set Handling ---
// For credential sets, we fan out to create one webhook per credential at save time.
// This applies to all OAuth-based triggers, not just polling ones.
// Check for credentialSetId directly (frontend may already extract it) or credential set value in credential fields
@@ -397,13 +402,16 @@ export async function POST(request: NextRequest) {
)
}
const providerHandler = getProviderHandler(provider)
const needsConfiguration = provider === 'gmail' || provider === 'outlook'
if (providerHandler.configurePolling) {
if (needsConfiguration) {
const configureFunc =
provider === 'gmail' ? configureGmailPolling : configureOutlookPolling
const configureErrors: string[] = []
for (const wh of syncResult.webhooks) {
if (wh.isNew) {
// Fetch the webhook data for configuration
const webhookRows = await db
.select()
.from(webhook)
@@ -411,10 +419,7 @@ export async function POST(request: NextRequest) {
.limit(1)
if (webhookRows.length > 0) {
const success = await providerHandler.configurePolling({
webhook: webhookRows[0],
requestId,
})
const success = await configureFunc(webhookRows[0], requestId)
if (!success) {
configureErrors.push(
`Failed to configure webhook for credential ${wh.credentialId}`
@@ -431,6 +436,7 @@ export async function POST(request: NextRequest) {
configureErrors.length > 0 &&
configureErrors.length === syncResult.webhooks.length
) {
// All configurations failed - roll back
logger.error(`[${requestId}] All webhook configurations failed, rolling back`)
for (const wh of syncResult.webhooks) {
await db.delete(webhook).where(eq(webhook.id, wh.id))
@@ -482,6 +488,8 @@ export async function POST(request: NextRequest) {
}
}
}
// --- End Credential Set Handling ---
let externalSubscriptionCreated = false
const createTempWebhookData = (providerConfigOverride = resolvedProviderConfig) => ({
id: targetWebhookId || generateShortId(),
@@ -621,49 +629,115 @@ export async function POST(request: NextRequest) {
}
}
if (savedWebhook) {
const pollingHandler = getProviderHandler(provider)
if (pollingHandler.configurePolling) {
logger.info(
`[${requestId}] ${provider} provider detected. Setting up polling configuration.`
)
try {
const success = await pollingHandler.configurePolling({
webhook: savedWebhook,
requestId,
})
// --- Gmail/Outlook webhook setup (these don't require external subscriptions, configure after DB save) ---
if (savedWebhook && provider === 'gmail') {
logger.info(`[${requestId}] Gmail provider detected. Setting up Gmail webhook configuration.`)
try {
const success = await configureGmailPolling(savedWebhook, requestId)
if (!success) {
logger.error(
`[${requestId}] Failed to configure ${provider} polling, rolling back webhook`
)
await revertSavedWebhook(savedWebhook, existingWebhook, requestId)
return NextResponse.json(
{
error: `Failed to configure ${provider} polling`,
details: 'Please check your account permissions and try again',
},
{ status: 500 }
)
}
logger.info(`[${requestId}] Successfully configured ${provider} polling`)
} catch (err) {
logger.error(
`[${requestId}] Error setting up ${provider} webhook configuration, rolling back webhook`,
err
)
if (!success) {
logger.error(`[${requestId}] Failed to configure Gmail polling, rolling back webhook`)
await revertSavedWebhook(savedWebhook, existingWebhook, requestId)
return NextResponse.json(
{
error: `Failed to configure ${provider} webhook`,
details: err instanceof Error ? err.message : 'Unknown error',
error: 'Failed to configure Gmail polling',
details: 'Please check your Gmail account permissions and try again',
},
{ status: 500 }
)
}
logger.info(`[${requestId}] Successfully configured Gmail polling`)
} catch (err) {
logger.error(
`[${requestId}] Error setting up Gmail webhook configuration, rolling back webhook`,
err
)
await revertSavedWebhook(savedWebhook, existingWebhook, requestId)
return NextResponse.json(
{
error: 'Failed to configure Gmail webhook',
details: err instanceof Error ? err.message : 'Unknown error',
},
{ status: 500 }
)
}
}
// --- End Gmail specific logic ---
// --- Outlook webhook setup ---
if (savedWebhook && provider === 'outlook') {
logger.info(
`[${requestId}] Outlook provider detected. Setting up Outlook webhook configuration.`
)
try {
const success = await configureOutlookPolling(savedWebhook, requestId)
if (!success) {
logger.error(`[${requestId}] Failed to configure Outlook polling, rolling back webhook`)
await revertSavedWebhook(savedWebhook, existingWebhook, requestId)
return NextResponse.json(
{
error: 'Failed to configure Outlook polling',
details: 'Please check your Outlook account permissions and try again',
},
{ status: 500 }
)
}
logger.info(`[${requestId}] Successfully configured Outlook polling`)
} catch (err) {
logger.error(
`[${requestId}] Error setting up Outlook webhook configuration, rolling back webhook`,
err
)
await revertSavedWebhook(savedWebhook, existingWebhook, requestId)
return NextResponse.json(
{
error: 'Failed to configure Outlook webhook',
details: err instanceof Error ? err.message : 'Unknown error',
},
{ status: 500 }
)
}
}
// --- End Outlook specific logic ---
// --- RSS webhook setup ---
if (savedWebhook && provider === 'rss') {
logger.info(`[${requestId}] RSS provider detected. Setting up RSS webhook configuration.`)
try {
const success = await configureRssPolling(savedWebhook, requestId)
if (!success) {
logger.error(`[${requestId}] Failed to configure RSS polling, rolling back webhook`)
await revertSavedWebhook(savedWebhook, existingWebhook, requestId)
return NextResponse.json(
{
error: 'Failed to configure RSS polling',
details: 'Please try again',
},
{ status: 500 }
)
}
logger.info(`[${requestId}] Successfully configured RSS polling`)
} catch (err) {
logger.error(
`[${requestId}] Error setting up RSS webhook configuration, rolling back webhook`,
err
)
await revertSavedWebhook(savedWebhook, existingWebhook, requestId)
return NextResponse.json(
{
error: 'Failed to configure RSS webhook',
details: err instanceof Error ? err.message : 'Unknown error',
},
{ status: 500 }
)
}
}
// --- End RSS specific logic ---
if (!targetWebhookId && savedWebhook) {
try {

View File

@@ -97,6 +97,7 @@ const {
handleSlackChallengeMock,
processWhatsAppDeduplicationMock,
processGenericDeduplicationMock,
fetchAndProcessAirtablePayloadsMock,
processWebhookMock,
executeMock,
getWorkspaceBilledAccountUserIdMock,
@@ -108,6 +109,7 @@ const {
handleSlackChallengeMock: vi.fn().mockReturnValue(null),
processWhatsAppDeduplicationMock: vi.fn().mockResolvedValue(null),
processGenericDeduplicationMock: vi.fn().mockResolvedValue(null),
fetchAndProcessAirtablePayloadsMock: vi.fn().mockResolvedValue(undefined),
processWebhookMock: vi.fn().mockResolvedValue(new Response('Webhook processed', { status: 200 })),
executeMock: vi.fn().mockResolvedValue({
success: true,
@@ -154,8 +156,10 @@ vi.mock('@/background/logs-webhook-delivery', () => ({
vi.mock('@/lib/webhooks/utils', () => ({
handleWhatsAppVerification: handleWhatsAppVerificationMock,
handleSlackChallenge: handleSlackChallengeMock,
verifyProviderWebhook: vi.fn().mockReturnValue(null),
processWhatsAppDeduplication: processWhatsAppDeduplicationMock,
processGenericDeduplication: processGenericDeduplicationMock,
fetchAndProcessAirtablePayloads: fetchAndProcessAirtablePayloadsMock,
processWebhook: processWebhookMock,
}))

View File

@@ -87,7 +87,7 @@ async function handleWebhookPost(
if (webhooksForPath.length === 0) {
const verificationResponse = await handlePreLookupWebhookVerification(
request.method,
body as Record<string, unknown> | undefined,
body,
requestId,
path
)

View File

@@ -7,11 +7,12 @@ import type { AsyncExecutionCorrelation } from '@/lib/core/async-jobs/types'
import { createTimeoutAbortController, getTimeoutErrorMessage } from '@/lib/core/execution-limits'
import { IdempotencyService, webhookIdempotency } from '@/lib/core/idempotency'
import { generateId } from '@/lib/core/utils/uuid'
import { processExecutionFiles } from '@/lib/execution/files'
import { preprocessExecution } from '@/lib/execution/preprocessing'
import { LoggingSession } from '@/lib/logs/execution/logging-session'
import { buildTraceSpans } from '@/lib/logs/execution/trace-spans/trace-spans'
import { WebhookAttachmentProcessor } from '@/lib/webhooks/attachment-processor'
import { getProviderHandler } from '@/lib/webhooks/providers'
import { fetchAndProcessAirtablePayloads, formatWebhookInput } from '@/lib/webhooks/utils.server'
import {
executeWorkflowCore,
wasExecutionFinalizedByCore,
@@ -22,7 +23,6 @@ import { resolveOAuthAccountId } from '@/app/api/auth/oauth/utils'
import { getBlock } from '@/blocks'
import { ExecutionSnapshot } from '@/executor/execution/snapshot'
import type { ExecutionMetadata } from '@/executor/execution/types'
import type { ExecutionResult } from '@/executor/types'
import { hasExecutionResult } from '@/executor/utils/errors'
import { safeAssign } from '@/tools/safe-assign'
import { getTrigger, isTriggerValid } from '@/triggers'
@@ -48,12 +48,12 @@ export function buildWebhookCorrelation(
}
/**
* Process trigger outputs based on their schema definitions.
* Finds outputs marked as 'file' or 'file[]' and uploads them to execution storage.
* Process trigger outputs based on their schema definitions
* Finds outputs marked as 'file' or 'file[]' and uploads them to execution storage
*/
async function processTriggerFileOutputs(
input: unknown,
triggerOutputs: Record<string, unknown>,
input: any,
triggerOutputs: Record<string, any>,
context: {
workspaceId: string
workflowId: string
@@ -62,31 +62,29 @@ async function processTriggerFileOutputs(
userId?: string
},
path = ''
): Promise<unknown> {
): Promise<any> {
if (!input || typeof input !== 'object') {
return input
}
const processed = (Array.isArray(input) ? [] : {}) as Record<string, unknown>
const processed: any = Array.isArray(input) ? [] : {}
for (const [key, value] of Object.entries(input)) {
const currentPath = path ? `${path}.${key}` : key
const outputDef = triggerOutputs[key] as Record<string, unknown> | undefined
const val = value as Record<string, unknown>
const outputDef = triggerOutputs[key]
const val: any = value
// If this field is marked as file or file[], process it
if (outputDef?.type === 'file[]' && Array.isArray(val)) {
try {
processed[key] = await WebhookAttachmentProcessor.processAttachments(
val as unknown as Parameters<typeof WebhookAttachmentProcessor.processAttachments>[0],
context
)
processed[key] = await WebhookAttachmentProcessor.processAttachments(val as any, context)
} catch (error) {
processed[key] = []
}
} else if (outputDef?.type === 'file' && val) {
try {
const [processedFile] = await WebhookAttachmentProcessor.processAttachments(
[val] as unknown as Parameters<typeof WebhookAttachmentProcessor.processAttachments>[0],
[val as any],
context
)
processed[key] = processedFile
@@ -100,20 +98,18 @@ async function processTriggerFileOutputs(
(outputDef.type === 'object' || outputDef.type === 'json') &&
outputDef.properties
) {
// Explicit object schema with properties - recurse into properties
processed[key] = await processTriggerFileOutputs(
val,
outputDef.properties as Record<string, unknown>,
outputDef.properties,
context,
currentPath
)
} else if (outputDef && typeof outputDef === 'object' && !outputDef.type) {
processed[key] = await processTriggerFileOutputs(
val,
outputDef as Record<string, unknown>,
context,
currentPath
)
// Nested object in schema (flat pattern) - recurse with the nested schema
processed[key] = await processTriggerFileOutputs(val, outputDef, context, currentPath)
} else {
// Not a file output - keep as is
processed[key] = val
}
}
@@ -129,7 +125,7 @@ export type WebhookExecutionPayload = {
requestId?: string
correlation?: AsyncExecutionCorrelation
provider: string
body: unknown
body: any
headers: Record<string, string>
path: string
blockId?: string
@@ -168,6 +164,9 @@ export async function executeWebhookJob(payload: WebhookExecutionPayload) {
)
}
/**
* Resolve the account userId for a credential
*/
async function resolveCredentialAccountUserId(credentialId: string): Promise<string | undefined> {
const resolved = await resolveOAuthAccountId(credentialId)
if (!resolved) {
@@ -181,62 +180,6 @@ async function resolveCredentialAccountUserId(credentialId: string): Promise<str
return credentialRecord?.userId
}
/**
* Handle execution result status (timeout, pause, resume).
* Shared between all provider paths to eliminate duplication.
*/
async function handleExecutionResult(
executionResult: ExecutionResult,
ctx: {
loggingSession: LoggingSession
timeoutController: ReturnType<typeof createTimeoutAbortController>
requestId: string
executionId: string
workflowId: string
}
) {
if (
executionResult.status === 'cancelled' &&
ctx.timeoutController.isTimedOut() &&
ctx.timeoutController.timeoutMs
) {
const timeoutErrorMessage = getTimeoutErrorMessage(null, ctx.timeoutController.timeoutMs)
logger.info(`[${ctx.requestId}] Webhook execution timed out`, {
timeoutMs: ctx.timeoutController.timeoutMs,
})
await ctx.loggingSession.markAsFailed(timeoutErrorMessage)
} else if (executionResult.status === 'paused') {
if (!executionResult.snapshotSeed) {
logger.error(`[${ctx.requestId}] Missing snapshot seed for paused execution`, {
executionId: ctx.executionId,
})
await ctx.loggingSession.markAsFailed('Missing snapshot seed for paused execution')
} else {
try {
await PauseResumeManager.persistPauseResult({
workflowId: ctx.workflowId,
executionId: ctx.executionId,
pausePoints: executionResult.pausePoints || [],
snapshotSeed: executionResult.snapshotSeed,
executorUserId: executionResult.metadata?.userId,
})
} catch (pauseError) {
logger.error(`[${ctx.requestId}] Failed to persist pause result`, {
executionId: ctx.executionId,
error: pauseError instanceof Error ? pauseError.message : String(pauseError),
})
await ctx.loggingSession.markAsFailed(
`Failed to persist pause state: ${pauseError instanceof Error ? pauseError.message : String(pauseError)}`
)
}
}
} else {
await PauseResumeManager.processQueuedResumes(ctx.executionId)
}
await ctx.loggingSession.waitForPostExecution()
}
async function executeWebhookJobInternal(
payload: WebhookExecutionPayload,
correlation: AsyncExecutionCorrelation
@@ -249,6 +192,7 @@ async function executeWebhookJobInternal(
requestId
)
// Resolve workflow record, billing actor, subscription, and timeout
const preprocessResult = await preprocessExecution({
workflowId: payload.workflowId,
userId: payload.userId,
@@ -277,13 +221,14 @@ async function executeWebhookJobInternal(
throw new Error(`Workflow ${payload.workflowId} has no associated workspace`)
}
const workflowVariables = (workflowRecord.variables as Record<string, unknown>) || {}
const workflowVariables = (workflowRecord.variables as Record<string, any>) || {}
const asyncTimeout = executionTimeout?.async ?? 120_000
const timeoutController = createTimeoutAbortController(asyncTimeout)
let deploymentVersionId: string | undefined
try {
// Parallelize workflow state, webhook record, and credential resolution
const [workflowData, webhookRows, resolvedCredentialUserId] = await Promise.all([
loadDeployedWorkflowState(payload.workflowId, workspaceId),
db.select().from(webhook).where(eq(webhook.id, payload.webhookId)).limit(1),
@@ -310,38 +255,134 @@ async function executeWebhookJobInternal(
? (workflowData.deploymentVersionId as string)
: undefined
const handler = getProviderHandler(payload.provider)
// Handle special Airtable case
if (payload.provider === 'airtable') {
logger.info(`[${requestId}] Processing Airtable webhook via fetchAndProcessAirtablePayloads`)
let input: Record<string, unknown> | null = null
let skipMessage: string | undefined
const webhookRecord = webhookRows[0]
if (!webhookRecord) {
throw new Error(`Webhook record not found: ${payload.webhookId}`)
}
if (handler.formatInput) {
const result = await handler.formatInput({
webhook: webhookRecord,
workflow: { id: payload.workflowId, userId: payload.userId },
body: payload.body,
headers: payload.headers,
requestId,
})
input = result.input as Record<string, unknown> | null
skipMessage = result.skip?.message
} else {
input = payload.body as Record<string, unknown> | null
}
if (!input && handler.handleEmptyInput) {
const skipResult = handler.handleEmptyInput(requestId)
if (skipResult) {
skipMessage = skipResult.message
const webhookRecord = webhookRows[0]
if (!webhookRecord) {
throw new Error(`Webhook record not found: ${payload.webhookId}`)
}
}
if (skipMessage) {
const webhookData = {
id: payload.webhookId,
provider: payload.provider,
providerConfig: webhookRecord.providerConfig,
}
const mockWorkflow = {
id: payload.workflowId,
userId: payload.userId,
}
const airtableInput = await fetchAndProcessAirtablePayloads(
webhookData,
mockWorkflow,
requestId
)
if (airtableInput) {
logger.info(`[${requestId}] Executing workflow with Airtable changes`)
const metadata: ExecutionMetadata = {
requestId,
executionId,
workflowId: payload.workflowId,
workspaceId,
userId: payload.userId,
sessionUserId: undefined,
workflowUserId: workflowRecord.userId,
triggerType: payload.provider || 'webhook',
triggerBlockId: payload.blockId,
useDraftState: false,
startTime: new Date().toISOString(),
isClientSession: false,
credentialAccountUserId,
correlation,
workflowStateOverride: {
blocks,
edges,
loops: loops || {},
parallels: parallels || {},
deploymentVersionId,
},
}
const snapshot = new ExecutionSnapshot(
metadata,
workflowRecord,
airtableInput,
workflowVariables,
[]
)
const executionResult = await executeWorkflowCore({
snapshot,
callbacks: {},
loggingSession,
includeFileBase64: true,
base64MaxBytes: undefined,
abortSignal: timeoutController.signal,
})
if (
executionResult.status === 'cancelled' &&
timeoutController.isTimedOut() &&
timeoutController.timeoutMs
) {
const timeoutErrorMessage = getTimeoutErrorMessage(null, timeoutController.timeoutMs)
logger.info(`[${requestId}] Airtable webhook execution timed out`, {
timeoutMs: timeoutController.timeoutMs,
})
await loggingSession.markAsFailed(timeoutErrorMessage)
} else if (executionResult.status === 'paused') {
if (!executionResult.snapshotSeed) {
logger.error(`[${requestId}] Missing snapshot seed for paused execution`, {
executionId,
})
await loggingSession.markAsFailed('Missing snapshot seed for paused execution')
} else {
try {
await PauseResumeManager.persistPauseResult({
workflowId: payload.workflowId,
executionId,
pausePoints: executionResult.pausePoints || [],
snapshotSeed: executionResult.snapshotSeed,
executorUserId: executionResult.metadata?.userId,
})
} catch (pauseError) {
logger.error(`[${requestId}] Failed to persist pause result`, {
executionId,
error: pauseError instanceof Error ? pauseError.message : String(pauseError),
})
await loggingSession.markAsFailed(
`Failed to persist pause state: ${pauseError instanceof Error ? pauseError.message : String(pauseError)}`
)
}
}
} else {
await PauseResumeManager.processQueuedResumes(executionId)
}
await loggingSession.waitForPostExecution()
logger.info(`[${requestId}] Airtable webhook execution completed`, {
success: executionResult.success,
workflowId: payload.workflowId,
})
return {
success: executionResult.success,
workflowId: payload.workflowId,
executionId,
output: executionResult.output,
executedAt: new Date().toISOString(),
provider: payload.provider,
}
}
// No changes to process
logger.info(`[${requestId}] No Airtable changes to process`)
await loggingSession.safeStart({
userId: payload.userId,
workspaceId,
@@ -356,7 +397,7 @@ async function executeWebhookJobInternal(
await loggingSession.safeComplete({
endedAt: new Date().toISOString(),
totalDurationMs: 0,
finalOutput: { message: skipMessage },
finalOutput: { message: 'No Airtable changes to process' },
traceSpans: [],
})
@@ -364,11 +405,61 @@ async function executeWebhookJobInternal(
success: true,
workflowId: payload.workflowId,
executionId,
output: { message: skipMessage },
output: { message: 'No Airtable changes to process' },
executedAt: new Date().toISOString(),
}
}
// Format input for standard webhooks
const actualWebhook =
webhookRows.length > 0
? webhookRows[0]
: {
provider: payload.provider,
blockId: payload.blockId,
providerConfig: {},
}
const mockWorkflow = {
id: payload.workflowId,
userId: payload.userId,
}
const mockRequest = {
headers: new Map(Object.entries(payload.headers)),
} as any
const input = await formatWebhookInput(actualWebhook, mockWorkflow, payload.body, mockRequest)
if (!input && payload.provider === 'whatsapp') {
logger.info(`[${requestId}] No messages in WhatsApp payload, skipping execution`)
await loggingSession.safeStart({
userId: payload.userId,
workspaceId,
variables: {},
triggerData: {
isTest: false,
correlation,
},
deploymentVersionId,
})
await loggingSession.safeComplete({
endedAt: new Date().toISOString(),
totalDurationMs: 0,
finalOutput: { message: 'No messages in WhatsApp payload' },
traceSpans: [],
})
return {
success: true,
workflowId: payload.workflowId,
executionId,
output: { message: 'No messages in WhatsApp payload' },
executedAt: new Date().toISOString(),
}
}
// Process trigger file outputs based on schema
if (input && payload.blockId && blocks[payload.blockId]) {
try {
const triggerBlock = blocks[payload.blockId]
@@ -411,20 +502,49 @@ async function executeWebhookJobInternal(
}
}
if (input && handler.processInputFiles && payload.blockId && blocks[payload.blockId]) {
// Process generic webhook files based on inputFormat
if (input && payload.provider === 'generic' && payload.blockId && blocks[payload.blockId]) {
try {
await handler.processInputFiles({
input,
blocks,
blockId: payload.blockId,
workspaceId,
workflowId: payload.workflowId,
executionId,
requestId,
userId: payload.userId,
})
const triggerBlock = blocks[payload.blockId]
if (triggerBlock?.subBlocks?.inputFormat?.value) {
const inputFormat = triggerBlock.subBlocks.inputFormat.value as unknown as Array<{
name: string
type: 'string' | 'number' | 'boolean' | 'object' | 'array' | 'file[]'
}>
const fileFields = inputFormat.filter((field) => field.type === 'file[]')
if (fileFields.length > 0 && typeof input === 'object' && input !== null) {
const executionContext = {
workspaceId,
workflowId: payload.workflowId,
executionId,
}
for (const fileField of fileFields) {
const fieldValue = input[fileField.name]
if (fieldValue && typeof fieldValue === 'object') {
const uploadedFiles = await processExecutionFiles(
fieldValue,
executionContext,
requestId,
payload.userId
)
if (uploadedFiles.length > 0) {
input[fileField.name] = uploadedFiles
logger.info(
`[${requestId}] Successfully processed ${uploadedFiles.length} file(s) for field: ${fileField.name}`
)
}
}
}
}
}
} catch (error) {
logger.error(`[${requestId}] Error processing provider-specific files:`, error)
logger.error(`[${requestId}] Error processing generic webhook files:`, error)
}
}
@@ -469,17 +589,49 @@ async function executeWebhookJobInternal(
callbacks: {},
loggingSession,
includeFileBase64: true,
base64MaxBytes: undefined,
abortSignal: timeoutController.signal,
})
await handleExecutionResult(executionResult, {
loggingSession,
timeoutController,
requestId,
executionId,
workflowId: payload.workflowId,
})
if (
executionResult.status === 'cancelled' &&
timeoutController.isTimedOut() &&
timeoutController.timeoutMs
) {
const timeoutErrorMessage = getTimeoutErrorMessage(null, timeoutController.timeoutMs)
logger.info(`[${requestId}] Webhook execution timed out`, {
timeoutMs: timeoutController.timeoutMs,
})
await loggingSession.markAsFailed(timeoutErrorMessage)
} else if (executionResult.status === 'paused') {
if (!executionResult.snapshotSeed) {
logger.error(`[${requestId}] Missing snapshot seed for paused execution`, {
executionId,
})
await loggingSession.markAsFailed('Missing snapshot seed for paused execution')
} else {
try {
await PauseResumeManager.persistPauseResult({
workflowId: payload.workflowId,
executionId,
pausePoints: executionResult.pausePoints || [],
snapshotSeed: executionResult.snapshotSeed,
executorUserId: executionResult.metadata?.userId,
})
} catch (pauseError) {
logger.error(`[${requestId}] Failed to persist pause result`, {
executionId,
error: pauseError instanceof Error ? pauseError.message : String(pauseError),
})
await loggingSession.markAsFailed(
`Failed to persist pause state: ${pauseError instanceof Error ? pauseError.message : String(pauseError)}`
)
}
}
} else {
await PauseResumeManager.processQueuedResumes(executionId)
}
await loggingSession.waitForPostExecution()
logger.info(`[${requestId}] Webhook execution completed`, {
success: executionResult.success,

View File

@@ -0,0 +1,267 @@
/**
* @vitest-environment node
*/
import { beforeEach, describe, expect, it, vi } from 'vitest'
const { mockIsHosted, mockIsAzureConfigured, mockIsOllamaConfigured } = vi.hoisted(() => ({
mockIsHosted: { value: false },
mockIsAzureConfigured: { value: false },
mockIsOllamaConfigured: { value: false },
}))
const {
mockGetHostedModels,
mockGetProviderModels,
mockGetProviderIcon,
mockGetBaseModelProviders,
} = vi.hoisted(() => ({
mockGetHostedModels: vi.fn(() => []),
mockGetProviderModels: vi.fn(() => []),
mockGetProviderIcon: vi.fn(() => null),
mockGetBaseModelProviders: vi.fn(() => ({})),
}))
const { mockProviders } = vi.hoisted(() => ({
mockProviders: {
value: {
base: { models: [] as string[], isLoading: false },
ollama: { models: [] as string[], isLoading: false },
vllm: { models: [] as string[], isLoading: false },
openrouter: { models: [] as string[], isLoading: false },
fireworks: { models: [] as string[], isLoading: false },
},
},
}))
vi.mock('@/lib/core/config/feature-flags', () => ({
get isHosted() {
return mockIsHosted.value
},
get isAzureConfigured() {
return mockIsAzureConfigured.value
},
get isOllamaConfigured() {
return mockIsOllamaConfigured.value
},
}))
vi.mock('@/providers/models', () => ({
getHostedModels: mockGetHostedModels,
getProviderModels: mockGetProviderModels,
getProviderIcon: mockGetProviderIcon,
getBaseModelProviders: mockGetBaseModelProviders,
}))
vi.mock('@/stores/providers/store', () => ({
useProvidersStore: {
getState: () => ({
get providers() {
return mockProviders.value
},
}),
},
}))
vi.mock('@/lib/oauth/utils', () => ({
getScopesForService: vi.fn(() => []),
}))
import { getApiKeyCondition } from '@/blocks/utils'
const BASE_CLOUD_MODELS: Record<string, string> = {
'gpt-4o': 'openai',
'claude-sonnet-4-5': 'anthropic',
'gemini-2.5-pro': 'google',
'mistral-large-latest': 'mistral',
}
describe('getApiKeyCondition / shouldRequireApiKeyForModel', () => {
const evaluateCondition = (model: string): boolean => {
const conditionFn = getApiKeyCondition()
const condition = conditionFn({ model })
if ('not' in condition && condition.not) return false
if (condition.value === '__no_model_selected__') return false
return true
}
beforeEach(() => {
vi.clearAllMocks()
mockIsHosted.value = false
mockIsAzureConfigured.value = false
mockIsOllamaConfigured.value = false
mockProviders.value = {
base: { models: [], isLoading: false },
ollama: { models: [], isLoading: false },
vllm: { models: [], isLoading: false },
openrouter: { models: [], isLoading: false },
fireworks: { models: [], isLoading: false },
}
mockGetHostedModels.mockReturnValue([])
mockGetProviderModels.mockReturnValue([])
mockGetBaseModelProviders.mockReturnValue({})
})
describe('empty or missing model', () => {
it('does not require API key when model is empty', () => {
expect(evaluateCondition('')).toBe(false)
})
it('does not require API key when model is whitespace', () => {
expect(evaluateCondition(' ')).toBe(false)
})
})
describe('hosted models', () => {
it('does not require API key for hosted models on hosted platform', () => {
mockIsHosted.value = true
mockGetHostedModels.mockReturnValue(['gpt-4o', 'claude-sonnet-4-5'])
expect(evaluateCondition('gpt-4o')).toBe(false)
expect(evaluateCondition('claude-sonnet-4-5')).toBe(false)
})
it('requires API key for non-hosted models on hosted platform', () => {
mockIsHosted.value = true
mockGetHostedModels.mockReturnValue(['gpt-4o'])
expect(evaluateCondition('claude-sonnet-4-5')).toBe(true)
})
})
describe('Vertex AI models', () => {
it('does not require API key for vertex/ prefixed models', () => {
expect(evaluateCondition('vertex/gemini-2.5-pro')).toBe(false)
})
})
describe('Bedrock models', () => {
it('does not require API key for bedrock/ prefixed models', () => {
expect(evaluateCondition('bedrock/anthropic.claude-v2')).toBe(false)
})
})
describe('Azure models', () => {
it('does not require API key for azure/ models when Azure is configured', () => {
mockIsAzureConfigured.value = true
expect(evaluateCondition('azure/gpt-4o')).toBe(false)
expect(evaluateCondition('azure-openai/gpt-4o')).toBe(false)
expect(evaluateCondition('azure-anthropic/claude-sonnet-4-5')).toBe(false)
})
it('requires API key for azure/ models when Azure is not configured', () => {
mockIsAzureConfigured.value = false
expect(evaluateCondition('azure/gpt-4o')).toBe(true)
})
})
describe('vLLM models', () => {
it('does not require API key for vllm/ prefixed models', () => {
expect(evaluateCondition('vllm/my-model')).toBe(false)
expect(evaluateCondition('vllm/llama-3-70b')).toBe(false)
})
})
describe('provider store lookup (client-side)', () => {
it('does not require API key when model is in the Ollama store bucket', () => {
mockProviders.value.ollama.models = ['llama3:latest', 'mistral:latest']
expect(evaluateCondition('llama3:latest')).toBe(false)
expect(evaluateCondition('mistral:latest')).toBe(false)
})
it('requires API key when model is in the base store bucket', () => {
mockProviders.value.base.models = ['gpt-4o', 'claude-sonnet-4-5']
expect(evaluateCondition('gpt-4o')).toBe(true)
expect(evaluateCondition('claude-sonnet-4-5')).toBe(true)
})
it('does not require API key when model is in the vLLM store bucket', () => {
mockProviders.value.vllm.models = ['my-custom-model']
expect(evaluateCondition('my-custom-model')).toBe(false)
})
it('requires API key when model is in the fireworks store bucket', () => {
mockProviders.value.fireworks.models = ['fireworks/llama-3']
expect(evaluateCondition('fireworks/llama-3')).toBe(true)
})
it('requires API key when model is in the openrouter store bucket', () => {
mockProviders.value.openrouter.models = ['openrouter/anthropic/claude']
expect(evaluateCondition('openrouter/anthropic/claude')).toBe(true)
})
it('is case-insensitive for store lookup', () => {
mockProviders.value.ollama.models = ['Llama3:Latest']
expect(evaluateCondition('llama3:latest')).toBe(false)
})
})
describe('Ollama — OLLAMA_URL env var (server-safe)', () => {
it('does not require API key for unknown models when OLLAMA_URL is set', () => {
mockIsOllamaConfigured.value = true
expect(evaluateCondition('llama3:latest')).toBe(false)
expect(evaluateCondition('phi3:latest')).toBe(false)
expect(evaluateCondition('gemma2:latest')).toBe(false)
expect(evaluateCondition('deepseek-coder:latest')).toBe(false)
})
it('does not require API key for Ollama models that match cloud provider regex patterns', () => {
mockIsOllamaConfigured.value = true
expect(evaluateCondition('mistral:latest')).toBe(false)
expect(evaluateCondition('mistral')).toBe(false)
expect(evaluateCondition('mistral-nemo')).toBe(false)
expect(evaluateCondition('gpt2')).toBe(false)
})
it('requires API key for known cloud models even when OLLAMA_URL is set', () => {
mockIsOllamaConfigured.value = true
mockGetBaseModelProviders.mockReturnValue(BASE_CLOUD_MODELS)
expect(evaluateCondition('gpt-4o')).toBe(true)
expect(evaluateCondition('claude-sonnet-4-5')).toBe(true)
expect(evaluateCondition('gemini-2.5-pro')).toBe(true)
expect(evaluateCondition('mistral-large-latest')).toBe(true)
})
it('requires API key for slash-prefixed cloud models when OLLAMA_URL is set', () => {
mockIsOllamaConfigured.value = true
expect(evaluateCondition('azure/gpt-4o')).toBe(true)
expect(evaluateCondition('fireworks/llama-3')).toBe(true)
expect(evaluateCondition('openrouter/anthropic/claude')).toBe(true)
expect(evaluateCondition('groq/llama-3')).toBe(true)
})
})
describe('cloud provider models that need API key', () => {
it('requires API key for standard cloud models on hosted platform', () => {
mockIsHosted.value = true
mockGetHostedModels.mockReturnValue([])
expect(evaluateCondition('gpt-4o')).toBe(true)
expect(evaluateCondition('claude-sonnet-4-5')).toBe(true)
expect(evaluateCondition('gemini-2.5-pro')).toBe(true)
expect(evaluateCondition('mistral-large-latest')).toBe(true)
})
it('requires API key for prefixed cloud models on hosted platform', () => {
mockIsHosted.value = true
expect(evaluateCondition('fireworks/llama-3')).toBe(true)
expect(evaluateCondition('openrouter/anthropic/claude')).toBe(true)
expect(evaluateCondition('groq/llama-3')).toBe(true)
expect(evaluateCondition('cerebras/gpt-oss-120b')).toBe(true)
})
it('requires API key for prefixed cloud models on self-hosted', () => {
mockIsHosted.value = false
expect(evaluateCondition('fireworks/llama-3')).toBe(true)
expect(evaluateCondition('openrouter/anthropic/claude')).toBe(true)
expect(evaluateCondition('groq/llama-3')).toBe(true)
expect(evaluateCondition('cerebras/gpt-oss-120b')).toBe(true)
})
})
describe('self-hosted without OLLAMA_URL', () => {
it('requires API key for any model (Ollama models cannot appear without OLLAMA_URL)', () => {
mockIsHosted.value = false
mockIsOllamaConfigured.value = false
expect(evaluateCondition('llama3:latest')).toBe(true)
expect(evaluateCondition('mistral:latest')).toBe(true)
expect(evaluateCondition('gpt-4o')).toBe(true)
})
})
})

View File

@@ -1,9 +1,9 @@
import { isAzureConfigured, isHosted } from '@/lib/core/config/feature-flags'
import { isAzureConfigured, isHosted, isOllamaConfigured } from '@/lib/core/config/feature-flags'
import { getScopesForService } from '@/lib/oauth/utils'
import type { BlockOutput, OutputFieldDefinition, SubBlockConfig } from '@/blocks/types'
import {
getBaseModelProviders,
getHostedModels,
getProviderFromModel,
getProviderIcon,
getProviderModels,
} from '@/providers/models'
@@ -100,11 +100,15 @@ export function resolveOutputType(
return resolvedOutputs
}
/**
* Helper to get current Ollama models from store
*/
const getCurrentOllamaModels = () => {
return useProvidersStore.getState().providers.ollama.models
function getProviderFromStore(model: string): string | null {
const { providers } = useProvidersStore.getState()
const normalized = model.toLowerCase()
for (const [key, state] of Object.entries(providers)) {
if (state.models.some((m: string) => m.toLowerCase() === normalized)) {
return key
}
}
return null
}
function buildModelVisibilityCondition(model: string, shouldShow: boolean) {
@@ -119,16 +123,14 @@ function shouldRequireApiKeyForModel(model: string): boolean {
const normalizedModel = model.trim().toLowerCase()
if (!normalizedModel) return false
const hostedModels = getHostedModels()
const isHostedModel = hostedModels.some(
(hostedModel) => hostedModel.toLowerCase() === normalizedModel
)
if (isHosted && isHostedModel) return false
if (isHosted) {
const hostedModels = getHostedModels()
if (hostedModels.some((m) => m.toLowerCase() === normalizedModel)) return false
}
if (normalizedModel.startsWith('vertex/') || normalizedModel.startsWith('bedrock/')) {
return false
}
if (
isAzureConfigured &&
(normalizedModel.startsWith('azure/') ||
@@ -138,30 +140,18 @@ function shouldRequireApiKeyForModel(model: string): boolean {
) {
return false
}
if (normalizedModel.startsWith('vllm/')) {
return false
}
const currentOllamaModels = getCurrentOllamaModels()
if (currentOllamaModels.some((ollamaModel) => ollamaModel.toLowerCase() === normalizedModel)) {
return false
}
const storeProvider = getProviderFromStore(normalizedModel)
if (storeProvider === 'ollama' || storeProvider === 'vllm') return false
if (storeProvider) return true
if (!isHosted) {
try {
const providerId = getProviderFromModel(model)
if (
providerId === 'ollama' ||
providerId === 'vllm' ||
providerId === 'vertex' ||
providerId === 'bedrock'
) {
return false
}
} catch {
// If model resolution fails, fall through and require an API key.
}
if (isOllamaConfigured) {
if (normalizedModel.includes('/')) return true
if (normalizedModel in getBaseModelProviders()) return true
return false
}
return true

View File

@@ -122,6 +122,14 @@ export const isInboxEnabled = isTruthy(env.INBOX_ENABLED)
*/
export const isE2bEnabled = isTruthy(env.E2B_ENABLED)
/**
* Whether Ollama is configured (OLLAMA_URL is set).
* When true, models that are not in the static cloud model list and have no
* slash-prefixed provider namespace are assumed to be Ollama models
* and do not require an API key.
*/
export const isOllamaConfigured = Boolean(env.OLLAMA_URL)
/**
* Whether Azure OpenAI / Azure Anthropic credentials are pre-configured at the server level
* (via AZURE_OPENAI_ENDPOINT, AZURE_OPENAI_API_KEY, AZURE_ANTHROPIC_ENDPOINT, etc.).

View File

@@ -6,7 +6,7 @@ import { getRedisClient } from '@/lib/core/config/redis'
import { getMaxExecutionTimeout } from '@/lib/core/execution-limits'
import { getStorageMethod, type StorageMethod } from '@/lib/core/storage'
import { generateId } from '@/lib/core/utils/uuid'
import { extractProviderIdentifierFromBody } from '@/lib/webhooks/providers'
import { extractProviderIdentifierFromBody } from '@/lib/webhooks/provider-utils'
const logger = createLogger('IdempotencyService')

View File

@@ -11,8 +11,11 @@ import {
createExternalWebhookSubscription,
shouldRecreateExternalWebhookSubscription,
} from '@/lib/webhooks/provider-subscriptions'
import { getProviderHandler } from '@/lib/webhooks/providers'
import { syncWebhooksForCredentialSet } from '@/lib/webhooks/utils.server'
import {
configureGmailPolling,
configureOutlookPolling,
syncWebhooksForCredentialSet,
} from '@/lib/webhooks/utils.server'
import { getBlock } from '@/blocks'
import type { SubBlockConfig } from '@/blocks/types'
import type { BlockState } from '@/stores/workflows/workflow/types'
@@ -230,20 +233,29 @@ function buildProviderConfig(
async function configurePollingIfNeeded(
provider: string,
savedWebhook: Record<string, unknown>,
savedWebhook: any,
requestId: string
): Promise<TriggerSaveError | null> {
const handler = getProviderHandler(provider)
if (!handler.configurePolling) {
return null
if (provider === 'gmail') {
const success = await configureGmailPolling(savedWebhook, requestId)
if (!success) {
await db.delete(webhook).where(eq(webhook.id, savedWebhook.id))
return {
message: 'Failed to configure Gmail polling. Please check your Gmail account permissions.',
status: 500,
}
}
}
const success = await handler.configurePolling({ webhook: savedWebhook, requestId })
if (!success) {
await db.delete(webhook).where(eq(webhook.id, savedWebhook.id as string))
return {
message: `Failed to configure ${provider} polling. Please check your account permissions.`,
status: 500,
if (provider === 'outlook') {
const success = await configureOutlookPolling(savedWebhook, requestId)
if (!success) {
await db.delete(webhook).where(eq(webhook.id, savedWebhook.id))
return {
message:
'Failed to configure Outlook polling. Please check your Outlook account permissions.',
status: 500,
}
}
}
@@ -285,7 +297,7 @@ async function syncCredentialSetWebhooks(params: {
basePath: triggerPath,
credentialSetId,
oauthProviderId,
providerConfig: baseConfig as Record<string, unknown>,
providerConfig: baseConfig as Record<string, any>,
requestId,
deploymentVersionId,
})
@@ -310,13 +322,13 @@ async function syncCredentialSetWebhooks(params: {
}
}
const handler = getProviderHandler(provider)
if (handler.configurePolling) {
if (provider === 'gmail' || provider === 'outlook') {
const configureFunc = provider === 'gmail' ? configureGmailPolling : configureOutlookPolling
for (const wh of syncResult.webhooks) {
if (wh.isNew) {
const rows = await db.select().from(webhook).where(eq(webhook.id, wh.id)).limit(1)
if (rows.length > 0) {
const success = await handler.configurePolling({ webhook: rows[0], requestId })
const success = await configureFunc(rows[0], requestId)
if (!success) {
await db.delete(webhook).where(eq(webhook.id, wh.id))
return {
@@ -447,18 +459,6 @@ export async function saveTriggerWebhooksForDeploy({
}
}
if (providerConfig.requireAuth && !providerConfig.token) {
await restorePreviousSubscriptions()
return {
success: false,
error: {
message:
'Authentication is enabled but no token is configured. Please set an authentication token or disable authentication.',
status: 400,
},
}
}
webhookConfigs.set(block.id, { provider, providerConfig, triggerPath, triggerDef })
if (providerConfig.credentialSetId) {
@@ -558,13 +558,13 @@ export async function saveTriggerWebhooksForDeploy({
await restorePreviousSubscriptions()
return { success: false, error: syncResult.error, warnings: collectedWarnings }
}
} catch (error: unknown) {
} catch (error: any) {
logger.error(`[${requestId}] Failed to create webhook for ${block.id}`, error)
await restorePreviousSubscriptions()
return {
success: false,
error: {
message: (error as Error)?.message || 'Failed to save trigger configuration',
message: error?.message || 'Failed to save trigger configuration',
status: 500,
},
warnings: collectedWarnings,
@@ -621,7 +621,7 @@ export async function saveTriggerWebhooksForDeploy({
updatedProviderConfig: result.updatedProviderConfig as Record<string, unknown>,
externalSubscriptionCreated: result.externalSubscriptionCreated,
})
} catch (error: unknown) {
} catch (error: any) {
logger.error(`[${requestId}] Failed to create external subscription for ${block.id}`, error)
await pendingVerificationTracker.clearAll()
for (const sub of createdSubscriptions) {
@@ -649,7 +649,7 @@ export async function saveTriggerWebhooksForDeploy({
return {
success: false,
error: {
message: (error as Error)?.message || 'Failed to create external subscription',
message: error?.message || 'Failed to create external subscription',
status: 500,
},
}
@@ -722,7 +722,7 @@ export async function saveTriggerWebhooksForDeploy({
return { success: false, error: pollingError }
}
}
} catch (error: unknown) {
} catch (error: any) {
await pendingVerificationTracker.clearAll()
logger.error(`[${requestId}] Failed to insert webhook records`, error)
for (const sub of createdSubscriptions) {
@@ -750,7 +750,7 @@ export async function saveTriggerWebhooksForDeploy({
return {
success: false,
error: {
message: (error as Error)?.message || 'Failed to save webhook records',
message: error?.message || 'Failed to save webhook records',
status: 500,
},
}

View File

@@ -106,10 +106,17 @@ vi.mock('@/lib/webhooks/utils', () => ({
vi.mock('@/lib/webhooks/utils.server', () => ({
handleSlackChallenge: vi.fn().mockReturnValue(null),
handleWhatsAppVerification: vi.fn().mockResolvedValue(null),
}))
vi.mock('@/lib/webhooks/providers', () => ({
getProviderHandler: vi.fn().mockReturnValue({}),
validateAttioSignature: vi.fn().mockReturnValue(true),
validateCalcomSignature: vi.fn().mockReturnValue(true),
validateCirclebackSignature: vi.fn().mockReturnValue(true),
validateFirefliesSignature: vi.fn().mockReturnValue(true),
validateGitHubSignature: vi.fn().mockReturnValue(true),
validateJiraSignature: vi.fn().mockReturnValue(true),
validateLinearSignature: vi.fn().mockReturnValue(true),
validateMicrosoftTeamsSignature: vi.fn().mockReturnValue(true),
validateTwilioSignature: vi.fn().mockResolvedValue(true),
validateTypeformSignature: vi.fn().mockReturnValue(true),
verifyProviderWebhook: vi.fn().mockReturnValue(null),
}))
vi.mock('@/background/webhook-execution', () => ({

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,93 @@
/**
* Provider-specific unique identifier extractors for webhook idempotency
*/
function extractSlackIdentifier(body: any): string | null {
if (body.event_id) {
return body.event_id
}
if (body.event?.ts && body.team_id) {
return `${body.team_id}:${body.event.ts}`
}
return null
}
function extractTwilioIdentifier(body: any): string | null {
return body.MessageSid || body.CallSid || null
}
function extractStripeIdentifier(body: any): string | null {
if (body.id && body.object === 'event') {
return body.id
}
return null
}
function extractHubSpotIdentifier(body: any): string | null {
if (Array.isArray(body) && body.length > 0 && body[0]?.eventId) {
return String(body[0].eventId)
}
return null
}
function extractLinearIdentifier(body: any): string | null {
if (body.action && body.data?.id) {
return `${body.action}:${body.data.id}`
}
return null
}
function extractJiraIdentifier(body: any): string | null {
if (body.webhookEvent && (body.issue?.id || body.project?.id)) {
return `${body.webhookEvent}:${body.issue?.id || body.project?.id}`
}
return null
}
function extractMicrosoftTeamsIdentifier(body: any): string | null {
if (body.value && Array.isArray(body.value) && body.value.length > 0) {
const notification = body.value[0]
if (notification.subscriptionId && notification.resourceData?.id) {
return `${notification.subscriptionId}:${notification.resourceData.id}`
}
}
return null
}
function extractAirtableIdentifier(body: any): string | null {
if (body.cursor && typeof body.cursor === 'string') {
return body.cursor
}
return null
}
function extractGrainIdentifier(body: any): string | null {
if (body.type && body.data?.id) {
return `${body.type}:${body.data.id}`
}
return null
}
const PROVIDER_EXTRACTORS: Record<string, (body: any) => string | null> = {
slack: extractSlackIdentifier,
twilio: extractTwilioIdentifier,
twilio_voice: extractTwilioIdentifier,
stripe: extractStripeIdentifier,
hubspot: extractHubSpotIdentifier,
linear: extractLinearIdentifier,
jira: extractJiraIdentifier,
'microsoft-teams': extractMicrosoftTeamsIdentifier,
airtable: extractAirtableIdentifier,
grain: extractGrainIdentifier,
}
export function extractProviderIdentifierFromBody(provider: string, body: any): string | null {
if (!body || typeof body !== 'object') {
return null
}
const extractor = PROVIDER_EXTRACTORS[provider]
return extractor ? extractor(body) : null
}

View File

@@ -1,760 +0,0 @@
import { db } from '@sim/db'
import { account, webhook } from '@sim/db/schema'
import { createLogger } from '@sim/logger'
import { eq } from 'drizzle-orm'
import { validateAirtableId } from '@/lib/core/security/input-validation'
import { getBaseUrl } from '@/lib/core/utils/urls'
import {
getCredentialOwner,
getNotificationUrl,
getProviderConfig,
} from '@/lib/webhooks/providers/subscription-utils'
import type {
DeleteSubscriptionContext,
FormatInputContext,
SubscriptionContext,
SubscriptionResult,
WebhookProviderHandler,
} from '@/lib/webhooks/providers/types'
import {
getOAuthToken,
refreshAccessTokenIfNeeded,
resolveOAuthAccountId,
} from '@/app/api/auth/oauth/utils'
const logger = createLogger('WebhookProvider:Airtable')
interface AirtableChange {
tableId: string
recordId: string
changeType: 'created' | 'updated'
changedFields: Record<string, unknown>
previousFields?: Record<string, unknown>
}
interface AirtableTableChanges {
createdRecordsById?: Record<string, { cellValuesByFieldId?: Record<string, unknown> }>
changedRecordsById?: Record<
string,
{
current?: { cellValuesByFieldId?: Record<string, unknown> }
previous?: { cellValuesByFieldId?: Record<string, unknown> }
}
>
destroyedRecordIds?: string[]
}
/**
* Process Airtable payloads
*/
async function fetchAndProcessAirtablePayloads(
webhookData: Record<string, unknown>,
workflowData: Record<string, unknown>,
requestId: string // Original request ID from the ping, used for the final execution log
) {
// Logging handles all error logging
let currentCursor: number | null = null
let mightHaveMore = true
let payloadsFetched = 0
let apiCallCount = 0
// Use a Map to consolidate changes per record ID
const consolidatedChangesMap = new Map<string, AirtableChange>()
// Capture raw payloads from Airtable for exposure to workflows
const allPayloads = []
const localProviderConfig = {
...((webhookData.providerConfig as Record<string, unknown>) || {}),
} as Record<string, unknown>
try {
const baseId = localProviderConfig.baseId
const airtableWebhookId = localProviderConfig.externalId
if (!baseId || !airtableWebhookId) {
logger.error(
`[${requestId}] Missing baseId or externalId in providerConfig for webhook ${webhookData.id}. Cannot fetch payloads.`
)
return
}
const credentialId = localProviderConfig.credentialId as string | undefined
if (!credentialId) {
logger.error(
`[${requestId}] Missing credentialId in providerConfig for Airtable webhook ${webhookData.id}.`
)
return
}
const resolvedAirtable = await resolveOAuthAccountId(credentialId)
if (!resolvedAirtable) {
logger.error(
`[${requestId}] Could not resolve credential ${credentialId} for Airtable webhook`
)
return
}
let ownerUserId: string | null = null
try {
const rows = await db
.select()
.from(account)
.where(eq(account.id, resolvedAirtable.accountId))
.limit(1)
ownerUserId = rows.length ? rows[0].userId : null
} catch (_e) {
ownerUserId = null
}
if (!ownerUserId) {
logger.error(
`[${requestId}] Could not resolve owner for Airtable credential ${credentialId} on webhook ${webhookData.id}`
)
return
}
const storedCursor = localProviderConfig.externalWebhookCursor
if (storedCursor === undefined || storedCursor === null) {
logger.info(
`[${requestId}] No cursor found in providerConfig for webhook ${webhookData.id}, initializing...`
)
localProviderConfig.externalWebhookCursor = null
try {
await db
.update(webhook)
.set({
providerConfig: {
...localProviderConfig,
externalWebhookCursor: null,
},
updatedAt: new Date(),
})
.where(eq(webhook.id, webhookData.id as string))
localProviderConfig.externalWebhookCursor = null
logger.info(`[${requestId}] Successfully initialized cursor for webhook ${webhookData.id}`)
} catch (initError: unknown) {
const err = initError as Error
logger.error(`[${requestId}] Failed to initialize cursor in DB`, {
webhookId: webhookData.id,
error: err.message,
stack: err.stack,
})
}
}
if (storedCursor && typeof storedCursor === 'number') {
currentCursor = storedCursor
} else {
currentCursor = null
}
let accessToken: string | null = null
try {
accessToken = await refreshAccessTokenIfNeeded(
resolvedAirtable.accountId,
ownerUserId,
requestId
)
if (!accessToken) {
logger.error(
`[${requestId}] Failed to obtain valid Airtable access token via credential ${credentialId}.`
)
throw new Error('Airtable access token not found.')
}
} catch (tokenError: unknown) {
const err = tokenError as Error
logger.error(
`[${requestId}] Failed to get Airtable OAuth token for credential ${credentialId}`,
{
error: err.message,
stack: err.stack,
credentialId,
}
)
return
}
const airtableApiBase = 'https://api.airtable.com/v0'
while (mightHaveMore) {
apiCallCount++
// Safety break
if (apiCallCount > 10) {
mightHaveMore = false
break
}
const apiUrl = `${airtableApiBase}/bases/${baseId}/webhooks/${airtableWebhookId}/payloads`
const queryParams = new URLSearchParams()
if (currentCursor !== null) {
queryParams.set('cursor', currentCursor.toString())
}
const fullUrl = `${apiUrl}?${queryParams.toString()}`
try {
const fetchStartTime = Date.now()
const response = await fetch(fullUrl, {
method: 'GET',
headers: {
Authorization: `Bearer ${accessToken}`,
'Content-Type': 'application/json',
},
})
const responseBody = await response.json()
if (!response.ok || responseBody.error) {
const errorMessage =
responseBody.error?.message ||
responseBody.error ||
`Airtable API error Status ${response.status}`
logger.error(
`[${requestId}] Airtable API request to /payloads failed (Call ${apiCallCount})`,
{
webhookId: webhookData.id,
status: response.status,
error: errorMessage,
}
)
// Error logging handled by logging session
mightHaveMore = false
break
}
const receivedPayloads = responseBody.payloads || []
if (receivedPayloads.length > 0) {
payloadsFetched += receivedPayloads.length
// Keep the raw payloads for later exposure to the workflow
for (const p of receivedPayloads) {
allPayloads.push(p)
}
let changeCount = 0
for (const payload of receivedPayloads) {
if (payload.changedTablesById) {
for (const [tableId, tableChangesUntyped] of Object.entries(
payload.changedTablesById
)) {
const tableChanges = tableChangesUntyped as AirtableTableChanges
if (tableChanges.createdRecordsById) {
const createdCount = Object.keys(tableChanges.createdRecordsById).length
changeCount += createdCount
for (const [recordId, recordData] of Object.entries(
tableChanges.createdRecordsById
)) {
const existingChange = consolidatedChangesMap.get(recordId)
if (existingChange) {
// Record was created and possibly updated within the same batch
existingChange.changedFields = {
...existingChange.changedFields,
...(recordData.cellValuesByFieldId || {}),
}
// Keep changeType as 'created' if it started as created
} else {
// New creation
consolidatedChangesMap.set(recordId, {
tableId: tableId,
recordId: recordId,
changeType: 'created',
changedFields: recordData.cellValuesByFieldId || {},
})
}
}
}
// Handle updated records
if (tableChanges.changedRecordsById) {
const updatedCount = Object.keys(tableChanges.changedRecordsById).length
changeCount += updatedCount
for (const [recordId, recordData] of Object.entries(
tableChanges.changedRecordsById
)) {
const existingChange = consolidatedChangesMap.get(recordId)
const currentFields = recordData.current?.cellValuesByFieldId || {}
if (existingChange) {
// Existing record was updated again
existingChange.changedFields = {
...existingChange.changedFields,
...currentFields,
}
// Ensure type is 'updated' if it was previously 'created'
existingChange.changeType = 'updated'
// Do not update previousFields again
} else {
// First update for this record in the batch
const newChange: AirtableChange = {
tableId: tableId,
recordId: recordId,
changeType: 'updated',
changedFields: currentFields,
}
if (recordData.previous?.cellValuesByFieldId) {
newChange.previousFields = recordData.previous.cellValuesByFieldId
}
consolidatedChangesMap.set(recordId, newChange)
}
}
}
// TODO: Handle deleted records (`destroyedRecordIds`) if needed
}
}
}
}
const nextCursor = responseBody.cursor
mightHaveMore = responseBody.mightHaveMore || false
if (nextCursor && typeof nextCursor === 'number' && nextCursor !== currentCursor) {
currentCursor = nextCursor
// Follow exactly the old implementation - use awaited update instead of parallel
const updatedConfig = {
...localProviderConfig,
externalWebhookCursor: currentCursor,
}
try {
// Force a complete object update to ensure consistency in serverless env
await db
.update(webhook)
.set({
providerConfig: updatedConfig, // Use full object
updatedAt: new Date(),
})
.where(eq(webhook.id, webhookData.id as string))
localProviderConfig.externalWebhookCursor = currentCursor // Update local copy too
} catch (dbError: unknown) {
const err = dbError as Error
logger.error(`[${requestId}] Failed to persist Airtable cursor to DB`, {
webhookId: webhookData.id,
cursor: currentCursor,
error: err.message,
})
// Error logging handled by logging session
mightHaveMore = false
throw new Error('Failed to save Airtable cursor, stopping processing.') // Re-throw to break loop clearly
}
} else if (!nextCursor || typeof nextCursor !== 'number') {
logger.warn(`[${requestId}] Invalid or missing cursor received, stopping poll`, {
webhookId: webhookData.id,
apiCall: apiCallCount,
receivedCursor: nextCursor,
})
mightHaveMore = false
} else if (nextCursor === currentCursor) {
mightHaveMore = false // Explicitly stop if cursor hasn't changed
}
} catch (fetchError: unknown) {
logger.error(
`[${requestId}] Network error calling Airtable GET /payloads (Call ${apiCallCount}) for webhook ${webhookData.id}`,
fetchError
)
// Error logging handled by logging session
mightHaveMore = false
break
}
}
// Convert map values to array for final processing
const finalConsolidatedChanges = Array.from(consolidatedChangesMap.values())
logger.info(
`[${requestId}] Consolidated ${finalConsolidatedChanges.length} Airtable changes across ${apiCallCount} API calls`
)
if (finalConsolidatedChanges.length > 0 || allPayloads.length > 0) {
try {
// Build input exposing raw payloads and consolidated changes
const latestPayload = allPayloads.length > 0 ? allPayloads[allPayloads.length - 1] : null
const input: Record<string, unknown> = {
payloads: allPayloads,
latestPayload,
// Consolidated, simplified changes for convenience
airtableChanges: finalConsolidatedChanges,
// Include webhook metadata for resolver fallbacks
webhook: {
data: {
provider: 'airtable',
providerConfig: webhookData.providerConfig,
payload: latestPayload,
},
},
}
// CRITICAL EXECUTION TRACE POINT
logger.info(
`[${requestId}] CRITICAL_TRACE: Beginning workflow execution with ${finalConsolidatedChanges.length} Airtable changes`,
{
workflowId: workflowData.id,
recordCount: finalConsolidatedChanges.length,
timestamp: new Date().toISOString(),
firstRecordId: finalConsolidatedChanges[0]?.recordId || 'none',
}
)
// Return the processed input for the trigger.dev task to handle
logger.info(`[${requestId}] CRITICAL_TRACE: Airtable changes processed, returning input`, {
workflowId: workflowData.id,
recordCount: finalConsolidatedChanges.length,
rawPayloadCount: allPayloads.length,
timestamp: new Date().toISOString(),
})
return input
} catch (processingError: unknown) {
const err = processingError as Error
logger.error(`[${requestId}] CRITICAL_TRACE: Error processing Airtable changes`, {
workflowId: workflowData.id,
error: err.message,
stack: err.stack,
timestamp: new Date().toISOString(),
})
throw processingError
}
} else {
// DEBUG: Log when no changes are found
logger.info(`[${requestId}] TRACE: No Airtable changes to process`, {
workflowId: workflowData.id,
apiCallCount,
webhookId: webhookData.id,
})
}
} catch (error) {
// Catch any unexpected errors during the setup/polling logic itself
logger.error(
`[${requestId}] Unexpected error during asynchronous Airtable payload processing task`,
{
webhookId: webhookData.id,
workflowId: workflowData.id,
error: (error as Error).message,
}
)
// Error logging handled by logging session
}
}
export const airtableHandler: WebhookProviderHandler = {
async createSubscription({
webhook: webhookRecord,
workflow,
userId,
requestId,
}: SubscriptionContext): Promise<SubscriptionResult | undefined> {
try {
const { path, providerConfig } = webhookRecord as Record<string, unknown>
const config = (providerConfig as Record<string, unknown>) || {}
const { baseId, tableId, includeCellValuesInFieldIds, credentialId } = config as {
baseId?: string
tableId?: string
includeCellValuesInFieldIds?: string
credentialId?: string
}
if (!baseId || !tableId) {
logger.warn(`[${requestId}] Missing baseId or tableId for Airtable webhook creation.`, {
webhookId: webhookRecord.id,
})
throw new Error(
'Base ID and Table ID are required to create Airtable webhook. Please provide valid Airtable base and table IDs.'
)
}
const baseIdValidation = validateAirtableId(baseId, 'app', 'baseId')
if (!baseIdValidation.isValid) {
throw new Error(baseIdValidation.error)
}
const tableIdValidation = validateAirtableId(tableId, 'tbl', 'tableId')
if (!tableIdValidation.isValid) {
throw new Error(tableIdValidation.error)
}
const credentialOwner = credentialId
? await getCredentialOwner(credentialId, requestId)
: null
const accessToken = credentialId
? credentialOwner
? await refreshAccessTokenIfNeeded(
credentialOwner.accountId,
credentialOwner.userId,
requestId
)
: null
: await getOAuthToken(userId, 'airtable')
if (!accessToken) {
logger.warn(
`[${requestId}] Could not retrieve Airtable access token for user ${userId}. Cannot create webhook in Airtable.`
)
throw new Error(
'Airtable account connection required. Please connect your Airtable account in the trigger configuration and try again.'
)
}
const notificationUrl = `${getBaseUrl()}/api/webhooks/trigger/${path}`
const airtableApiUrl = `https://api.airtable.com/v0/bases/${baseId}/webhooks`
const specification: Record<string, unknown> = {
options: {
filters: {
dataTypes: ['tableData'],
recordChangeScope: tableId,
},
},
}
if (includeCellValuesInFieldIds === 'all') {
;(specification.options as Record<string, unknown>).includes = {
includeCellValuesInFieldIds: 'all',
}
}
const requestBody: Record<string, unknown> = {
notificationUrl: notificationUrl,
specification: specification,
}
const airtableResponse = await fetch(airtableApiUrl, {
method: 'POST',
headers: {
Authorization: `Bearer ${accessToken}`,
'Content-Type': 'application/json',
},
body: JSON.stringify(requestBody),
})
const responseBody = await airtableResponse.json()
if (!airtableResponse.ok || responseBody.error) {
const errorMessage =
responseBody.error?.message || responseBody.error || 'Unknown Airtable API error'
const errorType = responseBody.error?.type
logger.error(
`[${requestId}] Failed to create webhook in Airtable for webhook ${webhookRecord.id}. Status: ${airtableResponse.status}`,
{ type: errorType, message: errorMessage, response: responseBody }
)
let userFriendlyMessage = 'Failed to create webhook subscription in Airtable'
if (airtableResponse.status === 404) {
userFriendlyMessage =
'Airtable base or table not found. Please verify that the Base ID and Table ID are correct and that you have access to them.'
} else if (errorMessage && errorMessage !== 'Unknown Airtable API error') {
userFriendlyMessage = `Airtable error: ${errorMessage}`
}
throw new Error(userFriendlyMessage)
}
logger.info(
`[${requestId}] Successfully created webhook in Airtable for webhook ${webhookRecord.id}.`,
{
airtableWebhookId: responseBody.id,
}
)
return { providerConfigUpdates: { externalId: responseBody.id } }
} catch (error: unknown) {
const err = error as Error
logger.error(
`[${requestId}] Exception during Airtable webhook creation for webhook ${webhookRecord.id}.`,
{
message: err.message,
stack: err.stack,
}
)
throw error
}
},
async deleteSubscription({
webhook: webhookRecord,
workflow,
requestId,
}: DeleteSubscriptionContext): Promise<void> {
try {
const config = getProviderConfig(webhookRecord)
const { baseId, externalId } = config as {
baseId?: string
externalId?: string
}
if (!baseId) {
logger.warn(`[${requestId}] Missing baseId for Airtable webhook deletion`, {
webhookId: webhookRecord.id,
})
return
}
const baseIdValidation = validateAirtableId(baseId, 'app', 'baseId')
if (!baseIdValidation.isValid) {
logger.warn(`[${requestId}] Invalid Airtable base ID format, skipping deletion`, {
webhookId: webhookRecord.id,
baseId: baseId.substring(0, 20),
})
return
}
const credentialId = config.credentialId as string | undefined
if (!credentialId) {
logger.warn(
`[${requestId}] Missing credentialId for Airtable webhook deletion ${webhookRecord.id}`
)
return
}
const credentialOwner = await getCredentialOwner(credentialId, requestId)
const accessToken = credentialOwner
? await refreshAccessTokenIfNeeded(
credentialOwner.accountId,
credentialOwner.userId,
requestId
)
: null
if (!accessToken) {
logger.warn(
`[${requestId}] Could not retrieve Airtable access token. Cannot delete webhook in Airtable.`,
{ webhookId: webhookRecord.id }
)
return
}
let resolvedExternalId: string | undefined = externalId
if (!resolvedExternalId) {
try {
const expectedNotificationUrl = getNotificationUrl(webhookRecord)
const listUrl = `https://api.airtable.com/v0/bases/${baseId}/webhooks`
const listResp = await fetch(listUrl, {
headers: {
Authorization: `Bearer ${accessToken}`,
},
})
const listBody = await listResp.json().catch(() => null)
if (listResp.ok && listBody && Array.isArray(listBody.webhooks)) {
const match = listBody.webhooks.find((w: Record<string, unknown>) => {
const url: string | undefined = w?.notificationUrl as string | undefined
if (!url) return false
return (
url === expectedNotificationUrl ||
url.endsWith(`/api/webhooks/trigger/${webhookRecord.path}`)
)
})
if (match?.id) {
resolvedExternalId = match.id as string
logger.info(`[${requestId}] Resolved Airtable externalId by listing webhooks`, {
baseId,
externalId: resolvedExternalId,
})
} else {
logger.warn(`[${requestId}] Could not resolve Airtable externalId from list`, {
baseId,
expectedNotificationUrl,
})
}
} else {
logger.warn(`[${requestId}] Failed to list Airtable webhooks to resolve externalId`, {
baseId,
status: listResp.status,
body: listBody,
})
}
} catch (e: unknown) {
logger.warn(`[${requestId}] Error attempting to resolve Airtable externalId`, {
error: (e as Error)?.message,
})
}
}
if (!resolvedExternalId) {
logger.info(`[${requestId}] Airtable externalId not found; skipping remote deletion`, {
baseId,
})
return
}
const webhookIdValidation = validateAirtableId(resolvedExternalId, 'ach', 'webhookId')
if (!webhookIdValidation.isValid) {
logger.warn(`[${requestId}] Invalid Airtable webhook ID format, skipping deletion`, {
webhookId: webhookRecord.id,
externalId: resolvedExternalId.substring(0, 20),
})
return
}
const airtableDeleteUrl = `https://api.airtable.com/v0/bases/${baseId}/webhooks/${resolvedExternalId}`
const airtableResponse = await fetch(airtableDeleteUrl, {
method: 'DELETE',
headers: {
Authorization: `Bearer ${accessToken}`,
},
})
if (!airtableResponse.ok) {
let responseBody: unknown = null
try {
responseBody = await airtableResponse.json()
} catch {
// Ignore parse errors
}
logger.warn(
`[${requestId}] Failed to delete Airtable webhook in Airtable. Status: ${airtableResponse.status}`,
{ baseId, externalId: resolvedExternalId, response: responseBody }
)
} else {
logger.info(`[${requestId}] Successfully deleted Airtable webhook in Airtable`, {
baseId,
externalId: resolvedExternalId,
})
}
} catch (error: unknown) {
const err = error as Error
logger.error(`[${requestId}] Error deleting Airtable webhook`, {
webhookId: webhookRecord.id,
error: err.message,
stack: err.stack,
})
}
},
extractIdempotencyId(body: unknown) {
const obj = body as Record<string, unknown>
if (typeof obj.cursor === 'string') {
return obj.cursor
}
return null
},
async formatInput({ webhook, workflow, requestId }: FormatInputContext) {
logger.info(`[${requestId}] Processing Airtable webhook via fetchAndProcessAirtablePayloads`)
const webhookData = {
id: webhook.id,
provider: webhook.provider,
providerConfig: webhook.providerConfig,
}
const mockWorkflow = {
id: workflow.id,
userId: workflow.userId,
}
const airtableInput = await fetchAndProcessAirtablePayloads(
webhookData,
mockWorkflow,
requestId
)
if (airtableInput) {
logger.info(`[${requestId}] Executing workflow with Airtable changes`)
return { input: airtableInput }
}
logger.info(`[${requestId}] No Airtable changes to process`)
return { input: null, skip: { message: 'No Airtable changes to process' } }
},
}

View File

@@ -1,208 +0,0 @@
import crypto from 'crypto'
import { createLogger } from '@sim/logger'
import { safeCompare } from '@/lib/core/security/encryption'
import { generateId } from '@/lib/core/utils/uuid'
import { getNotificationUrl, getProviderConfig } from '@/lib/webhooks/providers/subscription-utils'
import type {
DeleteSubscriptionContext,
FormatInputContext,
FormatInputResult,
SubscriptionContext,
SubscriptionResult,
WebhookProviderHandler,
} from '@/lib/webhooks/providers/types'
import { createHmacVerifier } from '@/lib/webhooks/providers/utils'
const logger = createLogger('WebhookProvider:Ashby')
function validateAshbySignature(secretToken: string, signature: string, body: string): boolean {
try {
if (!secretToken || !signature || !body) {
return false
}
if (!signature.startsWith('sha256=')) {
return false
}
const providedSignature = signature.substring(7)
const computedHash = crypto.createHmac('sha256', secretToken).update(body, 'utf8').digest('hex')
return safeCompare(computedHash, providedSignature)
} catch (error) {
logger.error('Error validating Ashby signature:', error)
return false
}
}
export const ashbyHandler: WebhookProviderHandler = {
async formatInput({ body }: FormatInputContext): Promise<FormatInputResult> {
const b = body as Record<string, unknown>
return {
input: {
...((b.data as Record<string, unknown>) || {}),
action: b.action,
data: b.data || {},
},
}
},
verifyAuth: createHmacVerifier({
configKey: 'secretToken',
headerName: 'ashby-signature',
validateFn: validateAshbySignature,
providerLabel: 'Ashby',
}),
async createSubscription(ctx: SubscriptionContext): Promise<SubscriptionResult | undefined> {
try {
const providerConfig = getProviderConfig(ctx.webhook)
const { apiKey, triggerId } = providerConfig as {
apiKey?: string
triggerId?: string
}
if (!apiKey) {
throw new Error(
'Ashby API Key is required. Please provide your API Key with apiKeysWrite permission in the trigger configuration.'
)
}
if (!triggerId) {
throw new Error('Trigger ID is required to create Ashby webhook.')
}
const webhookTypeMap: Record<string, string> = {
ashby_application_submit: 'applicationSubmit',
ashby_candidate_stage_change: 'candidateStageChange',
ashby_candidate_hire: 'candidateHire',
ashby_candidate_delete: 'candidateDelete',
ashby_job_create: 'jobCreate',
ashby_offer_create: 'offerCreate',
}
const webhookType = webhookTypeMap[triggerId]
if (!webhookType) {
throw new Error(`Unknown Ashby triggerId: ${triggerId}. Add it to webhookTypeMap.`)
}
const notificationUrl = getNotificationUrl(ctx.webhook)
const authString = Buffer.from(`${apiKey}:`).toString('base64')
logger.info(`[${ctx.requestId}] Creating Ashby webhook`, {
triggerId,
webhookType,
webhookId: ctx.webhook.id,
})
const secretToken = generateId()
const requestBody: Record<string, unknown> = {
requestUrl: notificationUrl,
webhookType,
secretToken,
}
const ashbyResponse = await fetch('https://api.ashbyhq.com/webhook.create', {
method: 'POST',
headers: {
Authorization: `Basic ${authString}`,
'Content-Type': 'application/json',
},
body: JSON.stringify(requestBody),
})
const responseBody = (await ashbyResponse.json().catch(() => ({}))) as Record<string, unknown>
if (!ashbyResponse.ok || !responseBody.success) {
const errorInfo = responseBody.errorInfo as Record<string, string> | undefined
const errorMessage =
errorInfo?.message || (responseBody.message as string) || 'Unknown Ashby API error'
let userFriendlyMessage = 'Failed to create webhook subscription in Ashby'
if (ashbyResponse.status === 401) {
userFriendlyMessage =
'Invalid Ashby API Key. Please verify your API Key is correct and has apiKeysWrite permission.'
} else if (ashbyResponse.status === 403) {
userFriendlyMessage =
'Access denied. Please ensure your Ashby API Key has the apiKeysWrite permission.'
} else if (errorMessage && errorMessage !== 'Unknown Ashby API error') {
userFriendlyMessage = `Ashby error: ${errorMessage}`
}
throw new Error(userFriendlyMessage)
}
const results = responseBody.results as Record<string, unknown> | undefined
const externalId = results?.id as string | undefined
if (!externalId) {
throw new Error('Ashby webhook creation succeeded but no webhook ID was returned')
}
logger.info(
`[${ctx.requestId}] Successfully created Ashby webhook subscription ${externalId} for webhook ${ctx.webhook.id}`
)
return { providerConfigUpdates: { externalId, secretToken } }
} catch (error: unknown) {
const err = error as Error
logger.error(
`[${ctx.requestId}] Exception during Ashby webhook creation for webhook ${ctx.webhook.id}.`,
{
message: err.message,
stack: err.stack,
}
)
throw error
}
},
async deleteSubscription(ctx: DeleteSubscriptionContext): Promise<void> {
try {
const config = getProviderConfig(ctx.webhook)
const apiKey = config.apiKey as string | undefined
const externalId = config.externalId as string | undefined
if (!apiKey) {
logger.warn(
`[${ctx.requestId}] Missing apiKey for Ashby webhook deletion ${ctx.webhook.id}, skipping cleanup`
)
return
}
if (!externalId) {
logger.warn(
`[${ctx.requestId}] Missing externalId for Ashby webhook deletion ${ctx.webhook.id}, skipping cleanup`
)
return
}
const authString = Buffer.from(`${apiKey}:`).toString('base64')
const ashbyResponse = await fetch('https://api.ashbyhq.com/webhook.delete', {
method: 'POST',
headers: {
Authorization: `Basic ${authString}`,
'Content-Type': 'application/json',
},
body: JSON.stringify({ webhookId: externalId }),
})
if (ashbyResponse.ok) {
await ashbyResponse.body?.cancel()
logger.info(
`[${ctx.requestId}] Successfully deleted Ashby webhook subscription ${externalId}`
)
} else if (ashbyResponse.status === 404) {
await ashbyResponse.body?.cancel()
logger.info(
`[${ctx.requestId}] Ashby webhook ${externalId} not found during deletion (already removed)`
)
} else {
const responseBody = await ashbyResponse.json().catch(() => ({}))
logger.warn(
`[${ctx.requestId}] Failed to delete Ashby webhook (non-fatal): ${ashbyResponse.status}`,
{ response: responseBody }
)
}
} catch (error) {
logger.warn(`[${ctx.requestId}] Error deleting Ashby webhook (non-fatal)`, error)
}
},
}

View File

@@ -1,366 +0,0 @@
import crypto from 'crypto'
import { createLogger } from '@sim/logger'
import { NextResponse } from 'next/server'
import { safeCompare } from '@/lib/core/security/encryption'
import { getBaseUrl } from '@/lib/core/utils/urls'
import { getCredentialOwner, getProviderConfig } from '@/lib/webhooks/providers/subscription-utils'
import type {
AuthContext,
DeleteSubscriptionContext,
EventMatchContext,
FormatInputContext,
FormatInputResult,
SubscriptionContext,
SubscriptionResult,
WebhookProviderHandler,
} from '@/lib/webhooks/providers/types'
import { refreshAccessTokenIfNeeded } from '@/app/api/auth/oauth/utils'
const logger = createLogger('WebhookProvider:Attio')
function validateAttioSignature(secret: string, signature: string, body: string): boolean {
try {
if (!secret || !signature || !body) {
logger.warn('Attio signature validation missing required fields', {
hasSecret: !!secret,
hasSignature: !!signature,
hasBody: !!body,
})
return false
}
const computedHash = crypto.createHmac('sha256', secret).update(body, 'utf8').digest('hex')
logger.debug('Attio signature comparison', {
computedSignature: `${computedHash.substring(0, 10)}...`,
providedSignature: `${signature.substring(0, 10)}...`,
computedLength: computedHash.length,
providedLength: signature.length,
match: computedHash === signature,
})
return safeCompare(computedHash, signature)
} catch (error) {
logger.error('Error validating Attio signature:', error)
return false
}
}
export const attioHandler: WebhookProviderHandler = {
verifyAuth({ webhook, request, rawBody, requestId, providerConfig }: AuthContext) {
const secret = providerConfig.webhookSecret as string | undefined
if (!secret) {
logger.debug(
`[${requestId}] Attio webhook ${webhook.id as string} has no signing secret, skipping signature verification`
)
} else {
const signature = request.headers.get('Attio-Signature')
if (!signature) {
logger.warn(`[${requestId}] Attio webhook missing signature header`)
return new NextResponse('Unauthorized - Missing Attio signature', {
status: 401,
})
}
const isValidSignature = validateAttioSignature(secret, signature, rawBody)
if (!isValidSignature) {
logger.warn(`[${requestId}] Attio signature verification failed`, {
signatureLength: signature.length,
secretLength: secret.length,
})
return new NextResponse('Unauthorized - Invalid Attio signature', {
status: 401,
})
}
}
return null
},
async matchEvent({ webhook, workflow, body, requestId, providerConfig }: EventMatchContext) {
const triggerId = providerConfig.triggerId as string | undefined
const obj = body as Record<string, unknown>
if (triggerId && triggerId !== 'attio_webhook') {
const { isAttioPayloadMatch, getAttioEvent } = await import('@/triggers/attio/utils')
if (!isAttioPayloadMatch(triggerId, obj)) {
const event = getAttioEvent(obj)
const eventType = event?.event_type as string | undefined
logger.debug(
`[${requestId}] Attio event mismatch for trigger ${triggerId}. Event: ${eventType}. Skipping execution.`,
{
webhookId: webhook.id,
workflowId: workflow.id,
triggerId,
receivedEvent: eventType,
bodyKeys: Object.keys(obj),
}
)
return NextResponse.json({
status: 'skipped',
reason: 'event_type_mismatch',
})
}
}
return true
},
async createSubscription({
webhook: webhookRecord,
workflow,
userId,
requestId,
}: SubscriptionContext): Promise<SubscriptionResult | undefined> {
try {
const { path, providerConfig } = webhookRecord as Record<string, unknown>
const config = (providerConfig as Record<string, unknown>) || {}
const { triggerId, credentialId } = config as {
triggerId?: string
credentialId?: string
}
if (!credentialId) {
logger.warn(`[${requestId}] Missing credentialId for Attio webhook creation.`, {
webhookId: webhookRecord.id,
})
throw new Error(
'Attio account connection required. Please connect your Attio account in the trigger configuration and try again.'
)
}
const credentialOwner = await getCredentialOwner(credentialId, requestId)
const accessToken = credentialOwner
? await refreshAccessTokenIfNeeded(
credentialOwner.accountId,
credentialOwner.userId,
requestId
)
: null
if (!accessToken) {
logger.warn(
`[${requestId}] Could not retrieve Attio access token for user ${userId}. Cannot create webhook.`
)
throw new Error(
'Attio account connection required. Please connect your Attio account in the trigger configuration and try again.'
)
}
const notificationUrl = `${getBaseUrl()}/api/webhooks/trigger/${path}`
const { TRIGGER_EVENT_MAP } = await import('@/triggers/attio/utils')
let subscriptions: Array<{ event_type: string; filter: null }> = []
if (triggerId === 'attio_webhook') {
const allEvents = new Set<string>()
for (const events of Object.values(TRIGGER_EVENT_MAP)) {
for (const event of events) {
allEvents.add(event)
}
}
subscriptions = Array.from(allEvents).map((event_type) => ({ event_type, filter: null }))
} else {
const events = TRIGGER_EVENT_MAP[triggerId as string]
if (!events || events.length === 0) {
logger.warn(`[${requestId}] No event types mapped for trigger ${triggerId}`, {
webhookId: webhookRecord.id,
})
throw new Error(`Unknown Attio trigger type: ${triggerId}`)
}
subscriptions = events.map((event_type) => ({ event_type, filter: null }))
}
const requestBody = {
data: {
target_url: notificationUrl,
subscriptions,
},
}
const attioResponse = await fetch('https://api.attio.com/v2/webhooks', {
method: 'POST',
headers: {
Authorization: `Bearer ${accessToken}`,
'Content-Type': 'application/json',
},
body: JSON.stringify(requestBody),
})
if (!attioResponse.ok) {
const errorBody = await attioResponse.json().catch(() => ({}))
logger.error(
`[${requestId}] Failed to create webhook in Attio for webhook ${webhookRecord.id}. Status: ${attioResponse.status}`,
{ response: errorBody }
)
let userFriendlyMessage = 'Failed to create webhook subscription in Attio'
if (attioResponse.status === 401) {
userFriendlyMessage = 'Attio authentication failed. Please reconnect your Attio account.'
} else if (attioResponse.status === 403) {
userFriendlyMessage =
'Attio access denied. Please ensure your integration has webhook permissions.'
}
throw new Error(userFriendlyMessage)
}
const responseBody = await attioResponse.json()
const data = responseBody.data || responseBody
const webhookId = data.id?.webhook_id || data.webhook_id || data.id
const secret = data.secret
if (!webhookId) {
logger.error(
`[${requestId}] Attio webhook created but no webhook_id returned for webhook ${webhookRecord.id}`,
{ response: responseBody }
)
throw new Error('Attio webhook creation succeeded but no webhook ID was returned')
}
if (!secret) {
logger.warn(
`[${requestId}] Attio webhook created but no secret returned for webhook ${webhookRecord.id}. Signature verification will be skipped.`,
{ response: responseBody }
)
}
logger.info(
`[${requestId}] Successfully created webhook in Attio for webhook ${webhookRecord.id}.`,
{
attioWebhookId: webhookId,
targetUrl: notificationUrl,
subscriptionCount: subscriptions.length,
status: data.status,
}
)
return { providerConfigUpdates: { externalId: webhookId, webhookSecret: secret || '' } }
} catch (error: unknown) {
const message = error instanceof Error ? error.message : String(error)
logger.error(
`[${requestId}] Exception during Attio webhook creation for webhook ${webhookRecord.id}.`,
{ message }
)
throw error
}
},
async deleteSubscription({
webhook: webhookRecord,
workflow,
requestId,
}: DeleteSubscriptionContext): Promise<void> {
try {
const config = getProviderConfig(webhookRecord)
const externalId = config.externalId as string | undefined
const credentialId = config.credentialId as string | undefined
if (!externalId) {
logger.warn(
`[${requestId}] Missing externalId for Attio webhook deletion ${webhookRecord.id}, skipping cleanup`
)
return
}
if (!credentialId) {
logger.warn(
`[${requestId}] Missing credentialId for Attio webhook deletion ${webhookRecord.id}, skipping cleanup`
)
return
}
const credentialOwner = await getCredentialOwner(credentialId, requestId)
const accessToken = credentialOwner
? await refreshAccessTokenIfNeeded(
credentialOwner.accountId,
credentialOwner.userId,
requestId
)
: null
if (!accessToken) {
logger.warn(
`[${requestId}] Could not retrieve Attio access token. Cannot delete webhook.`,
{ webhookId: webhookRecord.id }
)
return
}
const attioResponse = await fetch(`https://api.attio.com/v2/webhooks/${externalId}`, {
method: 'DELETE',
headers: {
Authorization: `Bearer ${accessToken}`,
},
})
if (!attioResponse.ok && attioResponse.status !== 404) {
const responseBody = await attioResponse.json().catch(() => ({}))
logger.warn(
`[${requestId}] Failed to delete Attio webhook (non-fatal): ${attioResponse.status}`,
{ response: responseBody }
)
} else {
logger.info(`[${requestId}] Successfully deleted Attio webhook ${externalId}`)
}
} catch (error) {
logger.warn(`[${requestId}] Error deleting Attio webhook (non-fatal)`, error)
}
},
async formatInput({ body, webhook }: FormatInputContext): Promise<FormatInputResult> {
const {
extractAttioRecordData,
extractAttioRecordUpdatedData,
extractAttioRecordMergedData,
extractAttioNoteData,
extractAttioTaskData,
extractAttioCommentData,
extractAttioListEntryData,
extractAttioListEntryUpdatedData,
extractAttioListData,
extractAttioWorkspaceMemberData,
extractAttioGenericData,
} = await import('@/triggers/attio/utils')
const b = body as Record<string, unknown>
const providerConfig = (webhook.providerConfig as Record<string, unknown>) || {}
const triggerId = providerConfig.triggerId as string | undefined
if (triggerId === 'attio_record_updated') {
return { input: extractAttioRecordUpdatedData(b) }
}
if (triggerId === 'attio_record_merged') {
return { input: extractAttioRecordMergedData(b) }
}
if (triggerId === 'attio_record_created' || triggerId === 'attio_record_deleted') {
return { input: extractAttioRecordData(b) }
}
if (triggerId?.startsWith('attio_note_')) {
return { input: extractAttioNoteData(b) }
}
if (triggerId?.startsWith('attio_task_')) {
return { input: extractAttioTaskData(b) }
}
if (triggerId?.startsWith('attio_comment_')) {
return { input: extractAttioCommentData(b) }
}
if (triggerId === 'attio_list_entry_updated') {
return { input: extractAttioListEntryUpdatedData(b) }
}
if (triggerId === 'attio_list_entry_created' || triggerId === 'attio_list_entry_deleted') {
return { input: extractAttioListEntryData(b) }
}
if (
triggerId === 'attio_list_created' ||
triggerId === 'attio_list_updated' ||
triggerId === 'attio_list_deleted'
) {
return { input: extractAttioListData(b) }
}
if (triggerId === 'attio_workspace_member_created') {
return { input: extractAttioWorkspaceMemberData(b) }
}
return { input: extractAttioGenericData(b) }
},
}

View File

@@ -1,47 +0,0 @@
import crypto from 'crypto'
import { createLogger } from '@sim/logger'
import { safeCompare } from '@/lib/core/security/encryption'
import type { WebhookProviderHandler } from '@/lib/webhooks/providers/types'
import { createHmacVerifier } from '@/lib/webhooks/providers/utils'
const logger = createLogger('WebhookProvider:Calcom')
function validateCalcomSignature(secret: string, signature: string, body: string): boolean {
try {
if (!secret || !signature || !body) {
logger.warn('Cal.com signature validation missing required fields', {
hasSecret: !!secret,
hasSignature: !!signature,
hasBody: !!body,
})
return false
}
let providedSignature: string
if (signature.startsWith('sha256=')) {
providedSignature = signature.substring(7)
} else {
providedSignature = signature
}
const computedHash = crypto.createHmac('sha256', secret).update(body, 'utf8').digest('hex')
logger.debug('Cal.com signature comparison', {
computedSignature: `${computedHash.substring(0, 10)}...`,
providedSignature: `${providedSignature.substring(0, 10)}...`,
computedLength: computedHash.length,
providedLength: providedSignature.length,
match: computedHash === providedSignature,
})
return safeCompare(computedHash, providedSignature)
} catch (error) {
logger.error('Error validating Cal.com signature:', error)
return false
}
}
export const calcomHandler: WebhookProviderHandler = {
verifyAuth: createHmacVerifier({
configKey: 'webhookSecret',
headerName: 'X-Cal-Signature-256',
validateFn: validateCalcomSignature,
providerLabel: 'Cal.com',
}),
}

View File

@@ -1,211 +0,0 @@
import { createLogger } from '@sim/logger'
import { getNotificationUrl, getProviderConfig } from '@/lib/webhooks/providers/subscription-utils'
import type {
DeleteSubscriptionContext,
FormatInputContext,
FormatInputResult,
SubscriptionContext,
SubscriptionResult,
WebhookProviderHandler,
} from '@/lib/webhooks/providers/types'
const logger = createLogger('WebhookProvider:Calendly')
export const calendlyHandler: WebhookProviderHandler = {
async formatInput({ body }: FormatInputContext): Promise<FormatInputResult> {
const b = body as Record<string, unknown>
return {
input: {
event: b.event,
created_at: b.created_at,
created_by: b.created_by,
payload: b.payload,
},
}
},
async createSubscription(ctx: SubscriptionContext): Promise<SubscriptionResult | undefined> {
try {
const providerConfig = getProviderConfig(ctx.webhook)
const { apiKey, organization, triggerId } = providerConfig as {
apiKey?: string
organization?: string
triggerId?: string
}
if (!apiKey) {
logger.warn(`[${ctx.requestId}] Missing apiKey for Calendly webhook creation.`, {
webhookId: ctx.webhook.id,
})
throw new Error(
'Personal Access Token is required to create Calendly webhook. Please provide your Calendly Personal Access Token.'
)
}
if (!organization) {
logger.warn(`[${ctx.requestId}] Missing organization URI for Calendly webhook creation.`, {
webhookId: ctx.webhook.id,
})
throw new Error(
'Organization URI is required to create Calendly webhook. Please provide your Organization URI from the "Get Current User" operation.'
)
}
if (!triggerId) {
logger.warn(`[${ctx.requestId}] Missing triggerId for Calendly webhook creation.`, {
webhookId: ctx.webhook.id,
})
throw new Error('Trigger ID is required to create Calendly webhook')
}
const notificationUrl = getNotificationUrl(ctx.webhook)
const eventTypeMap: Record<string, string[]> = {
calendly_invitee_created: ['invitee.created'],
calendly_invitee_canceled: ['invitee.canceled'],
calendly_routing_form_submitted: ['routing_form_submission.created'],
calendly_webhook: [
'invitee.created',
'invitee.canceled',
'routing_form_submission.created',
],
}
const events = eventTypeMap[triggerId] || ['invitee.created']
const calendlyApiUrl = 'https://api.calendly.com/webhook_subscriptions'
const requestBody = {
url: notificationUrl,
events,
organization,
scope: 'organization',
}
const calendlyResponse = await fetch(calendlyApiUrl, {
method: 'POST',
headers: {
Authorization: `Bearer ${apiKey}`,
'Content-Type': 'application/json',
},
body: JSON.stringify(requestBody),
})
if (!calendlyResponse.ok) {
const errorBody = await calendlyResponse.json().catch(() => ({}))
const errorMessage =
(errorBody as Record<string, string>).message ||
(errorBody as Record<string, string>).title ||
'Unknown Calendly API error'
logger.error(
`[${ctx.requestId}] Failed to create webhook in Calendly for webhook ${ctx.webhook.id}. Status: ${calendlyResponse.status}`,
{ response: errorBody }
)
let userFriendlyMessage = 'Failed to create webhook subscription in Calendly'
if (calendlyResponse.status === 401) {
userFriendlyMessage =
'Calendly authentication failed. Please verify your Personal Access Token is correct.'
} else if (calendlyResponse.status === 403) {
userFriendlyMessage =
'Calendly access denied. Please ensure you have appropriate permissions and a paid Calendly subscription.'
} else if (calendlyResponse.status === 404) {
userFriendlyMessage =
'Calendly organization not found. Please verify the Organization URI is correct.'
} else if (errorMessage && errorMessage !== 'Unknown Calendly API error') {
userFriendlyMessage = `Calendly error: ${errorMessage}`
}
throw new Error(userFriendlyMessage)
}
const responseBody = (await calendlyResponse.json()) as Record<string, unknown>
const resource = responseBody.resource as Record<string, unknown> | undefined
const webhookUri = resource?.uri as string | undefined
if (!webhookUri) {
logger.error(
`[${ctx.requestId}] Calendly webhook created but no webhook URI returned for webhook ${ctx.webhook.id}`,
{ response: responseBody }
)
throw new Error('Calendly webhook creation succeeded but no webhook URI was returned')
}
const webhookId = webhookUri.split('/').pop()
if (!webhookId) {
logger.error(
`[${ctx.requestId}] Could not extract webhook ID from Calendly URI: ${webhookUri}`,
{
response: responseBody,
}
)
throw new Error('Failed to extract webhook ID from Calendly response')
}
logger.info(
`[${ctx.requestId}] Successfully created webhook in Calendly for webhook ${ctx.webhook.id}.`,
{
calendlyWebhookUri: webhookUri,
calendlyWebhookId: webhookId,
}
)
return { providerConfigUpdates: { externalId: webhookId } }
} catch (error: unknown) {
const err = error as Error
logger.error(
`[${ctx.requestId}] Exception during Calendly webhook creation for webhook ${ctx.webhook.id}.`,
{
message: err.message,
stack: err.stack,
}
)
throw error
}
},
async deleteSubscription(ctx: DeleteSubscriptionContext): Promise<void> {
try {
const config = getProviderConfig(ctx.webhook)
const apiKey = config.apiKey as string | undefined
const externalId = config.externalId as string | undefined
if (!apiKey) {
logger.warn(
`[${ctx.requestId}] Missing apiKey for Calendly webhook deletion ${ctx.webhook.id}, skipping cleanup`
)
return
}
if (!externalId) {
logger.warn(
`[${ctx.requestId}] Missing externalId for Calendly webhook deletion ${ctx.webhook.id}, skipping cleanup`
)
return
}
const calendlyApiUrl = `https://api.calendly.com/webhook_subscriptions/${externalId}`
const calendlyResponse = await fetch(calendlyApiUrl, {
method: 'DELETE',
headers: {
Authorization: `Bearer ${apiKey}`,
},
})
if (!calendlyResponse.ok && calendlyResponse.status !== 404) {
const responseBody = await calendlyResponse.json().catch(() => ({}))
logger.warn(
`[${ctx.requestId}] Failed to delete Calendly webhook (non-fatal): ${calendlyResponse.status}`,
{ response: responseBody }
)
} else {
logger.info(
`[${ctx.requestId}] Successfully deleted Calendly webhook subscription ${externalId}`
)
}
} catch (error) {
logger.warn(`[${ctx.requestId}] Error deleting Calendly webhook (non-fatal)`, error)
}
},
}

View File

@@ -1,67 +0,0 @@
import crypto from 'crypto'
import { createLogger } from '@sim/logger'
import { safeCompare } from '@/lib/core/security/encryption'
import type {
FormatInputContext,
FormatInputResult,
WebhookProviderHandler,
} from '@/lib/webhooks/providers/types'
import { createHmacVerifier } from '@/lib/webhooks/providers/utils'
const logger = createLogger('WebhookProvider:Circleback')
function validateCirclebackSignature(secret: string, signature: string, body: string): boolean {
try {
if (!secret || !signature || !body) {
logger.warn('Circleback signature validation missing required fields', {
hasSecret: !!secret,
hasSignature: !!signature,
hasBody: !!body,
})
return false
}
const computedHash = crypto.createHmac('sha256', secret).update(body, 'utf8').digest('hex')
logger.debug('Circleback signature comparison', {
computedSignature: `${computedHash.substring(0, 10)}...`,
providedSignature: `${signature.substring(0, 10)}...`,
computedLength: computedHash.length,
providedLength: signature.length,
match: computedHash === signature,
})
return safeCompare(computedHash, signature)
} catch (error) {
logger.error('Error validating Circleback signature:', error)
return false
}
}
export const circlebackHandler: WebhookProviderHandler = {
async formatInput({ body }: FormatInputContext): Promise<FormatInputResult> {
const b = body as Record<string, unknown>
return {
input: {
id: b.id,
name: b.name,
createdAt: b.createdAt,
duration: b.duration,
url: b.url,
recordingUrl: b.recordingUrl,
tags: b.tags || [],
icalUid: b.icalUid,
attendees: b.attendees || [],
notes: b.notes || '',
actionItems: b.actionItems || [],
transcript: b.transcript || [],
insights: b.insights || {},
meeting: b,
},
}
},
verifyAuth: createHmacVerifier({
configKey: 'webhookSecret',
headerName: 'x-signature',
validateFn: validateCirclebackSignature,
providerLabel: 'Circleback',
}),
}

View File

@@ -1,92 +0,0 @@
import { createLogger } from '@sim/logger'
import { NextResponse } from 'next/server'
import { validateJiraSignature } from '@/lib/webhooks/providers/jira'
import type {
EventMatchContext,
FormatInputContext,
FormatInputResult,
WebhookProviderHandler,
} from '@/lib/webhooks/providers/types'
import { createHmacVerifier } from '@/lib/webhooks/providers/utils'
const logger = createLogger('WebhookProvider:Confluence')
export const confluenceHandler: WebhookProviderHandler = {
verifyAuth: createHmacVerifier({
configKey: 'webhookSecret',
headerName: 'X-Hub-Signature',
validateFn: validateJiraSignature,
providerLabel: 'Confluence',
}),
async formatInput({ body, webhook }: FormatInputContext): Promise<FormatInputResult> {
const {
extractPageData,
extractCommentData,
extractBlogData,
extractAttachmentData,
extractSpaceData,
extractLabelData,
} = await import('@/triggers/confluence/utils')
const providerConfig = (webhook.providerConfig as Record<string, unknown>) || {}
const triggerId = providerConfig.triggerId as string | undefined
if (triggerId?.startsWith('confluence_comment_')) {
return { input: extractCommentData(body) }
}
if (triggerId?.startsWith('confluence_blog_')) {
return { input: extractBlogData(body) }
}
if (triggerId?.startsWith('confluence_attachment_')) {
return { input: extractAttachmentData(body) }
}
if (triggerId?.startsWith('confluence_space_')) {
return { input: extractSpaceData(body) }
}
if (triggerId?.startsWith('confluence_label_')) {
return { input: extractLabelData(body) }
}
if (triggerId === 'confluence_webhook') {
const b = body as Record<string, unknown>
return {
input: {
timestamp: b.timestamp,
userAccountId: b.userAccountId,
accountType: b.accountType,
page: b.page || null,
comment: b.comment || null,
blog: b.blog || (b as Record<string, unknown>).blogpost || null,
attachment: b.attachment || null,
space: b.space || null,
label: b.label || null,
content: b.content || null,
},
}
}
return { input: extractPageData(body) }
},
async matchEvent({ webhook, workflow, body, requestId, providerConfig }: EventMatchContext) {
const triggerId = providerConfig.triggerId as string | undefined
const obj = body as Record<string, unknown>
if (triggerId) {
const { isConfluencePayloadMatch } = await import('@/triggers/confluence/utils')
if (!isConfluencePayloadMatch(triggerId, obj)) {
logger.debug(
`[${requestId}] Confluence payload mismatch for trigger ${triggerId}. Skipping execution.`,
{
webhookId: webhook.id,
workflowId: workflow.id,
triggerId,
bodyKeys: Object.keys(obj),
}
)
return NextResponse.json({
message: 'Payload does not match trigger configuration. Ignoring.',
})
}
}
return true
},
}

View File

@@ -1,173 +0,0 @@
import { createLogger } from '@sim/logger'
import { validateAlphanumericId } from '@/lib/core/security/input-validation'
import { getNotificationUrl, getProviderConfig } from '@/lib/webhooks/providers/subscription-utils'
import type {
DeleteSubscriptionContext,
SubscriptionContext,
SubscriptionResult,
WebhookProviderHandler,
} from '@/lib/webhooks/providers/types'
const logger = createLogger('WebhookProvider:Fathom')
export const fathomHandler: WebhookProviderHandler = {
async createSubscription(ctx: SubscriptionContext): Promise<SubscriptionResult | undefined> {
const { webhook, requestId } = ctx
try {
const providerConfig = getProviderConfig(webhook)
const apiKey = providerConfig.apiKey as string | undefined
const triggerId = providerConfig.triggerId as string | undefined
const triggeredFor = providerConfig.triggeredFor as string | undefined
const includeSummary = providerConfig.includeSummary as unknown
const includeTranscript = providerConfig.includeTranscript as unknown
const includeActionItems = providerConfig.includeActionItems as unknown
const includeCrmMatches = providerConfig.includeCrmMatches as unknown
if (!apiKey) {
logger.warn(`[${requestId}] Missing apiKey for Fathom webhook creation.`, {
webhookId: webhook.id,
})
throw new Error(
'Fathom API Key is required. Please provide your API key in the trigger configuration.'
)
}
const notificationUrl = getNotificationUrl(webhook)
const triggeredForValue = triggeredFor || 'my_recordings'
const toBool = (val: unknown, fallback: boolean): boolean => {
if (val === undefined) return fallback
return val === true || val === 'true'
}
const requestBody: Record<string, unknown> = {
destination_url: notificationUrl,
triggered_for: [triggeredForValue],
include_summary: toBool(includeSummary, true),
include_transcript: toBool(includeTranscript, false),
include_action_items: toBool(includeActionItems, false),
include_crm_matches: toBool(includeCrmMatches, false),
}
logger.info(`[${requestId}] Creating Fathom webhook`, {
triggerId,
triggeredFor: triggeredForValue,
webhookId: webhook.id,
})
const fathomResponse = await fetch('https://api.fathom.ai/external/v1/webhooks', {
method: 'POST',
headers: {
'X-Api-Key': apiKey,
'Content-Type': 'application/json',
},
body: JSON.stringify(requestBody),
})
const responseBody = (await fathomResponse.json().catch(() => ({}))) as Record<
string,
unknown
>
if (!fathomResponse.ok) {
const errorMessage =
(responseBody.message as string) ||
(responseBody.error as string) ||
'Unknown Fathom API error'
logger.error(
`[${requestId}] Failed to create webhook in Fathom for webhook ${webhook.id}. Status: ${fathomResponse.status}`,
{ message: errorMessage, response: responseBody }
)
let userFriendlyMessage = 'Failed to create webhook subscription in Fathom'
if (fathomResponse.status === 401) {
userFriendlyMessage = 'Invalid Fathom API Key. Please verify your key is correct.'
} else if (fathomResponse.status === 400) {
userFriendlyMessage = `Fathom error: ${errorMessage}`
} else if (errorMessage && errorMessage !== 'Unknown Fathom API error') {
userFriendlyMessage = `Fathom error: ${errorMessage}`
}
throw new Error(userFriendlyMessage)
}
if (!responseBody.id) {
logger.error(
`[${requestId}] Fathom webhook creation returned success but no webhook ID for ${webhook.id}.`
)
throw new Error('Fathom webhook created but no ID returned. Please try again.')
}
logger.info(
`[${requestId}] Successfully created webhook in Fathom for webhook ${webhook.id}.`,
{
fathomWebhookId: responseBody.id,
}
)
return { providerConfigUpdates: { externalId: responseBody.id } }
} catch (error: unknown) {
const err = error as Error
logger.error(
`[${requestId}] Exception during Fathom webhook creation for webhook ${webhook.id}.`,
{
message: err.message,
stack: err.stack,
}
)
throw error
}
},
async deleteSubscription(ctx: DeleteSubscriptionContext): Promise<void> {
const { webhook, requestId } = ctx
try {
const config = getProviderConfig(webhook)
const apiKey = config.apiKey as string | undefined
const externalId = config.externalId as string | undefined
if (!apiKey) {
logger.warn(
`[${requestId}] Missing apiKey for Fathom webhook deletion ${webhook.id}, skipping cleanup`
)
return
}
if (!externalId) {
logger.warn(
`[${requestId}] Missing externalId for Fathom webhook deletion ${webhook.id}, skipping cleanup`
)
return
}
const idValidation = validateAlphanumericId(externalId, 'Fathom webhook ID', 100)
if (!idValidation.isValid) {
logger.warn(
`[${requestId}] Invalid externalId format for Fathom webhook deletion ${webhook.id}, skipping cleanup`
)
return
}
const fathomApiUrl = `https://api.fathom.ai/external/v1/webhooks/${externalId}`
const fathomResponse = await fetch(fathomApiUrl, {
method: 'DELETE',
headers: {
'X-Api-Key': apiKey,
'Content-Type': 'application/json',
},
})
if (!fathomResponse.ok && fathomResponse.status !== 404) {
logger.warn(
`[${requestId}] Failed to delete Fathom webhook (non-fatal): ${fathomResponse.status}`
)
} else {
logger.info(`[${requestId}] Successfully deleted Fathom webhook ${externalId}`)
}
} catch (error) {
logger.warn(`[${requestId}] Error deleting Fathom webhook (non-fatal)`, error)
}
},
}

View File

@@ -1,63 +0,0 @@
import crypto from 'crypto'
import { createLogger } from '@sim/logger'
import { safeCompare } from '@/lib/core/security/encryption'
import type {
FormatInputContext,
FormatInputResult,
WebhookProviderHandler,
} from '@/lib/webhooks/providers/types'
import { createHmacVerifier } from '@/lib/webhooks/providers/utils'
const logger = createLogger('WebhookProvider:Fireflies')
function validateFirefliesSignature(secret: string, signature: string, body: string): boolean {
try {
if (!secret || !signature || !body) {
logger.warn('Fireflies signature validation missing required fields', {
hasSecret: !!secret,
hasSignature: !!signature,
hasBody: !!body,
})
return false
}
if (!signature.startsWith('sha256=')) {
logger.warn('Fireflies signature has invalid format (expected sha256=)', {
signaturePrefix: signature.substring(0, 10),
})
return false
}
const providedSignature = signature.substring(7)
const computedHash = crypto.createHmac('sha256', secret).update(body, 'utf8').digest('hex')
logger.debug('Fireflies signature comparison', {
computedSignature: `${computedHash.substring(0, 10)}...`,
providedSignature: `${providedSignature.substring(0, 10)}...`,
computedLength: computedHash.length,
providedLength: providedSignature.length,
match: computedHash === providedSignature,
})
return safeCompare(computedHash, providedSignature)
} catch (error) {
logger.error('Error validating Fireflies signature:', error)
return false
}
}
export const firefliesHandler: WebhookProviderHandler = {
async formatInput({ body }: FormatInputContext): Promise<FormatInputResult> {
const b = body as Record<string, unknown>
return {
input: {
meetingId: (b.meetingId || '') as string,
eventType: (b.eventType || 'Transcription completed') as string,
clientReferenceId: (b.clientReferenceId || '') as string,
},
}
},
verifyAuth: createHmacVerifier({
configKey: 'webhookSecret',
headerName: 'x-hub-signature',
validateFn: validateFirefliesSignature,
providerLabel: 'Fireflies',
}),
}

View File

@@ -1,145 +0,0 @@
import { createLogger } from '@sim/logger'
import { NextResponse } from 'next/server'
import type {
AuthContext,
EventFilterContext,
FormatInputContext,
FormatInputResult,
ProcessFilesContext,
WebhookProviderHandler,
} from '@/lib/webhooks/providers/types'
import { verifyTokenAuth } from '@/lib/webhooks/providers/utils'
const logger = createLogger('WebhookProvider:Generic')
export const genericHandler: WebhookProviderHandler = {
verifyAuth({ request, requestId, providerConfig }: AuthContext) {
if (providerConfig.requireAuth) {
const configToken = providerConfig.token as string | undefined
if (!configToken) {
return new NextResponse('Unauthorized - Authentication required but no token configured', {
status: 401,
})
}
const secretHeaderName = providerConfig.secretHeaderName as string | undefined
if (!verifyTokenAuth(request, configToken, secretHeaderName)) {
return new NextResponse('Unauthorized - Invalid authentication token', { status: 401 })
}
}
const allowedIps = providerConfig.allowedIps
if (allowedIps && Array.isArray(allowedIps) && allowedIps.length > 0) {
const clientIp =
request.headers.get('x-forwarded-for')?.split(',')[0].trim() ||
request.headers.get('x-real-ip') ||
'unknown'
if (clientIp === 'unknown' || !allowedIps.includes(clientIp)) {
logger.warn(`[${requestId}] Forbidden webhook access attempt - IP not allowed: ${clientIp}`)
return new NextResponse('Forbidden - IP not allowed', {
status: 403,
})
}
}
return null
},
enrichHeaders({ body, providerConfig }: EventFilterContext, headers: Record<string, string>) {
const idempotencyField = providerConfig.idempotencyField as string | undefined
if (idempotencyField && body) {
const value = idempotencyField
.split('.')
.reduce(
(acc: unknown, key: string) =>
acc && typeof acc === 'object' ? (acc as Record<string, unknown>)[key] : undefined,
body
)
if (value !== undefined && value !== null && typeof value !== 'object') {
headers['x-sim-idempotency-key'] = String(value)
}
}
},
formatSuccessResponse(providerConfig: Record<string, unknown>) {
if (providerConfig.responseMode === 'custom') {
const rawCode = Number(providerConfig.responseStatusCode) || 200
const statusCode = rawCode >= 100 && rawCode <= 599 ? rawCode : 200
const responseBody = (providerConfig.responseBody as string | undefined)?.trim()
if (!responseBody) {
return new NextResponse(null, { status: statusCode })
}
try {
const parsed = JSON.parse(responseBody)
return NextResponse.json(parsed, { status: statusCode })
} catch {
return new NextResponse(responseBody, {
status: statusCode,
headers: { 'Content-Type': 'text/plain' },
})
}
}
return null
},
async formatInput({ body }: FormatInputContext): Promise<FormatInputResult> {
return { input: body }
},
async processInputFiles({
input,
blocks,
blockId,
workspaceId,
workflowId,
executionId,
requestId,
userId,
}: ProcessFilesContext) {
const triggerBlock = blocks[blockId] as Record<string, unknown> | undefined
const subBlocks = triggerBlock?.subBlocks as Record<string, unknown> | undefined
const inputFormatBlock = subBlocks?.inputFormat as Record<string, unknown> | undefined
if (inputFormatBlock?.value) {
const inputFormat = inputFormatBlock.value as Array<{
name: string
type: 'string' | 'number' | 'boolean' | 'object' | 'array' | 'file[]'
}>
const fileFields = inputFormat.filter((field) => field.type === 'file[]')
if (fileFields.length > 0) {
const { processExecutionFiles } = await import('@/lib/execution/files')
const executionContext = {
workspaceId,
workflowId,
executionId,
}
for (const fileField of fileFields) {
const fieldValue = input[fileField.name]
if (fieldValue && typeof fieldValue === 'object') {
const uploadedFiles = await processExecutionFiles(
fieldValue,
executionContext,
requestId,
userId
)
if (uploadedFiles.length > 0) {
input[fileField.name] = uploadedFiles
logger.info(
`[${requestId}] Successfully processed ${uploadedFiles.length} file(s) for field: ${fileField.name}`
)
}
}
}
}
}
},
}

View File

@@ -1,124 +0,0 @@
import crypto from 'crypto'
import { createLogger } from '@sim/logger'
import { NextResponse } from 'next/server'
import { safeCompare } from '@/lib/core/security/encryption'
import type {
AuthContext,
EventMatchContext,
FormatInputContext,
FormatInputResult,
WebhookProviderHandler,
} from '@/lib/webhooks/providers/types'
const logger = createLogger('WebhookProvider:GitHub')
function validateGitHubSignature(secret: string, signature: string, body: string): boolean {
try {
if (!secret || !signature || !body) {
logger.warn('GitHub signature validation missing required fields', {
hasSecret: !!secret,
hasSignature: !!signature,
hasBody: !!body,
})
return false
}
let algorithm: 'sha256' | 'sha1'
let providedSignature: string
if (signature.startsWith('sha256=')) {
algorithm = 'sha256'
providedSignature = signature.substring(7)
} else if (signature.startsWith('sha1=')) {
algorithm = 'sha1'
providedSignature = signature.substring(5)
} else {
logger.warn('GitHub signature has invalid format', {
signature: `${signature.substring(0, 10)}...`,
})
return false
}
const computedHash = crypto.createHmac(algorithm, secret).update(body, 'utf8').digest('hex')
logger.debug('GitHub signature comparison', {
algorithm,
computedSignature: `${computedHash.substring(0, 10)}...`,
providedSignature: `${providedSignature.substring(0, 10)}...`,
computedLength: computedHash.length,
providedLength: providedSignature.length,
match: computedHash === providedSignature,
})
return safeCompare(computedHash, providedSignature)
} catch (error) {
logger.error('Error validating GitHub signature:', error)
return false
}
}
export const githubHandler: WebhookProviderHandler = {
verifyAuth({ request, rawBody, requestId, providerConfig }: AuthContext) {
const secret = providerConfig.webhookSecret as string | undefined
if (!secret) {
return null
}
const signature =
request.headers.get('X-Hub-Signature-256') || request.headers.get('X-Hub-Signature')
if (!signature) {
logger.warn(`[${requestId}] GitHub webhook missing signature header`)
return new NextResponse('Unauthorized - Missing GitHub signature', { status: 401 })
}
if (!validateGitHubSignature(secret, signature, rawBody)) {
logger.warn(`[${requestId}] GitHub signature verification failed`, {
signatureLength: signature.length,
secretLength: secret.length,
usingSha256: !!request.headers.get('X-Hub-Signature-256'),
})
return new NextResponse('Unauthorized - Invalid GitHub signature', { status: 401 })
}
return null
},
async formatInput({ body, headers }: FormatInputContext): Promise<FormatInputResult> {
const b = body as Record<string, unknown>
const eventType = headers['x-github-event'] || 'unknown'
const ref = (b?.ref as string) || ''
const branch = ref.replace('refs/heads/', '')
return {
input: { ...b, event_type: eventType, action: (b?.action || '') as string, branch },
}
},
async matchEvent({
webhook,
workflow,
body,
request,
requestId,
providerConfig,
}: EventMatchContext) {
const triggerId = providerConfig.triggerId as string | undefined
const obj = body as Record<string, unknown>
if (triggerId && triggerId !== 'github_webhook') {
const eventType = request.headers.get('x-github-event')
const action = obj.action as string | undefined
const { isGitHubEventMatch } = await import('@/triggers/github/utils')
if (!isGitHubEventMatch(triggerId, eventType || '', action, obj)) {
logger.debug(
`[${requestId}] GitHub event mismatch for trigger ${triggerId}. Event: ${eventType}, Action: ${action}. Skipping execution.`,
{
webhookId: webhook.id,
workflowId: workflow.id,
triggerId,
receivedEvent: eventType,
receivedAction: action,
}
)
return false
}
}
return true
},
}

View File

@@ -1,117 +0,0 @@
import { db } from '@sim/db'
import { account, webhook } from '@sim/db/schema'
import { createLogger } from '@sim/logger'
import { eq } from 'drizzle-orm'
import type {
FormatInputContext,
FormatInputResult,
PollingConfigContext,
WebhookProviderHandler,
} from '@/lib/webhooks/providers/types'
import { refreshAccessTokenIfNeeded, resolveOAuthAccountId } from '@/app/api/auth/oauth/utils'
const logger = createLogger('WebhookProvider:Gmail')
export const gmailHandler: WebhookProviderHandler = {
async formatInput({ body }: FormatInputContext): Promise<FormatInputResult> {
const b = body as Record<string, unknown>
if (b && typeof b === 'object' && 'email' in b) {
return { input: { email: b.email, timestamp: b.timestamp } }
}
return { input: b }
},
async configurePolling({ webhook: webhookData, requestId }: PollingConfigContext) {
logger.info(`[${requestId}] Setting up Gmail polling for webhook ${webhookData.id}`)
try {
const providerConfig = (webhookData.providerConfig as Record<string, unknown>) || {}
const credentialId = providerConfig.credentialId as string | undefined
if (!credentialId) {
logger.error(`[${requestId}] Missing credentialId for Gmail webhook ${webhookData.id}`)
return false
}
const resolvedGmail = await resolveOAuthAccountId(credentialId)
if (!resolvedGmail) {
logger.error(
`[${requestId}] Could not resolve credential ${credentialId} for Gmail webhook ${webhookData.id}`
)
return false
}
const rows = await db
.select()
.from(account)
.where(eq(account.id, resolvedGmail.accountId))
.limit(1)
if (rows.length === 0) {
logger.error(
`[${requestId}] Credential ${credentialId} not found for Gmail webhook ${webhookData.id}`
)
return false
}
const effectiveUserId = rows[0].userId
const accessToken = await refreshAccessTokenIfNeeded(
resolvedGmail.accountId,
effectiveUserId,
requestId
)
if (!accessToken) {
logger.error(
`[${requestId}] Failed to refresh/access Gmail token for credential ${credentialId}`
)
return false
}
const maxEmailsPerPoll =
typeof providerConfig.maxEmailsPerPoll === 'string'
? Number.parseInt(providerConfig.maxEmailsPerPoll, 10) || 25
: (providerConfig.maxEmailsPerPoll as number) || 25
const pollingInterval =
typeof providerConfig.pollingInterval === 'string'
? Number.parseInt(providerConfig.pollingInterval, 10) || 5
: (providerConfig.pollingInterval as number) || 5
const now = new Date()
await db
.update(webhook)
.set({
providerConfig: {
...providerConfig,
userId: effectiveUserId,
credentialId,
maxEmailsPerPoll,
pollingInterval,
markAsRead: providerConfig.markAsRead || false,
includeRawEmail: providerConfig.includeRawEmail || false,
labelIds: providerConfig.labelIds || ['INBOX'],
labelFilterBehavior: providerConfig.labelFilterBehavior || 'INCLUDE',
lastCheckedTimestamp:
(providerConfig.lastCheckedTimestamp as string) || now.toISOString(),
setupCompleted: true,
},
updatedAt: now,
})
.where(eq(webhook.id, webhookData.id as string))
logger.info(
`[${requestId}] Successfully configured Gmail polling for webhook ${webhookData.id}`
)
return true
} catch (error: unknown) {
const err = error as Error
logger.error(`[${requestId}] Failed to configure Gmail polling`, {
webhookId: webhookData.id,
error: err.message,
stack: err.stack,
})
return false
}
},
}

View File

@@ -1,60 +0,0 @@
import { createLogger } from '@sim/logger'
import { NextResponse } from 'next/server'
import type {
AuthContext,
FormatInputContext,
FormatInputResult,
WebhookProviderHandler,
} from '@/lib/webhooks/providers/types'
import { verifyTokenAuth } from '@/lib/webhooks/providers/utils'
const logger = createLogger('WebhookProvider:GoogleForms')
export const googleFormsHandler: WebhookProviderHandler = {
async formatInput({ body, webhook }: FormatInputContext): Promise<FormatInputResult> {
const b = body as Record<string, unknown>
const providerConfig = (webhook.providerConfig as Record<string, unknown>) || {}
const normalizeAnswers = (src: unknown): Record<string, unknown> => {
if (!src || typeof src !== 'object') return {}
const out: Record<string, unknown> = {}
for (const [k, v] of Object.entries(src as Record<string, unknown>)) {
if (Array.isArray(v)) {
out[k] = v.length === 1 ? v[0] : v
} else {
out[k] = v
}
}
return out
}
const responseId = (b?.responseId || b?.id || '') as string
const createTime = (b?.createTime || b?.timestamp || new Date().toISOString()) as string
const lastSubmittedTime = (b?.lastSubmittedTime || createTime) as string
const formId = (b?.formId || providerConfig.formId || '') as string
const includeRaw = providerConfig.includeRawPayload !== false
return {
input: {
responseId,
createTime,
lastSubmittedTime,
formId,
answers: normalizeAnswers(b?.answers),
...(includeRaw ? { raw: b?.raw ?? b } : {}),
},
}
},
verifyAuth({ request, requestId, providerConfig }: AuthContext) {
const expectedToken = providerConfig.token as string | undefined
if (!expectedToken) {
return null
}
const secretHeaderName = providerConfig.secretHeaderName as string | undefined
if (!verifyTokenAuth(request, expectedToken, secretHeaderName)) {
logger.warn(`[${requestId}] Google Forms webhook authentication failed`)
return new NextResponse('Unauthorized - Invalid secret', { status: 401 })
}
return null
},
}

View File

@@ -1,251 +0,0 @@
import { createLogger } from '@sim/logger'
import { NextResponse } from 'next/server'
import { getNotificationUrl, getProviderConfig } from '@/lib/webhooks/providers/subscription-utils'
import type {
DeleteSubscriptionContext,
EventFilterContext,
FormatInputContext,
FormatInputResult,
SubscriptionContext,
SubscriptionResult,
WebhookProviderHandler,
} from '@/lib/webhooks/providers/types'
import { skipByEventTypes } from '@/lib/webhooks/providers/utils'
const logger = createLogger('WebhookProvider:Grain')
export const grainHandler: WebhookProviderHandler = {
handleReachabilityTest(body: unknown, requestId: string) {
const obj = body as Record<string, unknown> | null
const isVerificationRequest = !obj || Object.keys(obj).length === 0 || !obj.type
if (isVerificationRequest) {
logger.info(
`[${requestId}] Grain reachability test detected - returning 200 for webhook verification`
)
return NextResponse.json({
status: 'ok',
message: 'Webhook endpoint verified',
})
}
return null
},
shouldSkipEvent(ctx: EventFilterContext) {
return skipByEventTypes(ctx, 'Grain', logger)
},
async formatInput({ body }: FormatInputContext): Promise<FormatInputResult> {
const b = body as Record<string, unknown>
return { input: { type: b.type, user_id: b.user_id, data: b.data || {} } }
},
extractIdempotencyId(body: unknown) {
const obj = body as Record<string, unknown>
const data = obj.data as Record<string, unknown> | undefined
if (obj.type && data?.id) {
return `${obj.type}:${data.id}`
}
return null
},
async createSubscription(ctx: SubscriptionContext): Promise<SubscriptionResult | undefined> {
const { webhook, requestId } = ctx
try {
const providerConfig = getProviderConfig(webhook)
const apiKey = providerConfig.apiKey as string | undefined
const triggerId = providerConfig.triggerId as string | undefined
const viewId = providerConfig.viewId as string | undefined
if (!apiKey) {
logger.warn(`[${requestId}] Missing apiKey for Grain webhook creation.`, {
webhookId: webhook.id,
})
throw new Error(
'Grain API Key is required. Please provide your Grain Personal Access Token in the trigger configuration.'
)
}
if (!viewId) {
logger.warn(`[${requestId}] Missing viewId for Grain webhook creation.`, {
webhookId: webhook.id,
triggerId,
})
throw new Error(
'Grain view ID is required. Please provide the Grain view ID from GET /_/public-api/views in the trigger configuration.'
)
}
const actionMap: Record<string, Array<'added' | 'updated' | 'removed'>> = {
grain_item_added: ['added'],
grain_item_updated: ['updated'],
grain_recording_created: ['added'],
grain_recording_updated: ['updated'],
grain_highlight_created: ['added'],
grain_highlight_updated: ['updated'],
grain_story_created: ['added'],
}
const eventTypeMap: Record<string, string[]> = {
grain_webhook: [],
grain_item_added: [],
grain_item_updated: [],
grain_recording_created: ['recording_added'],
grain_recording_updated: ['recording_updated'],
grain_highlight_created: ['highlight_added'],
grain_highlight_updated: ['highlight_updated'],
grain_story_created: ['story_added'],
}
const actions = actionMap[triggerId ?? ''] ?? []
const eventTypes = eventTypeMap[triggerId ?? ''] ?? []
if (!triggerId || (!(triggerId in actionMap) && triggerId !== 'grain_webhook')) {
logger.warn(
`[${requestId}] Unknown triggerId for Grain: ${triggerId}, defaulting to all actions`,
{
webhookId: webhook.id,
}
)
}
logger.info(`[${requestId}] Creating Grain webhook`, {
triggerId,
viewId,
actions,
eventTypes,
webhookId: webhook.id,
})
const notificationUrl = getNotificationUrl(webhook)
const grainApiUrl = 'https://api.grain.com/_/public-api/hooks'
const requestBody: Record<string, unknown> = {
version: 2,
hook_url: notificationUrl,
view_id: viewId,
}
if (actions.length > 0) {
requestBody.actions = actions
}
const grainResponse = await fetch(grainApiUrl, {
method: 'POST',
headers: {
Authorization: `Bearer ${apiKey}`,
'Content-Type': 'application/json',
},
body: JSON.stringify(requestBody),
})
const responseBody = (await grainResponse.json()) as Record<string, unknown>
if (!grainResponse.ok || responseBody.error || responseBody.errors) {
const errors = responseBody.errors as Record<string, string> | undefined
const error = responseBody.error as Record<string, string> | string | undefined
const errorMessage =
errors?.detail ||
(typeof error === 'object' ? error?.message : undefined) ||
(typeof error === 'string' ? error : undefined) ||
(responseBody.message as string) ||
'Unknown Grain API error'
logger.error(
`[${requestId}] Failed to create webhook in Grain for webhook ${webhook.id}. Status: ${grainResponse.status}`,
{ message: errorMessage, response: responseBody }
)
let userFriendlyMessage = 'Failed to create webhook subscription in Grain'
if (grainResponse.status === 401) {
userFriendlyMessage =
'Invalid Grain API Key. Please verify your Personal Access Token is correct.'
} else if (grainResponse.status === 403) {
userFriendlyMessage =
'Access denied. Please ensure your Grain API Key has appropriate permissions.'
} else if (errorMessage && errorMessage !== 'Unknown Grain API error') {
userFriendlyMessage = `Grain error: ${errorMessage}`
}
throw new Error(userFriendlyMessage)
}
const grainWebhookId = responseBody.id as string | undefined
if (!grainWebhookId) {
logger.error(
`[${requestId}] Grain webhook creation response missing id for webhook ${webhook.id}.`,
{
response: responseBody,
}
)
throw new Error(
'Grain webhook created but no webhook ID was returned in the response. Cannot track subscription.'
)
}
logger.info(
`[${requestId}] Successfully created webhook in Grain for webhook ${webhook.id}.`,
{
grainWebhookId,
eventTypes,
}
)
return { providerConfigUpdates: { externalId: grainWebhookId, eventTypes } }
} catch (error: unknown) {
const err = error as Error
logger.error(
`[${requestId}] Exception during Grain webhook creation for webhook ${webhook.id}.`,
{
message: err.message,
stack: err.stack,
}
)
throw error
}
},
async deleteSubscription(ctx: DeleteSubscriptionContext): Promise<void> {
const { webhook, requestId } = ctx
try {
const config = getProviderConfig(webhook)
const apiKey = config.apiKey as string | undefined
const externalId = config.externalId as string | undefined
if (!apiKey) {
logger.warn(
`[${requestId}] Missing apiKey for Grain webhook deletion ${webhook.id}, skipping cleanup`
)
return
}
if (!externalId) {
logger.warn(
`[${requestId}] Missing externalId for Grain webhook deletion ${webhook.id}, skipping cleanup`
)
return
}
const grainApiUrl = `https://api.grain.com/_/public-api/hooks/${externalId}`
const grainResponse = await fetch(grainApiUrl, {
method: 'DELETE',
headers: {
Authorization: `Bearer ${apiKey}`,
'Content-Type': 'application/json',
},
})
if (!grainResponse.ok && grainResponse.status !== 404) {
const responseBody = await grainResponse.json().catch(() => ({}))
logger.warn(
`[${requestId}] Failed to delete Grain webhook (non-fatal): ${grainResponse.status}`,
{ response: responseBody }
)
} else {
logger.info(`[${requestId}] Successfully deleted Grain webhook ${externalId}`)
}
} catch (error) {
logger.warn(`[${requestId}] Error deleting Grain webhook (non-fatal)`, error)
}
},
}

View File

@@ -1,75 +0,0 @@
import { createLogger } from '@sim/logger'
import type {
EventMatchContext,
FormatInputContext,
FormatInputResult,
WebhookProviderHandler,
} from '@/lib/webhooks/providers/types'
const logger = createLogger('WebhookProvider:HubSpot')
export const hubspotHandler: WebhookProviderHandler = {
async matchEvent({ webhook, workflow, body, requestId, providerConfig }: EventMatchContext) {
const triggerId = providerConfig.triggerId as string | undefined
if (triggerId?.startsWith('hubspot_')) {
const events = Array.isArray(body) ? body : [body]
const firstEvent = events[0] as Record<string, unknown> | undefined
const subscriptionType = firstEvent?.subscriptionType as string | undefined
const { isHubSpotContactEventMatch } = await import('@/triggers/hubspot/utils')
if (!isHubSpotContactEventMatch(triggerId, subscriptionType || '')) {
logger.debug(
`[${requestId}] HubSpot event mismatch for trigger ${triggerId}. Event: ${subscriptionType}. Skipping execution.`,
{
webhookId: webhook.id,
workflowId: workflow.id,
triggerId,
receivedEvent: subscriptionType,
}
)
return false
}
logger.info(
`[${requestId}] HubSpot event match confirmed for trigger ${triggerId}. Event: ${subscriptionType}`,
{
webhookId: webhook.id,
workflowId: workflow.id,
triggerId,
receivedEvent: subscriptionType,
}
)
}
return true
},
async formatInput({ body, webhook }: FormatInputContext): Promise<FormatInputResult> {
const b = body as Record<string, unknown>
const events = Array.isArray(b) ? b : [b]
const event = events[0] as Record<string, unknown> | undefined
if (!event) {
logger.warn('HubSpot webhook received with empty payload')
return { input: null }
}
logger.info('Formatting HubSpot webhook input', {
subscriptionType: event.subscriptionType,
objectId: event.objectId,
portalId: event.portalId,
})
return {
input: { payload: body, provider: 'hubspot', providerConfig: webhook.providerConfig },
}
},
extractIdempotencyId(body: unknown) {
if (Array.isArray(body) && body.length > 0) {
const first = body[0] as Record<string, unknown>
if (first?.eventId) {
return String(first.eventId)
}
}
return null
},
}

View File

@@ -1,84 +0,0 @@
import { db } from '@sim/db'
import { webhook } from '@sim/db/schema'
import { createLogger } from '@sim/logger'
import { eq } from 'drizzle-orm'
import type {
FormatInputContext,
FormatInputResult,
PollingConfigContext,
WebhookProviderHandler,
} from '@/lib/webhooks/providers/types'
const logger = createLogger('WebhookProvider:Imap')
export const imapHandler: WebhookProviderHandler = {
async formatInput({ body }: FormatInputContext): Promise<FormatInputResult> {
const b = body as Record<string, unknown>
if (b && typeof b === 'object' && 'email' in b) {
return {
input: {
messageId: b.messageId,
subject: b.subject,
from: b.from,
to: b.to,
cc: b.cc,
date: b.date,
bodyText: b.bodyText,
bodyHtml: b.bodyHtml,
mailbox: b.mailbox,
hasAttachments: b.hasAttachments,
attachments: b.attachments,
email: b.email,
timestamp: b.timestamp,
},
}
}
return { input: b }
},
async configurePolling({ webhook: webhookData, requestId }: PollingConfigContext) {
logger.info(`[${requestId}] Setting up IMAP polling for webhook ${webhookData.id}`)
try {
const providerConfig = (webhookData.providerConfig as Record<string, unknown>) || {}
const now = new Date()
if (!providerConfig.host || !providerConfig.username || !providerConfig.password) {
logger.error(
`[${requestId}] Missing required IMAP connection settings for webhook ${webhookData.id}`
)
return false
}
await db
.update(webhook)
.set({
providerConfig: {
...providerConfig,
port: providerConfig.port || '993',
secure: providerConfig.secure !== false,
mailbox: providerConfig.mailbox || 'INBOX',
searchCriteria: providerConfig.searchCriteria || 'UNSEEN',
markAsRead: providerConfig.markAsRead || false,
includeAttachments: providerConfig.includeAttachments !== false,
lastCheckedTimestamp: now.toISOString(),
setupCompleted: true,
},
updatedAt: now,
})
.where(eq(webhook.id, webhookData.id as string))
logger.info(
`[${requestId}] Successfully configured IMAP polling for webhook ${webhookData.id}`
)
return true
} catch (error: unknown) {
const err = error as Error
logger.error(`[${requestId}] Failed to configure IMAP polling`, {
webhookId: webhookData.id,
error: err.message,
})
return false
}
},
}

View File

@@ -1,24 +0,0 @@
export { getProviderHandler } from '@/lib/webhooks/providers/registry'
export type {
AuthContext,
EventFilterContext,
EventMatchContext,
FormatInputContext,
FormatInputResult,
ProcessFilesContext,
WebhookProviderHandler,
} from '@/lib/webhooks/providers/types'
import { getProviderHandler } from '@/lib/webhooks/providers/registry'
/**
* Extract a provider-specific unique identifier from the webhook body for idempotency.
*/
export function extractProviderIdentifierFromBody(provider: string, body: unknown): string | null {
if (!body || typeof body !== 'object') {
return null
}
const handler = getProviderHandler(provider)
return handler.extractIdempotencyId?.(body) ?? null
}

View File

@@ -1,104 +0,0 @@
import crypto from 'crypto'
import { createLogger } from '@sim/logger'
import { safeCompare } from '@/lib/core/security/encryption'
import type {
EventMatchContext,
FormatInputContext,
FormatInputResult,
WebhookProviderHandler,
} from '@/lib/webhooks/providers/types'
import { createHmacVerifier } from '@/lib/webhooks/providers/utils'
const logger = createLogger('WebhookProvider:Jira')
export function validateJiraSignature(secret: string, signature: string, body: string): boolean {
try {
if (!secret || !signature || !body) {
logger.warn('Jira signature validation missing required fields', {
hasSecret: !!secret,
hasSignature: !!signature,
hasBody: !!body,
})
return false
}
if (!signature.startsWith('sha256=')) {
logger.warn('Jira signature has invalid format (expected sha256=)', {
signaturePrefix: signature.substring(0, 10),
})
return false
}
const providedSignature = signature.substring(7)
const computedHash = crypto.createHmac('sha256', secret).update(body, 'utf8').digest('hex')
logger.debug('Jira signature comparison', {
computedSignature: `${computedHash.substring(0, 10)}...`,
providedSignature: `${providedSignature.substring(0, 10)}...`,
computedLength: computedHash.length,
providedLength: providedSignature.length,
match: computedHash === providedSignature,
})
return safeCompare(computedHash, providedSignature)
} catch (error) {
logger.error('Error validating Jira signature:', error)
return false
}
}
export const jiraHandler: WebhookProviderHandler = {
verifyAuth: createHmacVerifier({
configKey: 'webhookSecret',
headerName: 'X-Hub-Signature',
validateFn: validateJiraSignature,
providerLabel: 'Jira',
}),
async formatInput({ body, webhook }: FormatInputContext): Promise<FormatInputResult> {
const { extractIssueData, extractCommentData, extractWorklogData } = await import(
'@/triggers/jira/utils'
)
const providerConfig = (webhook.providerConfig as Record<string, unknown>) || {}
const triggerId = providerConfig.triggerId as string | undefined
if (triggerId === 'jira_issue_commented') {
return { input: extractCommentData(body) }
}
if (triggerId === 'jira_worklog_created') {
return { input: extractWorklogData(body) }
}
return { input: extractIssueData(body) }
},
async matchEvent({ webhook, workflow, body, requestId, providerConfig }: EventMatchContext) {
const triggerId = providerConfig.triggerId as string | undefined
const obj = body as Record<string, unknown>
if (triggerId && triggerId !== 'jira_webhook') {
const webhookEvent = obj.webhookEvent as string | undefined
const issueEventTypeName = obj.issue_event_type_name as string | undefined
const { isJiraEventMatch } = await import('@/triggers/jira/utils')
if (!isJiraEventMatch(triggerId, webhookEvent || '', issueEventTypeName)) {
logger.debug(
`[${requestId}] Jira event mismatch for trigger ${triggerId}. Event: ${webhookEvent}. Skipping execution.`,
{
webhookId: webhook.id,
workflowId: workflow.id,
triggerId,
receivedEvent: webhookEvent,
}
)
return false
}
}
return true
},
extractIdempotencyId(body: unknown) {
const obj = body as Record<string, unknown>
const issue = obj.issue as Record<string, unknown> | undefined
const project = obj.project as Record<string, unknown> | undefined
if (obj.webhookEvent && (issue?.id || project?.id)) {
return `${obj.webhookEvent}:${issue?.id || project?.id}`
}
return null
},
}

View File

@@ -1,218 +0,0 @@
import { createLogger } from '@sim/logger'
import { validateAlphanumericId } from '@/lib/core/security/input-validation'
import { getNotificationUrl, getProviderConfig } from '@/lib/webhooks/providers/subscription-utils'
import type {
DeleteSubscriptionContext,
SubscriptionContext,
SubscriptionResult,
WebhookProviderHandler,
} from '@/lib/webhooks/providers/types'
const logger = createLogger('WebhookProvider:Lemlist')
export const lemlistHandler: WebhookProviderHandler = {
async createSubscription(ctx: SubscriptionContext): Promise<SubscriptionResult | undefined> {
const { webhook, requestId } = ctx
try {
const providerConfig = getProviderConfig(webhook)
const apiKey = providerConfig.apiKey as string | undefined
const triggerId = providerConfig.triggerId as string | undefined
const campaignId = providerConfig.campaignId as string | undefined
if (!apiKey) {
logger.warn(`[${requestId}] Missing apiKey for Lemlist webhook creation.`, {
webhookId: webhook.id,
})
throw new Error(
'Lemlist API Key is required. Please provide your Lemlist API Key in the trigger configuration.'
)
}
const eventTypeMap: Record<string, string | undefined> = {
lemlist_email_replied: 'emailsReplied',
lemlist_linkedin_replied: 'linkedinReplied',
lemlist_interested: 'interested',
lemlist_not_interested: 'notInterested',
lemlist_email_opened: 'emailsOpened',
lemlist_email_clicked: 'emailsClicked',
lemlist_email_bounced: 'emailsBounced',
lemlist_email_sent: 'emailsSent',
lemlist_webhook: undefined,
}
const eventType = eventTypeMap[triggerId ?? '']
const notificationUrl = getNotificationUrl(webhook)
const authString = Buffer.from(`:${apiKey}`).toString('base64')
logger.info(`[${requestId}] Creating Lemlist webhook`, {
triggerId,
eventType,
hasCampaignId: !!campaignId,
webhookId: webhook.id,
})
const lemlistApiUrl = 'https://api.lemlist.com/api/hooks'
const requestBody: Record<string, unknown> = {
targetUrl: notificationUrl,
}
if (eventType) {
requestBody.type = eventType
}
if (campaignId) {
requestBody.campaignId = campaignId
}
const lemlistResponse = await fetch(lemlistApiUrl, {
method: 'POST',
headers: {
Authorization: `Basic ${authString}`,
'Content-Type': 'application/json',
},
body: JSON.stringify(requestBody),
})
const responseBody = (await lemlistResponse.json()) as Record<string, unknown>
if (!lemlistResponse.ok || responseBody.error) {
const errorMessage =
(responseBody.message as string) ||
(responseBody.error as string) ||
'Unknown Lemlist API error'
logger.error(
`[${requestId}] Failed to create webhook in Lemlist for webhook ${webhook.id}. Status: ${lemlistResponse.status}`,
{ message: errorMessage, response: responseBody }
)
let userFriendlyMessage = 'Failed to create webhook subscription in Lemlist'
if (lemlistResponse.status === 401) {
userFriendlyMessage = 'Invalid Lemlist API Key. Please verify your API Key is correct.'
} else if (lemlistResponse.status === 403) {
userFriendlyMessage =
'Access denied. Please ensure your Lemlist API Key has appropriate permissions.'
} else if (errorMessage && errorMessage !== 'Unknown Lemlist API error') {
userFriendlyMessage = `Lemlist error: ${errorMessage}`
}
throw new Error(userFriendlyMessage)
}
logger.info(
`[${requestId}] Successfully created webhook in Lemlist for webhook ${webhook.id}.`,
{
lemlistWebhookId: responseBody._id,
}
)
return { providerConfigUpdates: { externalId: responseBody._id } }
} catch (error: unknown) {
const err = error as Error
logger.error(
`[${requestId}] Exception during Lemlist webhook creation for webhook ${webhook.id}.`,
{
message: err.message,
stack: err.stack,
}
)
throw error
}
},
async deleteSubscription(ctx: DeleteSubscriptionContext): Promise<void> {
const { webhook, requestId } = ctx
try {
const config = getProviderConfig(webhook)
const apiKey = config.apiKey as string | undefined
const externalId = config.externalId as string | undefined
if (!apiKey) {
logger.warn(
`[${requestId}] Missing apiKey for Lemlist webhook deletion ${webhook.id}, skipping cleanup`
)
return
}
const authString = Buffer.from(`:${apiKey}`).toString('base64')
const deleteById = async (id: string) => {
const validation = validateAlphanumericId(id, 'Lemlist hook ID', 50)
if (!validation.isValid) {
logger.warn(`[${requestId}] Invalid Lemlist hook ID format, skipping deletion`, {
id: id.substring(0, 30),
})
return
}
const lemlistApiUrl = `https://api.lemlist.com/api/hooks/${id}`
const lemlistResponse = await fetch(lemlistApiUrl, {
method: 'DELETE',
headers: {
Authorization: `Basic ${authString}`,
},
})
if (!lemlistResponse.ok && lemlistResponse.status !== 404) {
const responseBody = await lemlistResponse.json().catch(() => ({}))
logger.warn(
`[${requestId}] Failed to delete Lemlist webhook (non-fatal): ${lemlistResponse.status}`,
{ response: responseBody }
)
} else {
logger.info(`[${requestId}] Successfully deleted Lemlist webhook ${id}`)
}
}
if (externalId) {
await deleteById(externalId)
return
}
const notificationUrl = getNotificationUrl(webhook)
const listResponse = await fetch('https://api.lemlist.com/api/hooks', {
method: 'GET',
headers: {
Authorization: `Basic ${authString}`,
},
})
if (!listResponse.ok) {
logger.warn(`[${requestId}] Failed to list Lemlist webhooks for cleanup ${webhook.id}`, {
status: listResponse.status,
})
return
}
const listBody = (await listResponse.json().catch(() => null)) as
| Record<string, unknown>
| Array<Record<string, unknown>>
| null
const hooks: Array<Record<string, unknown>> = Array.isArray(listBody)
? listBody
: ((listBody as Record<string, unknown>)?.hooks as Array<Record<string, unknown>>) ||
((listBody as Record<string, unknown>)?.data as Array<Record<string, unknown>>) ||
[]
const matches = hooks.filter((hook) => {
const targetUrl = hook?.targetUrl || hook?.target_url || hook?.url
return typeof targetUrl === 'string' && targetUrl === notificationUrl
})
if (matches.length === 0) {
logger.info(`[${requestId}] Lemlist webhook not found for cleanup ${webhook.id}`, {
notificationUrl,
})
return
}
for (const hook of matches) {
const hookId = (hook?._id || hook?.id) as string | undefined
if (typeof hookId === 'string' && hookId.length > 0) {
await deleteById(hookId)
}
}
} catch (error) {
logger.warn(`[${requestId}] Error deleting Lemlist webhook (non-fatal)`, error)
}
},
}

View File

@@ -1,71 +0,0 @@
import crypto from 'crypto'
import { createLogger } from '@sim/logger'
import { safeCompare } from '@/lib/core/security/encryption'
import type {
FormatInputContext,
FormatInputResult,
WebhookProviderHandler,
} from '@/lib/webhooks/providers/types'
import { createHmacVerifier } from '@/lib/webhooks/providers/utils'
const logger = createLogger('WebhookProvider:Linear')
function validateLinearSignature(secret: string, signature: string, body: string): boolean {
try {
if (!secret || !signature || !body) {
logger.warn('Linear signature validation missing required fields', {
hasSecret: !!secret,
hasSignature: !!signature,
hasBody: !!body,
})
return false
}
const computedHash = crypto.createHmac('sha256', secret).update(body, 'utf8').digest('hex')
logger.debug('Linear signature comparison', {
computedSignature: `${computedHash.substring(0, 10)}...`,
providedSignature: `${signature.substring(0, 10)}...`,
computedLength: computedHash.length,
providedLength: signature.length,
match: computedHash === signature,
})
return safeCompare(computedHash, signature)
} catch (error) {
logger.error('Error validating Linear signature:', error)
return false
}
}
export const linearHandler: WebhookProviderHandler = {
verifyAuth: createHmacVerifier({
configKey: 'webhookSecret',
headerName: 'Linear-Signature',
validateFn: validateLinearSignature,
providerLabel: 'Linear',
}),
async formatInput({ body }: FormatInputContext): Promise<FormatInputResult> {
const b = body as Record<string, unknown>
return {
input: {
action: b.action || '',
type: b.type || '',
webhookId: b.webhookId || '',
webhookTimestamp: b.webhookTimestamp || 0,
organizationId: b.organizationId || '',
createdAt: b.createdAt || '',
actor: b.actor || null,
data: b.data || null,
updatedFrom: b.updatedFrom || null,
},
}
},
extractIdempotencyId(body: unknown) {
const obj = body as Record<string, unknown>
const data = obj.data as Record<string, unknown> | undefined
if (obj.action && data?.id) {
return `${obj.action}:${data.id}`
}
return null
},
}

View File

@@ -1,787 +0,0 @@
import crypto from 'crypto'
import { db } from '@sim/db'
import { account } from '@sim/db/schema'
import { createLogger } from '@sim/logger'
import { eq } from 'drizzle-orm'
import { type NextRequest, NextResponse } from 'next/server'
import { safeCompare } from '@/lib/core/security/encryption'
import {
type SecureFetchResponse,
secureFetchWithPinnedIP,
validateUrlWithDNS,
} from '@/lib/core/security/input-validation.server'
import { sanitizeUrlForLog } from '@/lib/core/utils/logging'
import {
getCredentialOwner,
getNotificationUrl,
getProviderConfig,
} from '@/lib/webhooks/providers/subscription-utils'
import type {
AuthContext,
DeleteSubscriptionContext,
EventFilterContext,
FormatInputContext,
FormatInputResult,
SubscriptionContext,
SubscriptionResult,
WebhookProviderHandler,
} from '@/lib/webhooks/providers/types'
import { refreshAccessTokenIfNeeded, resolveOAuthAccountId } from '@/app/api/auth/oauth/utils'
const logger = createLogger('WebhookProvider:MicrosoftTeams')
function validateMicrosoftTeamsSignature(
hmacSecret: string,
signature: string,
body: string
): boolean {
try {
if (!hmacSecret || !signature || !body) {
return false
}
if (!signature.startsWith('HMAC ')) {
return false
}
const providedSignature = signature.substring(5)
const secretBytes = Buffer.from(hmacSecret, 'base64')
const bodyBytes = Buffer.from(body, 'utf8')
const computedHash = crypto.createHmac('sha256', secretBytes).update(bodyBytes).digest('base64')
return safeCompare(computedHash, providedSignature)
} catch (error) {
logger.error('Error validating Microsoft Teams signature:', error)
return false
}
}
function parseFirstNotification(
body: unknown
): { subscriptionId: string; messageId: string } | null {
const obj = body as Record<string, unknown>
const value = obj.value as unknown[] | undefined
if (!Array.isArray(value) || value.length === 0) {
return null
}
const notification = value[0] as Record<string, unknown>
const subscriptionId = notification.subscriptionId as string | undefined
const resourceData = notification.resourceData as Record<string, unknown> | undefined
const messageId = resourceData?.id as string | undefined
if (subscriptionId && messageId) {
return { subscriptionId, messageId }
}
return null
}
async function fetchWithDNSPinning(
url: string,
accessToken: string,
requestId: string
): Promise<SecureFetchResponse | null> {
try {
const urlValidation = await validateUrlWithDNS(url, 'contentUrl')
if (!urlValidation.isValid) {
logger.warn(`[${requestId}] Invalid content URL: ${urlValidation.error}`, { url })
return null
}
const headers: Record<string, string> = {}
if (accessToken) {
headers.Authorization = `Bearer ${accessToken}`
}
const response = await secureFetchWithPinnedIP(url, urlValidation.resolvedIP!, { headers })
return response
} catch (error) {
logger.error(`[${requestId}] Error fetching URL with DNS pinning`, {
error: error instanceof Error ? error.message : String(error),
url: sanitizeUrlForLog(url),
})
return null
}
}
/**
* Format Microsoft Teams Graph change notification
*/
async function formatTeamsGraphNotification(
body: Record<string, unknown>,
foundWebhook: Record<string, unknown>,
foundWorkflow: { id: string; userId: string },
request: { headers: Map<string, string> }
): Promise<unknown> {
const notification = (body.value as unknown[])?.[0] as Record<string, unknown> | undefined
if (!notification) {
logger.warn('Received empty Teams notification body')
return null
}
const changeType = (notification.changeType as string) || 'created'
const resource = (notification.resource as string) || ''
const subscriptionId = (notification.subscriptionId as string) || ''
let chatId: string | null = null
let messageId: string | null = null
const fullMatch = resource.match(/chats\/([^/]+)\/messages\/([^/]+)/)
if (fullMatch) {
chatId = fullMatch[1]
messageId = fullMatch[2]
}
if (!chatId || !messageId) {
const quotedMatch = resource.match(/chats\('([^']+)'\)\/messages\('([^']+)'\)/)
if (quotedMatch) {
chatId = quotedMatch[1]
messageId = quotedMatch[2]
}
}
if (!chatId || !messageId) {
const collectionMatch = resource.match(/chats\/([^/]+)\/messages$/)
const rdId = ((body?.value as unknown[])?.[0] as Record<string, unknown>)?.resourceData as
| Record<string, unknown>
| undefined
const rdIdValue = rdId?.id as string | undefined
if (collectionMatch && rdIdValue) {
chatId = collectionMatch[1]
messageId = rdIdValue
}
}
if (
(!chatId || !messageId) &&
((body?.value as unknown[])?.[0] as Record<string, unknown>)?.resourceData
) {
const resourceData = ((body.value as unknown[])[0] as Record<string, unknown>)
.resourceData as Record<string, unknown>
const odataId = resourceData['@odata.id']
if (typeof odataId === 'string') {
const odataMatch = odataId.match(/chats\('([^']+)'\)\/messages\('([^']+)'\)/)
if (odataMatch) {
chatId = odataMatch[1]
messageId = odataMatch[2]
}
}
}
if (!chatId || !messageId) {
logger.warn('Could not resolve chatId/messageId from Teams notification', {
resource,
hasResourceDataId: Boolean(
((body?.value as unknown[])?.[0] as Record<string, unknown>)?.resourceData
),
valueLength: Array.isArray(body?.value) ? (body.value as unknown[]).length : 0,
keys: Object.keys(body || {}),
})
return {
from: null,
message: { raw: body },
activity: body,
conversation: null,
}
}
const resolvedChatId = chatId as string
const resolvedMessageId = messageId as string
const providerConfig = (foundWebhook?.providerConfig as Record<string, unknown>) || {}
const credentialId = providerConfig.credentialId
const includeAttachments = providerConfig.includeAttachments !== false
let message: Record<string, unknown> | null = null
const rawAttachments: Array<{ name: string; data: Buffer; contentType: string; size: number }> =
[]
let accessToken: string | null = null
if (!credentialId) {
logger.error('Missing credentialId for Teams chat subscription', {
chatId: resolvedChatId,
messageId: resolvedMessageId,
webhookId: foundWebhook?.id,
blockId: foundWebhook?.blockId,
providerConfig,
})
} else {
try {
const resolved = await resolveOAuthAccountId(credentialId as string)
if (!resolved) {
logger.error('Teams credential could not be resolved', { credentialId })
} else {
const rows = await db
.select()
.from(account)
.where(eq(account.id, resolved.accountId))
.limit(1)
if (rows.length === 0) {
logger.error('Teams credential not found', { credentialId, chatId: resolvedChatId })
} else {
const effectiveUserId = rows[0].userId
accessToken = await refreshAccessTokenIfNeeded(
resolved.accountId,
effectiveUserId,
'teams-graph-notification'
)
}
}
if (accessToken) {
const msgUrl = `https://graph.microsoft.com/v1.0/chats/${encodeURIComponent(resolvedChatId)}/messages/${encodeURIComponent(resolvedMessageId)}`
const res = await fetch(msgUrl, { headers: { Authorization: `Bearer ${accessToken}` } })
if (res.ok) {
message = (await res.json()) as Record<string, unknown>
if (includeAttachments && (message?.attachments as unknown[] | undefined)?.length) {
const attachments = Array.isArray(message?.attachments)
? (message.attachments as Record<string, unknown>[])
: []
for (const att of attachments) {
try {
const contentUrl =
typeof att?.contentUrl === 'string' ? (att.contentUrl as string) : undefined
const contentTypeHint =
typeof att?.contentType === 'string' ? (att.contentType as string) : undefined
let attachmentName = (att?.name as string) || 'teams-attachment'
if (!contentUrl) continue
let buffer: Buffer | null = null
let mimeType = 'application/octet-stream'
if (contentUrl.includes('sharepoint.com') || contentUrl.includes('onedrive')) {
try {
const directRes = await fetchWithDNSPinning(
contentUrl,
accessToken,
'teams-attachment'
)
if (directRes?.ok) {
const arrayBuffer = await directRes.arrayBuffer()
buffer = Buffer.from(arrayBuffer)
mimeType =
directRes.headers.get('content-type') ||
contentTypeHint ||
'application/octet-stream'
} else if (directRes) {
const encodedUrl = Buffer.from(contentUrl)
.toString('base64')
.replace(/\+/g, '-')
.replace(/\//g, '_')
.replace(/=+$/, '')
const graphUrl = `https://graph.microsoft.com/v1.0/shares/u!${encodedUrl}/driveItem/content`
const graphRes = await fetch(graphUrl, {
headers: { Authorization: `Bearer ${accessToken}` },
redirect: 'follow',
})
if (graphRes.ok) {
const arrayBuffer = await graphRes.arrayBuffer()
buffer = Buffer.from(arrayBuffer)
mimeType =
graphRes.headers.get('content-type') ||
contentTypeHint ||
'application/octet-stream'
} else {
continue
}
}
} catch {
continue
}
} else if (
contentUrl.includes('1drv.ms') ||
contentUrl.includes('onedrive.live.com') ||
contentUrl.includes('onedrive.com') ||
contentUrl.includes('my.microsoftpersonalcontent.com')
) {
try {
let shareToken: string | null = null
if (contentUrl.includes('1drv.ms')) {
const urlParts = contentUrl.split('/').pop()
if (urlParts) shareToken = urlParts
} else if (contentUrl.includes('resid=')) {
const urlParams = new URL(contentUrl).searchParams
const resId = urlParams.get('resid')
if (resId) shareToken = resId
}
if (!shareToken) {
const base64Url = Buffer.from(contentUrl, 'utf-8')
.toString('base64')
.replace(/\+/g, '-')
.replace(/\//g, '_')
.replace(/=+$/, '')
shareToken = `u!${base64Url}`
} else if (!shareToken.startsWith('u!')) {
const base64Url = Buffer.from(shareToken, 'utf-8')
.toString('base64')
.replace(/\+/g, '-')
.replace(/\//g, '_')
.replace(/=+$/, '')
shareToken = `u!${base64Url}`
}
const metadataUrl = `https://graph.microsoft.com/v1.0/shares/${shareToken}/driveItem`
const metadataRes = await fetch(metadataUrl, {
headers: {
Authorization: `Bearer ${accessToken}`,
Accept: 'application/json',
},
})
if (!metadataRes.ok) {
const directUrl = `https://graph.microsoft.com/v1.0/shares/${shareToken}/driveItem/content`
const directRes = await fetch(directUrl, {
headers: { Authorization: `Bearer ${accessToken}` },
redirect: 'follow',
})
if (directRes.ok) {
const arrayBuffer = await directRes.arrayBuffer()
buffer = Buffer.from(arrayBuffer)
mimeType =
directRes.headers.get('content-type') ||
contentTypeHint ||
'application/octet-stream'
} else {
continue
}
} else {
const metadata = (await metadataRes.json()) as Record<string, unknown>
const downloadUrl = metadata['@microsoft.graph.downloadUrl'] as
| string
| undefined
if (downloadUrl) {
const downloadRes = await fetchWithDNSPinning(
downloadUrl,
'',
'teams-onedrive-download'
)
if (downloadRes?.ok) {
const arrayBuffer = await downloadRes.arrayBuffer()
buffer = Buffer.from(arrayBuffer)
const fileInfo = metadata.file as Record<string, unknown> | undefined
mimeType =
downloadRes.headers.get('content-type') ||
(fileInfo?.mimeType as string | undefined) ||
contentTypeHint ||
'application/octet-stream'
if (metadata.name && metadata.name !== attachmentName) {
attachmentName = metadata.name as string
}
} else {
continue
}
} else {
continue
}
}
} catch {
continue
}
} else {
try {
const ares = await fetchWithDNSPinning(
contentUrl,
accessToken,
'teams-attachment-generic'
)
if (ares?.ok) {
const arrayBuffer = await ares.arrayBuffer()
buffer = Buffer.from(arrayBuffer)
mimeType =
ares.headers.get('content-type') ||
contentTypeHint ||
'application/octet-stream'
}
} catch {
continue
}
}
if (!buffer) continue
const size = buffer.length
rawAttachments.push({
name: attachmentName,
data: buffer,
contentType: mimeType,
size,
})
} catch {
/* skip attachment on error */
}
}
}
}
}
} catch (error) {
logger.error('Failed to fetch Teams message', {
error,
chatId: resolvedChatId,
messageId: resolvedMessageId,
})
}
}
if (!message) {
logger.warn('No message data available for Teams notification', {
chatId: resolvedChatId,
messageId: resolvedMessageId,
hasCredential: !!credentialId,
})
return {
message_id: resolvedMessageId,
chat_id: resolvedChatId,
from_name: '',
text: '',
created_at: '',
attachments: [],
}
}
const messageText = (message.body as Record<string, unknown>)?.content || ''
const from = ((message.from as Record<string, unknown>)?.user as Record<string, unknown>) || {}
const createdAt = (message.createdDateTime as string) || ''
return {
message_id: resolvedMessageId,
chat_id: resolvedChatId,
from_name: (from.displayName as string) || '',
text: messageText,
created_at: createdAt,
attachments: rawAttachments,
}
}
export const microsoftTeamsHandler: WebhookProviderHandler = {
handleChallenge(_body: unknown, request: NextRequest, requestId: string, path: string) {
const url = new URL(request.url)
const validationToken = url.searchParams.get('validationToken')
if (validationToken) {
logger.info(`[${requestId}] Microsoft Graph subscription validation for path: ${path}`)
return new NextResponse(validationToken, {
status: 200,
headers: { 'Content-Type': 'text/plain' },
})
}
return null
},
verifyAuth({ request, rawBody, requestId, providerConfig }: AuthContext) {
if (providerConfig.hmacSecret) {
const authHeader = request.headers.get('authorization')
if (!authHeader || !authHeader.startsWith('HMAC ')) {
logger.warn(
`[${requestId}] Microsoft Teams outgoing webhook missing HMAC authorization header`
)
return new NextResponse('Unauthorized - Missing HMAC signature', { status: 401 })
}
if (
!validateMicrosoftTeamsSignature(providerConfig.hmacSecret as string, authHeader, rawBody)
) {
logger.warn(`[${requestId}] Microsoft Teams HMAC signature verification failed`)
return new NextResponse('Unauthorized - Invalid HMAC signature', { status: 401 })
}
}
return null
},
formatErrorResponse(error: string, status: number) {
return NextResponse.json({ type: 'message', text: error }, { status })
},
enrichHeaders({ body }: EventFilterContext, headers: Record<string, string>) {
const parsed = parseFirstNotification(body)
if (parsed) {
headers['x-teams-notification-id'] = `${parsed.subscriptionId}:${parsed.messageId}`
}
},
extractIdempotencyId(body: unknown) {
const parsed = parseFirstNotification(body)
return parsed ? `${parsed.subscriptionId}:${parsed.messageId}` : null
},
formatSuccessResponse(providerConfig: Record<string, unknown>) {
if (providerConfig.triggerId === 'microsoftteams_chat_subscription') {
return new NextResponse(null, { status: 202 })
}
return NextResponse.json({ type: 'message', text: 'Sim' })
},
formatQueueErrorResponse() {
return NextResponse.json(
{ type: 'message', text: 'Webhook processing failed' },
{ status: 500 }
)
},
async createSubscription({
webhook,
workflow,
userId,
requestId,
request,
}: SubscriptionContext): Promise<SubscriptionResult | undefined> {
const config = getProviderConfig(webhook)
if (config.triggerId !== 'microsoftteams_chat_subscription') {
return undefined
}
const credentialId = config.credentialId as string | undefined
const chatId = config.chatId as string | undefined
if (!credentialId) {
logger.warn(`[${requestId}] Missing credentialId for Teams chat subscription ${webhook.id}`)
throw new Error(
'Microsoft Teams credentials are required. Please connect your Microsoft account in the trigger configuration.'
)
}
if (!chatId) {
logger.warn(`[${requestId}] Missing chatId for Teams chat subscription ${webhook.id}`)
throw new Error(
'Chat ID is required to create a Teams subscription. Please provide a valid chat ID.'
)
}
const credentialOwner = await getCredentialOwner(credentialId, requestId)
const accessToken = credentialOwner
? await refreshAccessTokenIfNeeded(
credentialOwner.accountId,
credentialOwner.userId,
requestId
)
: null
if (!accessToken) {
logger.error(`[${requestId}] Failed to get access token for Teams subscription ${webhook.id}`)
throw new Error(
'Failed to authenticate with Microsoft Teams. Please reconnect your Microsoft account and try again.'
)
}
const existingSubscriptionId = config.externalSubscriptionId as string | undefined
if (existingSubscriptionId) {
try {
const checkRes = await fetch(
`https://graph.microsoft.com/v1.0/subscriptions/${existingSubscriptionId}`,
{ method: 'GET', headers: { Authorization: `Bearer ${accessToken}` } }
)
if (checkRes.ok) {
logger.info(
`[${requestId}] Teams subscription ${existingSubscriptionId} already exists for webhook ${webhook.id}`
)
return { providerConfigUpdates: { externalSubscriptionId: existingSubscriptionId } }
}
} catch {
logger.debug(`[${requestId}] Existing subscription check failed, will create new one`)
}
}
const notificationUrl = getNotificationUrl(webhook)
const resource = `/chats/${chatId}/messages`
const maxLifetimeMinutes = 4230
const expirationDateTime = new Date(Date.now() + maxLifetimeMinutes * 60 * 1000).toISOString()
const body = {
changeType: 'created,updated',
notificationUrl,
lifecycleNotificationUrl: notificationUrl,
resource,
includeResourceData: false,
expirationDateTime,
clientState: webhook.id,
}
try {
const res = await fetch('https://graph.microsoft.com/v1.0/subscriptions', {
method: 'POST',
headers: {
Authorization: `Bearer ${accessToken}`,
'Content-Type': 'application/json',
},
body: JSON.stringify(body),
})
const payload = await res.json()
if (!res.ok) {
const errorMessage =
payload.error?.message || payload.error?.code || 'Unknown Microsoft Graph API error'
logger.error(
`[${requestId}] Failed to create Teams subscription for webhook ${webhook.id}`,
{
status: res.status,
error: payload.error,
}
)
let userFriendlyMessage = 'Failed to create Teams subscription'
if (res.status === 401 || res.status === 403) {
userFriendlyMessage =
'Authentication failed. Please reconnect your Microsoft Teams account and ensure you have the necessary permissions.'
} else if (res.status === 404) {
userFriendlyMessage =
'Chat not found. Please verify that the Chat ID is correct and that you have access to the specified chat.'
} else if (errorMessage && errorMessage !== 'Unknown Microsoft Graph API error') {
userFriendlyMessage = `Teams error: ${errorMessage}`
}
throw new Error(userFriendlyMessage)
}
logger.info(
`[${requestId}] Successfully created Teams subscription ${payload.id} for webhook ${webhook.id}`
)
return { providerConfigUpdates: { externalSubscriptionId: payload.id as string } }
} catch (error: unknown) {
if (
error instanceof Error &&
(error.message.includes('credentials') ||
error.message.includes('Chat ID') ||
error.message.includes('authenticate'))
) {
throw error
}
logger.error(
`[${requestId}] Error creating Teams subscription for webhook ${webhook.id}`,
error
)
throw new Error(
error instanceof Error
? error.message
: 'Failed to create Teams subscription. Please try again.'
)
}
},
async deleteSubscription({
webhook,
workflow,
requestId,
}: DeleteSubscriptionContext): Promise<void> {
try {
const config = getProviderConfig(webhook)
if (config.triggerId !== 'microsoftteams_chat_subscription') {
return
}
const externalSubscriptionId = config.externalSubscriptionId as string | undefined
const credentialId = config.credentialId as string | undefined
if (!externalSubscriptionId || !credentialId) {
logger.info(`[${requestId}] No external subscription to delete for webhook ${webhook.id}`)
return
}
const credentialOwner = await getCredentialOwner(credentialId, requestId)
const accessToken = credentialOwner
? await refreshAccessTokenIfNeeded(
credentialOwner.accountId,
credentialOwner.userId,
requestId
)
: null
if (!accessToken) {
logger.warn(
`[${requestId}] Could not get access token to delete Teams subscription for webhook ${webhook.id}`
)
return
}
const res = await fetch(
`https://graph.microsoft.com/v1.0/subscriptions/${externalSubscriptionId}`,
{
method: 'DELETE',
headers: { Authorization: `Bearer ${accessToken}` },
}
)
if (res.ok || res.status === 404) {
logger.info(
`[${requestId}] Successfully deleted Teams subscription ${externalSubscriptionId} for webhook ${webhook.id}`
)
} else {
const errorBody = await res.text()
logger.warn(
`[${requestId}] Failed to delete Teams subscription ${externalSubscriptionId} for webhook ${webhook.id}. Status: ${res.status}`
)
}
} catch (error) {
logger.error(
`[${requestId}] Error deleting Teams subscription for webhook ${webhook.id}`,
error
)
}
},
async formatInput({
body,
webhook,
workflow,
headers,
requestId,
}: FormatInputContext): Promise<FormatInputResult> {
const b = body as Record<string, unknown>
const value = b?.value as unknown[] | undefined
if (value && Array.isArray(value) && value.length > 0) {
const mockRequest = {
headers: new Map(Object.entries(headers)),
} as unknown as import('next/server').NextRequest
const result = await formatTeamsGraphNotification(
b,
webhook,
workflow,
mockRequest as unknown as { headers: Map<string, string> }
)
return { input: result }
}
const messageText = (b?.text as string) || ''
const messageId = (b?.id as string) || ''
const timestamp = (b?.timestamp as string) || (b?.localTimestamp as string) || ''
const from = (b?.from || {}) as Record<string, unknown>
const conversation = (b?.conversation || {}) as Record<string, unknown>
return {
input: {
from: {
id: (from.id || '') as string,
name: (from.name || '') as string,
aadObjectId: (from.aadObjectId || '') as string,
},
message: {
raw: {
attachments: b?.attachments || [],
channelData: b?.channelData || {},
conversation: b?.conversation || {},
text: messageText,
messageType: (b?.type || 'message') as string,
channelId: (b?.channelId || '') as string,
timestamp,
},
},
activity: b || {},
conversation: {
id: (conversation.id || '') as string,
name: (conversation.name || '') as string,
isGroup: (conversation.isGroup || false) as boolean,
tenantId: (conversation.tenantId || '') as string,
aadObjectId: (conversation.aadObjectId || '') as string,
conversationType: (conversation.conversationType || '') as string,
},
},
}
},
}

View File

@@ -1,113 +0,0 @@
import { db } from '@sim/db'
import { account, webhook } from '@sim/db/schema'
import { createLogger } from '@sim/logger'
import { eq } from 'drizzle-orm'
import type {
FormatInputContext,
FormatInputResult,
PollingConfigContext,
WebhookProviderHandler,
} from '@/lib/webhooks/providers/types'
import { refreshAccessTokenIfNeeded, resolveOAuthAccountId } from '@/app/api/auth/oauth/utils'
const logger = createLogger('WebhookProvider:Outlook')
export const outlookHandler: WebhookProviderHandler = {
async formatInput({ body }: FormatInputContext): Promise<FormatInputResult> {
const b = body as Record<string, unknown>
if (b && typeof b === 'object' && 'email' in b) {
return { input: { email: b.email, timestamp: b.timestamp } }
}
return { input: b }
},
async configurePolling({ webhook: webhookData, requestId }: PollingConfigContext) {
logger.info(`[${requestId}] Setting up Outlook polling for webhook ${webhookData.id}`)
try {
const providerConfig = (webhookData.providerConfig as Record<string, unknown>) || {}
const credentialId = providerConfig.credentialId as string | undefined
if (!credentialId) {
logger.error(`[${requestId}] Missing credentialId for Outlook webhook ${webhookData.id}`)
return false
}
const resolvedOutlook = await resolveOAuthAccountId(credentialId)
if (!resolvedOutlook) {
logger.error(
`[${requestId}] Could not resolve credential ${credentialId} for Outlook webhook ${webhookData.id}`
)
return false
}
const rows = await db
.select()
.from(account)
.where(eq(account.id, resolvedOutlook.accountId))
.limit(1)
if (rows.length === 0) {
logger.error(
`[${requestId}] Credential ${credentialId} not found for Outlook webhook ${webhookData.id}`
)
return false
}
const effectiveUserId = rows[0].userId
const accessToken = await refreshAccessTokenIfNeeded(
resolvedOutlook.accountId,
effectiveUserId,
requestId
)
if (!accessToken) {
logger.error(
`[${requestId}] Failed to refresh/access Outlook token for credential ${credentialId}`
)
return false
}
const now = new Date()
await db
.update(webhook)
.set({
providerConfig: {
...providerConfig,
userId: effectiveUserId,
credentialId,
maxEmailsPerPoll:
typeof providerConfig.maxEmailsPerPoll === 'string'
? Number.parseInt(providerConfig.maxEmailsPerPoll, 10) || 25
: (providerConfig.maxEmailsPerPoll as number) || 25,
pollingInterval:
typeof providerConfig.pollingInterval === 'string'
? Number.parseInt(providerConfig.pollingInterval, 10) || 5
: (providerConfig.pollingInterval as number) || 5,
markAsRead: providerConfig.markAsRead || false,
includeRawEmail: providerConfig.includeRawEmail || false,
folderIds: providerConfig.folderIds || ['inbox'],
folderFilterBehavior: providerConfig.folderFilterBehavior || 'INCLUDE',
lastCheckedTimestamp:
(providerConfig.lastCheckedTimestamp as string) || now.toISOString(),
setupCompleted: true,
},
updatedAt: now,
})
.where(eq(webhook.id, webhookData.id as string))
logger.info(
`[${requestId}] Successfully configured Outlook polling for webhook ${webhookData.id}`
)
return true
} catch (error: unknown) {
const err = error as Error
logger.error(`[${requestId}] Failed to configure Outlook polling`, {
webhookId: webhookData.id,
error: err.message,
stack: err.stack,
})
return false
}
},
}

View File

@@ -1,91 +0,0 @@
import { createLogger } from '@sim/logger'
import { NextResponse } from 'next/server'
import { airtableHandler } from '@/lib/webhooks/providers/airtable'
import { ashbyHandler } from '@/lib/webhooks/providers/ashby'
import { attioHandler } from '@/lib/webhooks/providers/attio'
import { calcomHandler } from '@/lib/webhooks/providers/calcom'
import { calendlyHandler } from '@/lib/webhooks/providers/calendly'
import { circlebackHandler } from '@/lib/webhooks/providers/circleback'
import { confluenceHandler } from '@/lib/webhooks/providers/confluence'
import { fathomHandler } from '@/lib/webhooks/providers/fathom'
import { firefliesHandler } from '@/lib/webhooks/providers/fireflies'
import { genericHandler } from '@/lib/webhooks/providers/generic'
import { githubHandler } from '@/lib/webhooks/providers/github'
import { gmailHandler } from '@/lib/webhooks/providers/gmail'
import { googleFormsHandler } from '@/lib/webhooks/providers/google-forms'
import { grainHandler } from '@/lib/webhooks/providers/grain'
import { hubspotHandler } from '@/lib/webhooks/providers/hubspot'
import { imapHandler } from '@/lib/webhooks/providers/imap'
import { jiraHandler } from '@/lib/webhooks/providers/jira'
import { lemlistHandler } from '@/lib/webhooks/providers/lemlist'
import { linearHandler } from '@/lib/webhooks/providers/linear'
import { microsoftTeamsHandler } from '@/lib/webhooks/providers/microsoft-teams'
import { outlookHandler } from '@/lib/webhooks/providers/outlook'
import { rssHandler } from '@/lib/webhooks/providers/rss'
import { slackHandler } from '@/lib/webhooks/providers/slack'
import { stripeHandler } from '@/lib/webhooks/providers/stripe'
import { telegramHandler } from '@/lib/webhooks/providers/telegram'
import { twilioHandler } from '@/lib/webhooks/providers/twilio'
import { twilioVoiceHandler } from '@/lib/webhooks/providers/twilio-voice'
import { typeformHandler } from '@/lib/webhooks/providers/typeform'
import type { WebhookProviderHandler } from '@/lib/webhooks/providers/types'
import { verifyTokenAuth } from '@/lib/webhooks/providers/utils'
import { webflowHandler } from '@/lib/webhooks/providers/webflow'
import { whatsappHandler } from '@/lib/webhooks/providers/whatsapp'
const logger = createLogger('WebhookProviderRegistry')
const PROVIDER_HANDLERS: Record<string, WebhookProviderHandler> = {
airtable: airtableHandler,
ashby: ashbyHandler,
attio: attioHandler,
calendly: calendlyHandler,
calcom: calcomHandler,
circleback: circlebackHandler,
confluence: confluenceHandler,
fireflies: firefliesHandler,
generic: genericHandler,
gmail: gmailHandler,
github: githubHandler,
google_forms: googleFormsHandler,
fathom: fathomHandler,
grain: grainHandler,
hubspot: hubspotHandler,
imap: imapHandler,
jira: jiraHandler,
lemlist: lemlistHandler,
linear: linearHandler,
'microsoft-teams': microsoftTeamsHandler,
outlook: outlookHandler,
rss: rssHandler,
slack: slackHandler,
stripe: stripeHandler,
telegram: telegramHandler,
twilio: twilioHandler,
twilio_voice: twilioVoiceHandler,
typeform: typeformHandler,
webflow: webflowHandler,
whatsapp: whatsappHandler,
}
/**
* Default handler for unknown/future providers.
* Uses timing-safe comparison for bearer token validation.
*/
const defaultHandler: WebhookProviderHandler = {
verifyAuth({ request, requestId, providerConfig }) {
const token = providerConfig.token
if (typeof token === 'string') {
if (!verifyTokenAuth(request, token)) {
logger.warn(`[${requestId}] Unauthorized webhook access attempt - invalid token`)
return new NextResponse('Unauthorized', { status: 401 })
}
}
return null
},
}
/** Look up the provider handler, falling back to the default bearer token handler. */
export function getProviderHandler(provider: string): WebhookProviderHandler {
return PROVIDER_HANDLERS[provider] ?? defaultHandler
}

View File

@@ -1,65 +0,0 @@
import { db } from '@sim/db'
import { webhook } from '@sim/db/schema'
import { createLogger } from '@sim/logger'
import { eq } from 'drizzle-orm'
import type {
FormatInputContext,
FormatInputResult,
PollingConfigContext,
WebhookProviderHandler,
} from '@/lib/webhooks/providers/types'
const logger = createLogger('WebhookProvider:Rss')
export const rssHandler: WebhookProviderHandler = {
async formatInput({ body }: FormatInputContext): Promise<FormatInputResult> {
const b = body as Record<string, unknown>
if (b && typeof b === 'object' && 'item' in b) {
return {
input: {
title: b.title,
link: b.link,
pubDate: b.pubDate,
item: b.item,
feed: b.feed,
timestamp: b.timestamp,
},
}
}
return { input: b }
},
async configurePolling({ webhook: webhookData, requestId }: PollingConfigContext) {
logger.info(`[${requestId}] Setting up RSS polling for webhook ${webhookData.id}`)
try {
const providerConfig = (webhookData.providerConfig as Record<string, unknown>) || {}
const now = new Date()
await db
.update(webhook)
.set({
providerConfig: {
...providerConfig,
lastCheckedTimestamp: now.toISOString(),
lastSeenGuids: [],
setupCompleted: true,
},
updatedAt: now,
})
.where(eq(webhook.id, webhookData.id as string))
logger.info(
`[${requestId}] Successfully configured RSS polling for webhook ${webhookData.id}`
)
return true
} catch (error: unknown) {
const err = error as Error
logger.error(`[${requestId}] Failed to configure RSS polling`, {
webhookId: webhookData.id,
error: err.message,
})
return false
}
},
}

View File

@@ -1,282 +0,0 @@
import { createLogger } from '@sim/logger'
import { NextResponse } from 'next/server'
import {
secureFetchWithPinnedIP,
validateUrlWithDNS,
} from '@/lib/core/security/input-validation.server'
import type {
FormatInputContext,
FormatInputResult,
WebhookProviderHandler,
} from '@/lib/webhooks/providers/types'
const logger = createLogger('WebhookProvider:Slack')
const SLACK_MAX_FILE_SIZE = 50 * 1024 * 1024 // 50 MB
const SLACK_MAX_FILES = 15
const SLACK_REACTION_EVENTS = new Set(['reaction_added', 'reaction_removed'])
async function resolveSlackFileInfo(
fileId: string,
botToken: string
): Promise<{ url_private?: string; name?: string; mimetype?: string; size?: number } | null> {
try {
const response = await fetch(
`https://slack.com/api/files.info?file=${encodeURIComponent(fileId)}`,
{ headers: { Authorization: `Bearer ${botToken}` } }
)
const data = (await response.json()) as {
ok: boolean
error?: string
file?: Record<string, unknown>
}
if (!data.ok || !data.file) {
logger.warn('Slack files.info failed', { fileId, error: data.error })
return null
}
return {
url_private: data.file.url_private as string | undefined,
name: data.file.name as string | undefined,
mimetype: data.file.mimetype as string | undefined,
size: data.file.size as number | undefined,
}
} catch (error) {
logger.error('Error calling Slack files.info', {
fileId,
error: error instanceof Error ? error.message : String(error),
})
return null
}
}
async function downloadSlackFiles(
rawFiles: unknown[],
botToken: string
): Promise<Array<{ name: string; data: string; mimeType: string; size: number }>> {
const filesToProcess = rawFiles.slice(0, SLACK_MAX_FILES)
const downloaded: Array<{ name: string; data: string; mimeType: string; size: number }> = []
for (const file of filesToProcess) {
const f = file as Record<string, unknown>
let urlPrivate = f.url_private as string | undefined
let fileName = f.name as string | undefined
let fileMimeType = f.mimetype as string | undefined
let fileSize = f.size as number | undefined
if (!urlPrivate && f.id) {
const resolved = await resolveSlackFileInfo(f.id as string, botToken)
if (resolved?.url_private) {
urlPrivate = resolved.url_private
fileName = fileName || resolved.name
fileMimeType = fileMimeType || resolved.mimetype
fileSize = fileSize ?? resolved.size
}
}
if (!urlPrivate) {
logger.warn('Slack file has no url_private and could not be resolved, skipping', {
fileId: f.id,
})
continue
}
const reportedSize = Number(fileSize) || 0
if (reportedSize > SLACK_MAX_FILE_SIZE) {
logger.warn('Slack file exceeds size limit, skipping', {
fileId: f.id,
size: reportedSize,
limit: SLACK_MAX_FILE_SIZE,
})
continue
}
try {
const urlValidation = await validateUrlWithDNS(urlPrivate, 'url_private')
if (!urlValidation.isValid) {
logger.warn('Slack file url_private failed DNS validation, skipping', {
fileId: f.id,
error: urlValidation.error,
})
continue
}
const response = await secureFetchWithPinnedIP(urlPrivate, urlValidation.resolvedIP!, {
headers: { Authorization: `Bearer ${botToken}` },
})
if (!response.ok) {
logger.warn('Failed to download Slack file, skipping', {
fileId: f.id,
status: response.status,
})
continue
}
const arrayBuffer = await response.arrayBuffer()
const buffer = Buffer.from(arrayBuffer)
if (buffer.length > SLACK_MAX_FILE_SIZE) {
logger.warn('Downloaded Slack file exceeds size limit, skipping', {
fileId: f.id,
actualSize: buffer.length,
limit: SLACK_MAX_FILE_SIZE,
})
continue
}
downloaded.push({
name: fileName || 'download',
data: buffer.toString('base64'),
mimeType: fileMimeType || 'application/octet-stream',
size: buffer.length,
})
} catch (error) {
logger.error('Error downloading Slack file, skipping', {
fileId: f.id,
error: error instanceof Error ? error.message : String(error),
})
}
}
return downloaded
}
async function fetchSlackMessageText(
channel: string,
messageTs: string,
botToken: string
): Promise<string> {
try {
const params = new URLSearchParams({ channel, timestamp: messageTs })
const response = await fetch(`https://slack.com/api/reactions.get?${params}`, {
headers: { Authorization: `Bearer ${botToken}` },
})
const data = (await response.json()) as {
ok: boolean
error?: string
type?: string
message?: { text?: string }
}
if (!data.ok) {
logger.warn('Slack reactions.get failed — message text unavailable', {
channel,
messageTs,
error: data.error,
})
return ''
}
return data.message?.text ?? ''
} catch (error) {
logger.warn('Error fetching Slack message text', {
channel,
messageTs,
error: error instanceof Error ? error.message : String(error),
})
return ''
}
}
/**
* Handle Slack verification challenges
*/
export function handleSlackChallenge(body: unknown): NextResponse | null {
const obj = body as Record<string, unknown>
if (obj.type === 'url_verification' && obj.challenge) {
return NextResponse.json({ challenge: obj.challenge })
}
return null
}
export const slackHandler: WebhookProviderHandler = {
handleChallenge(body: unknown) {
return handleSlackChallenge(body)
},
extractIdempotencyId(body: unknown) {
const obj = body as Record<string, unknown>
if (obj.event_id) {
return String(obj.event_id)
}
const event = obj.event as Record<string, unknown> | undefined
if (event?.ts && obj.team_id) {
return `${obj.team_id}:${event.ts}`
}
return null
},
formatSuccessResponse() {
return new NextResponse(null, { status: 200 })
},
formatQueueErrorResponse() {
return new NextResponse(null, { status: 200 })
},
async formatInput({ body, webhook }: FormatInputContext): Promise<FormatInputResult> {
const b = body as Record<string, unknown>
const providerConfig = (webhook.providerConfig as Record<string, unknown>) || {}
const botToken = providerConfig.botToken as string | undefined
const includeFiles = Boolean(providerConfig.includeFiles)
const rawEvent = b?.event as Record<string, unknown> | undefined
if (!rawEvent) {
logger.warn('Unknown Slack event type', {
type: b?.type,
hasEvent: false,
bodyKeys: Object.keys(b || {}),
})
}
const eventType: string = (rawEvent?.type as string) || (b?.type as string) || 'unknown'
const isReactionEvent = SLACK_REACTION_EVENTS.has(eventType)
const item = rawEvent?.item as Record<string, unknown> | undefined
const channel: string = isReactionEvent
? (item?.channel as string) || ''
: (rawEvent?.channel as string) || ''
const messageTs: string = isReactionEvent
? (item?.ts as string) || ''
: (rawEvent?.ts as string) || (rawEvent?.event_ts as string) || ''
let text: string = (rawEvent?.text as string) || ''
if (isReactionEvent && channel && messageTs && botToken) {
text = await fetchSlackMessageText(channel, messageTs, botToken)
}
const rawFiles: unknown[] = (rawEvent?.files as unknown[]) ?? []
const hasFiles = rawFiles.length > 0
let files: Array<{ name: string; data: string; mimeType: string; size: number }> = []
if (hasFiles && includeFiles && botToken) {
files = await downloadSlackFiles(rawFiles, botToken)
} else if (hasFiles && includeFiles && !botToken) {
logger.warn('Slack message has files and includeFiles is enabled, but no bot token provided')
}
return {
input: {
event: {
event_type: eventType,
channel,
channel_name: '',
user: (rawEvent?.user as string) || '',
user_name: '',
text,
timestamp: messageTs,
thread_ts: (rawEvent?.thread_ts as string) || '',
team_id: (b?.team_id as string) || (rawEvent?.team as string) || '',
event_id: (b?.event_id as string) || '',
reaction: (rawEvent?.reaction as string) || '',
item_user: (rawEvent?.item_user as string) || '',
hasFiles,
files,
},
},
}
},
}

View File

@@ -1,28 +0,0 @@
import { createLogger } from '@sim/logger'
import type {
EventFilterContext,
FormatInputContext,
FormatInputResult,
WebhookProviderHandler,
} from '@/lib/webhooks/providers/types'
import { skipByEventTypes } from '@/lib/webhooks/providers/utils'
const logger = createLogger('WebhookProvider:Stripe')
export const stripeHandler: WebhookProviderHandler = {
async formatInput({ body }: FormatInputContext): Promise<FormatInputResult> {
return { input: body }
},
shouldSkipEvent(ctx: EventFilterContext) {
return skipByEventTypes(ctx, 'Stripe', logger)
},
extractIdempotencyId(body: unknown) {
const obj = body as Record<string, unknown>
if (obj.id && obj.object === 'event') {
return String(obj.id)
}
return null
},
}

View File

@@ -1,39 +0,0 @@
import { db } from '@sim/db'
import { account } from '@sim/db/schema'
import { createLogger } from '@sim/logger'
import { eq } from 'drizzle-orm'
import { getBaseUrl } from '@/lib/core/utils/urls'
import { resolveOAuthAccountId } from '@/app/api/auth/oauth/utils'
const logger = createLogger('WebhookProviderSubscriptions')
export function getProviderConfig(webhook: Record<string, unknown>): Record<string, unknown> {
return (webhook.providerConfig as Record<string, unknown>) || {}
}
export function getNotificationUrl(webhook: Record<string, unknown>): string {
return `${getBaseUrl()}/api/webhooks/trigger/${webhook.path}`
}
export async function getCredentialOwner(
credentialId: string,
requestId: string
): Promise<{ userId: string; accountId: string } | null> {
const resolved = await resolveOAuthAccountId(credentialId)
if (!resolved) {
logger.warn(`[${requestId}] Failed to resolve OAuth account for credentialId ${credentialId}`)
return null
}
const [credentialRecord] = await db
.select({ userId: account.userId })
.from(account)
.where(eq(account.id, resolved.accountId))
.limit(1)
if (!credentialRecord?.userId) {
logger.warn(`[${requestId}] Credential owner not found for credentialId ${credentialId}`)
return null
}
return { userId: credentialRecord.userId, accountId: resolved.accountId }
}

View File

@@ -1,205 +0,0 @@
import { createLogger } from '@sim/logger'
import { getNotificationUrl, getProviderConfig } from '@/lib/webhooks/providers/subscription-utils'
import type {
AuthContext,
DeleteSubscriptionContext,
FormatInputContext,
FormatInputResult,
SubscriptionContext,
SubscriptionResult,
WebhookProviderHandler,
} from '@/lib/webhooks/providers/types'
const logger = createLogger('WebhookProvider:Telegram')
export const telegramHandler: WebhookProviderHandler = {
verifyAuth({ request, requestId }: AuthContext) {
const userAgent = request.headers.get('user-agent')
if (!userAgent) {
logger.warn(
`[${requestId}] Telegram webhook request has empty User-Agent header. This may be blocked by middleware.`
)
}
return null
},
async formatInput({ body }: FormatInputContext): Promise<FormatInputResult> {
const b = body as Record<string, unknown>
const rawMessage = (b?.message ||
b?.edited_message ||
b?.channel_post ||
b?.edited_channel_post) as Record<string, unknown> | undefined
const updateType = b.message
? 'message'
: b.edited_message
? 'edited_message'
: b.channel_post
? 'channel_post'
: b.edited_channel_post
? 'edited_channel_post'
: 'unknown'
if (rawMessage) {
const messageType = rawMessage.photo
? 'photo'
: rawMessage.document
? 'document'
: rawMessage.audio
? 'audio'
: rawMessage.video
? 'video'
: rawMessage.voice
? 'voice'
: rawMessage.sticker
? 'sticker'
: rawMessage.location
? 'location'
: rawMessage.contact
? 'contact'
: rawMessage.poll
? 'poll'
: 'text'
const from = rawMessage.from as Record<string, unknown> | undefined
return {
input: {
message: {
id: rawMessage.message_id,
text: rawMessage.text,
date: rawMessage.date,
messageType,
raw: rawMessage,
},
sender: from
? {
id: from.id,
username: from.username,
firstName: from.first_name,
lastName: from.last_name,
languageCode: from.language_code,
isBot: from.is_bot,
}
: null,
updateId: b.update_id,
updateType,
},
}
}
logger.warn('Unknown Telegram update type', {
updateId: b.update_id,
bodyKeys: Object.keys(b || {}),
})
return {
input: {
updateId: b.update_id,
updateType,
},
}
},
async createSubscription(ctx: SubscriptionContext): Promise<SubscriptionResult | undefined> {
const config = getProviderConfig(ctx.webhook)
const botToken = config.botToken as string | undefined
if (!botToken) {
logger.warn(`[${ctx.requestId}] Missing botToken for Telegram webhook ${ctx.webhook.id}`)
throw new Error(
'Bot token is required to create a Telegram webhook. Please provide a valid Telegram bot token.'
)
}
const notificationUrl = getNotificationUrl(ctx.webhook)
const telegramApiUrl = `https://api.telegram.org/bot${botToken}/setWebhook`
try {
const telegramResponse = await fetch(telegramApiUrl, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'User-Agent': 'TelegramBot/1.0',
},
body: JSON.stringify({ url: notificationUrl }),
})
const responseBody = await telegramResponse.json()
if (!telegramResponse.ok || !responseBody.ok) {
const errorMessage =
responseBody.description ||
`Failed to create Telegram webhook. Status: ${telegramResponse.status}`
logger.error(`[${ctx.requestId}] ${errorMessage}`, { response: responseBody })
let userFriendlyMessage = 'Failed to create Telegram webhook'
if (telegramResponse.status === 401) {
userFriendlyMessage =
'Invalid bot token. Please verify that the bot token is correct and try again.'
} else if (responseBody.description) {
userFriendlyMessage = `Telegram error: ${responseBody.description}`
}
throw new Error(userFriendlyMessage)
}
logger.info(
`[${ctx.requestId}] Successfully created Telegram webhook for webhook ${ctx.webhook.id}`
)
return {}
} catch (error: unknown) {
if (
error instanceof Error &&
(error.message.includes('Bot token') || error.message.includes('Telegram error'))
) {
throw error
}
logger.error(
`[${ctx.requestId}] Error creating Telegram webhook for webhook ${ctx.webhook.id}`,
error
)
throw new Error(
error instanceof Error
? error.message
: 'Failed to create Telegram webhook. Please try again.'
)
}
},
async deleteSubscription(ctx: DeleteSubscriptionContext): Promise<void> {
try {
const config = getProviderConfig(ctx.webhook)
const botToken = config.botToken as string | undefined
if (!botToken) {
logger.warn(
`[${ctx.requestId}] Missing botToken for Telegram webhook deletion ${ctx.webhook.id}`
)
return
}
const telegramApiUrl = `https://api.telegram.org/bot${botToken}/deleteWebhook`
const telegramResponse = await fetch(telegramApiUrl, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
})
const responseBody = await telegramResponse.json()
if (!telegramResponse.ok || !responseBody.ok) {
const errorMessage =
responseBody.description ||
`Failed to delete Telegram webhook. Status: ${telegramResponse.status}`
logger.error(`[${ctx.requestId}] ${errorMessage}`, { response: responseBody })
} else {
logger.info(
`[${ctx.requestId}] Successfully deleted Telegram webhook for webhook ${ctx.webhook.id}`
)
}
} catch (error) {
logger.error(
`[${ctx.requestId}] Error deleting Telegram webhook for webhook ${ctx.webhook.id}`,
error
)
}
},
}

View File

@@ -1,214 +0,0 @@
import crypto from 'crypto'
import { createLogger } from '@sim/logger'
import { NextResponse } from 'next/server'
import { safeCompare } from '@/lib/core/security/encryption'
import type {
AuthContext,
FormatInputContext,
FormatInputResult,
WebhookProviderHandler,
} from '@/lib/webhooks/providers/types'
import { convertSquareBracketsToTwiML } from '@/lib/webhooks/utils'
const logger = createLogger('WebhookProvider:TwilioVoice')
async function validateTwilioSignature(
authToken: string,
signature: string,
url: string,
params: Record<string, unknown>
): Promise<boolean> {
try {
if (!authToken || !signature || !url) {
logger.warn('Twilio signature validation missing required fields', {
hasAuthToken: !!authToken,
hasSignature: !!signature,
hasUrl: !!url,
})
return false
}
const sortedKeys = Object.keys(params).sort()
let data = url
for (const key of sortedKeys) {
data += key + params[key]
}
logger.debug('Twilio signature validation string built', {
url,
sortedKeys,
dataLength: data.length,
})
const encoder = new TextEncoder()
const key = await crypto.subtle.importKey(
'raw',
encoder.encode(authToken),
{ name: 'HMAC', hash: 'SHA-1' },
false,
['sign']
)
const signatureBytes = await crypto.subtle.sign('HMAC', key, encoder.encode(data))
const signatureArray = Array.from(new Uint8Array(signatureBytes))
const signatureBase64 = btoa(String.fromCharCode(...signatureArray))
logger.debug('Twilio signature comparison', {
computedSignature: `${signatureBase64.substring(0, 10)}...`,
providedSignature: `${signature.substring(0, 10)}...`,
computedLength: signatureBase64.length,
providedLength: signature.length,
match: signatureBase64 === signature,
})
return safeCompare(signatureBase64, signature)
} catch (error) {
logger.error('Error validating Twilio signature:', error)
return false
}
}
function getExternalUrl(request: Request): string {
const proto = request.headers.get('x-forwarded-proto') || 'https'
const host = request.headers.get('x-forwarded-host') || request.headers.get('host')
if (host) {
const url = new URL(request.url)
const reconstructed = `${proto}://${host}${url.pathname}${url.search}`
return reconstructed
}
return request.url
}
export const twilioVoiceHandler: WebhookProviderHandler = {
async verifyAuth({ request, rawBody, requestId, providerConfig }: AuthContext) {
const authToken = providerConfig.authToken as string | undefined
if (authToken) {
const signature = request.headers.get('x-twilio-signature')
if (!signature) {
logger.warn(`[${requestId}] Twilio Voice webhook missing signature header`)
return new NextResponse('Unauthorized - Missing Twilio signature', {
status: 401,
})
}
let params: Record<string, string> = {}
try {
if (typeof rawBody === 'string') {
const urlParams = new URLSearchParams(rawBody)
params = Object.fromEntries(urlParams.entries())
}
} catch (error) {
logger.error(
`[${requestId}] Error parsing Twilio webhook body for signature validation:`,
error
)
return new NextResponse('Bad Request - Invalid body format', {
status: 400,
})
}
const fullUrl = getExternalUrl(request)
const isValidSignature = await validateTwilioSignature(authToken, signature, fullUrl, params)
if (!isValidSignature) {
logger.warn(`[${requestId}] Twilio Voice signature verification failed`, {
url: fullUrl,
signatureLength: signature.length,
paramsCount: Object.keys(params).length,
authTokenLength: authToken.length,
})
return new NextResponse('Unauthorized - Invalid Twilio signature', {
status: 401,
})
}
}
return null
},
extractIdempotencyId(body: unknown) {
const obj = body as Record<string, unknown>
return (obj.MessageSid as string) || (obj.CallSid as string) || null
},
formatSuccessResponse(providerConfig: Record<string, unknown>) {
const twimlResponse = (providerConfig.twimlResponse as string | undefined)?.trim()
if (twimlResponse && twimlResponse.length > 0) {
const convertedTwiml = convertSquareBracketsToTwiML(twimlResponse)
return new NextResponse(convertedTwiml, {
status: 200,
headers: {
'Content-Type': 'text/xml; charset=utf-8',
},
})
}
const defaultTwiml = `<?xml version="1.0" encoding="UTF-8"?>
<Response>
<Say>Your call is being processed.</Say>
<Pause length="1"/>
</Response>`
return new NextResponse(defaultTwiml, {
status: 200,
headers: {
'Content-Type': 'text/xml; charset=utf-8',
},
})
},
async formatInput({ body }: FormatInputContext): Promise<FormatInputResult> {
const b = body as Record<string, unknown>
return {
input: {
callSid: b.CallSid,
accountSid: b.AccountSid,
from: b.From,
to: b.To,
callStatus: b.CallStatus,
direction: b.Direction,
apiVersion: b.ApiVersion,
callerName: b.CallerName,
forwardedFrom: b.ForwardedFrom,
digits: b.Digits,
speechResult: b.SpeechResult,
recordingUrl: b.RecordingUrl,
recordingSid: b.RecordingSid,
called: b.Called,
caller: b.Caller,
toCity: b.ToCity,
toState: b.ToState,
toZip: b.ToZip,
toCountry: b.ToCountry,
fromCity: b.FromCity,
fromState: b.FromState,
fromZip: b.FromZip,
fromCountry: b.FromCountry,
calledCity: b.CalledCity,
calledState: b.CalledState,
calledZip: b.CalledZip,
calledCountry: b.CalledCountry,
callerCity: b.CallerCity,
callerState: b.CallerState,
callerZip: b.CallerZip,
callerCountry: b.CallerCountry,
callToken: b.CallToken,
raw: JSON.stringify(b),
},
}
},
formatQueueErrorResponse() {
const errorTwiml = `<?xml version="1.0" encoding="UTF-8"?>
<Response>
<Say>We're sorry, but an error occurred processing your call. Please try again later.</Say>
<Hangup/>
</Response>`
return new NextResponse(errorTwiml, {
status: 200,
headers: {
'Content-Type': 'text/xml',
},
})
},
}

View File

@@ -1,8 +0,0 @@
import type { WebhookProviderHandler } from '@/lib/webhooks/providers/types'
export const twilioHandler: WebhookProviderHandler = {
extractIdempotencyId(body: unknown) {
const obj = body as Record<string, unknown>
return (obj.MessageSid as string) || (obj.CallSid as string) || null
},
}

View File

@@ -1,213 +0,0 @@
import crypto from 'crypto'
import { createLogger } from '@sim/logger'
import { safeCompare } from '@/lib/core/security/encryption'
import { getNotificationUrl, getProviderConfig } from '@/lib/webhooks/providers/subscription-utils'
import type {
DeleteSubscriptionContext,
FormatInputContext,
FormatInputResult,
SubscriptionContext,
SubscriptionResult,
WebhookProviderHandler,
} from '@/lib/webhooks/providers/types'
import { createHmacVerifier } from '@/lib/webhooks/providers/utils'
const logger = createLogger('WebhookProvider:Typeform')
function validateTypeformSignature(secret: string, signature: string, body: string): boolean {
try {
if (!secret || !signature || !body) {
return false
}
if (!signature.startsWith('sha256=')) {
return false
}
const providedSignature = signature.substring(7)
const computedHash = crypto.createHmac('sha256', secret).update(body, 'utf8').digest('base64')
return safeCompare(computedHash, providedSignature)
} catch (error) {
logger.error('Error validating Typeform signature:', error)
return false
}
}
export const typeformHandler: WebhookProviderHandler = {
async formatInput({ body, webhook }: FormatInputContext): Promise<FormatInputResult> {
const b = body as Record<string, unknown>
const formResponse = (b?.form_response || {}) as Record<string, unknown>
const providerConfig = (webhook.providerConfig as Record<string, unknown>) || {}
const includeDefinition = providerConfig.includeDefinition === true
return {
input: {
event_id: b?.event_id || '',
event_type: b?.event_type || 'form_response',
form_id: formResponse.form_id || '',
token: formResponse.token || '',
submitted_at: formResponse.submitted_at || '',
landed_at: formResponse.landed_at || '',
calculated: formResponse.calculated || {},
variables: formResponse.variables || [],
hidden: formResponse.hidden || {},
answers: formResponse.answers || [],
...(includeDefinition ? { definition: formResponse.definition || {} } : {}),
ending: formResponse.ending || {},
raw: b,
},
}
},
verifyAuth: createHmacVerifier({
configKey: 'secret',
headerName: 'Typeform-Signature',
validateFn: validateTypeformSignature,
providerLabel: 'Typeform',
}),
async createSubscription(ctx: SubscriptionContext): Promise<SubscriptionResult | undefined> {
const config = getProviderConfig(ctx.webhook)
const formId = config.formId as string | undefined
const apiKey = config.apiKey as string | undefined
const webhookTag = config.webhookTag as string | undefined
const secret = config.secret as string | undefined
if (!formId) {
logger.warn(`[${ctx.requestId}] Missing formId for Typeform webhook ${ctx.webhook.id}`)
throw new Error(
'Form ID is required to create a Typeform webhook. Please provide a valid form ID.'
)
}
if (!apiKey) {
logger.warn(`[${ctx.requestId}] Missing apiKey for Typeform webhook ${ctx.webhook.id}`)
throw new Error(
'Personal Access Token is required to create a Typeform webhook. Please provide your Typeform API key.'
)
}
const tag = webhookTag || `sim-${(ctx.webhook.id as string).substring(0, 8)}`
const notificationUrl = getNotificationUrl(ctx.webhook)
try {
const typeformApiUrl = `https://api.typeform.com/forms/${formId}/webhooks/${tag}`
const requestBody: Record<string, unknown> = {
url: notificationUrl,
enabled: true,
verify_ssl: true,
event_types: {
form_response: true,
},
}
if (secret) {
requestBody.secret = secret
}
const typeformResponse = await fetch(typeformApiUrl, {
method: 'PUT',
headers: {
Authorization: `Bearer ${apiKey}`,
'Content-Type': 'application/json',
},
body: JSON.stringify(requestBody),
})
if (!typeformResponse.ok) {
const responseBody = await typeformResponse.json().catch(() => ({}))
const errorMessage =
(responseBody as Record<string, string>).description ||
(responseBody as Record<string, string>).message ||
'Unknown error'
logger.error(`[${ctx.requestId}] Typeform API error: ${errorMessage}`, {
status: typeformResponse.status,
response: responseBody,
})
let userFriendlyMessage = 'Failed to create Typeform webhook'
if (typeformResponse.status === 401) {
userFriendlyMessage =
'Invalid Personal Access Token. Please verify your Typeform API key and try again.'
} else if (typeformResponse.status === 403) {
userFriendlyMessage =
'Access denied. Please ensure you have a Typeform PRO or PRO+ account and the API key has webhook permissions.'
} else if (typeformResponse.status === 404) {
userFriendlyMessage = 'Form not found. Please verify the form ID is correct.'
} else if (
(responseBody as Record<string, string>).description ||
(responseBody as Record<string, string>).message
) {
userFriendlyMessage = `Typeform error: ${errorMessage}`
}
throw new Error(userFriendlyMessage)
}
const responseBody = await typeformResponse.json()
logger.info(
`[${ctx.requestId}] Successfully created Typeform webhook for webhook ${ctx.webhook.id} with tag ${tag}`,
{ webhookId: (responseBody as Record<string, unknown>).id }
)
if (!webhookTag && tag) {
return { providerConfigUpdates: { webhookTag: tag } }
}
return {}
} catch (error: unknown) {
if (
error instanceof Error &&
(error.message.includes('Form ID') ||
error.message.includes('Personal Access Token') ||
error.message.includes('Typeform error'))
) {
throw error
}
logger.error(
`[${ctx.requestId}] Error creating Typeform webhook for webhook ${ctx.webhook.id}`,
error
)
throw new Error(
error instanceof Error
? error.message
: 'Failed to create Typeform webhook. Please try again.'
)
}
},
async deleteSubscription(ctx: DeleteSubscriptionContext): Promise<void> {
try {
const config = getProviderConfig(ctx.webhook)
const formId = config.formId as string | undefined
const apiKey = config.apiKey as string | undefined
const webhookTag = config.webhookTag as string | undefined
if (!formId || !apiKey) {
logger.warn(
`[${ctx.requestId}] Missing formId or apiKey for Typeform webhook deletion ${ctx.webhook.id}, skipping cleanup`
)
return
}
const tag = webhookTag || `sim-${(ctx.webhook.id as string).substring(0, 8)}`
const typeformApiUrl = `https://api.typeform.com/forms/${formId}/webhooks/${tag}`
const typeformResponse = await fetch(typeformApiUrl, {
method: 'DELETE',
headers: {
Authorization: `Bearer ${apiKey}`,
},
})
if (!typeformResponse.ok && typeformResponse.status !== 404) {
logger.warn(
`[${ctx.requestId}] Failed to delete Typeform webhook (non-fatal): ${typeformResponse.status}`
)
} else {
logger.info(`[${ctx.requestId}] Successfully deleted Typeform webhook with tag ${tag}`)
}
} catch (error) {
logger.warn(`[${ctx.requestId}] Error deleting Typeform webhook (non-fatal)`, error)
}
},
}

View File

@@ -1,143 +0,0 @@
import type { NextRequest, NextResponse } from 'next/server'
/** Context for signature/token verification. */
export interface AuthContext {
webhook: Record<string, unknown>
workflow: Record<string, unknown>
request: NextRequest
rawBody: string
requestId: string
providerConfig: Record<string, unknown>
}
/** Context for event matching against trigger configuration. */
export interface EventMatchContext {
webhook: Record<string, unknown>
workflow: Record<string, unknown>
body: unknown
request: NextRequest
requestId: string
providerConfig: Record<string, unknown>
}
/** Context for event filtering and header enrichment. */
export interface EventFilterContext {
webhook: Record<string, unknown>
body: unknown
requestId: string
providerConfig: Record<string, unknown>
}
/** Context for custom input preparation during execution. */
export interface FormatInputContext {
webhook: Record<string, unknown>
workflow: { id: string; userId: string }
body: unknown
headers: Record<string, string>
requestId: string
}
/** Result of custom input preparation. */
export interface FormatInputResult {
input: unknown
skip?: { message: string }
}
/** Context for provider-specific file processing before execution. */
export interface ProcessFilesContext {
input: Record<string, unknown>
blocks: Record<string, unknown>
blockId: string
workspaceId: string
workflowId: string
executionId: string
requestId: string
userId: string
}
/** Context for creating an external webhook subscription during deployment. */
export interface SubscriptionContext {
webhook: Record<string, unknown>
workflow: Record<string, unknown>
userId: string
requestId: string
request: NextRequest
}
/** Result of creating an external webhook subscription. */
export interface SubscriptionResult {
/** Fields to merge into providerConfig (externalId, webhookSecret, etc.) */
providerConfigUpdates?: Record<string, unknown>
}
/** Context for deleting an external webhook subscription during undeployment. */
export interface DeleteSubscriptionContext {
webhook: Record<string, unknown>
workflow: Record<string, unknown>
requestId: string
}
/** Context for configuring polling after webhook creation. */
export interface PollingConfigContext {
webhook: Record<string, unknown>
requestId: string
}
/**
* Strategy interface for provider-specific webhook behavior.
* Each provider implements only the methods it needs — all methods are optional.
*/
export interface WebhookProviderHandler {
/** Verify signature/auth. Return NextResponse(401/403) on failure, null on success. */
verifyAuth?(ctx: AuthContext): Promise<NextResponse | null> | NextResponse | null
/** Handle reachability/verification probes after webhook lookup. */
handleReachabilityTest?(body: unknown, requestId: string): NextResponse | null
/** Format error responses (some providers need special formats). */
formatErrorResponse?(error: string, status: number): NextResponse
/** Return true to skip this event (filtering by event type, collection, etc.). */
shouldSkipEvent?(ctx: EventFilterContext): boolean
/** Return true if event matches, false or NextResponse to skip with a custom response. */
matchEvent?(ctx: EventMatchContext): Promise<boolean | NextResponse> | boolean | NextResponse
/** Add provider-specific headers (idempotency keys, notification IDs, etc.). */
enrichHeaders?(ctx: EventFilterContext, headers: Record<string, string>): void
/** Extract unique identifier for idempotency dedup. */
extractIdempotencyId?(body: unknown): string | null
/** Custom success response after queuing. Return null for default `{message: "Webhook processed"}`. */
formatSuccessResponse?(providerConfig: Record<string, unknown>): NextResponse | null
/** Custom error response when queuing fails. Return null for default 500. */
formatQueueErrorResponse?(): NextResponse | null
/** Custom input preparation. Replaces the standard `formatWebhookInput` call when defined. */
formatInput?(ctx: FormatInputContext): Promise<FormatInputResult>
/** Called when standard `formatWebhookInput` returns null. Return skip message or null to proceed. */
handleEmptyInput?(requestId: string): { message: string } | null
/** Post-process input to handle file uploads before execution. */
processInputFiles?(ctx: ProcessFilesContext): Promise<void>
/** Create an external webhook subscription (e.g., register with Telegram, Airtable, etc.). */
createSubscription?(ctx: SubscriptionContext): Promise<SubscriptionResult | undefined>
/** Delete an external webhook subscription during cleanup. Errors should not throw. */
deleteSubscription?(ctx: DeleteSubscriptionContext): Promise<void>
/** Configure polling after webhook creation (gmail, outlook, rss, imap). */
configurePolling?(ctx: PollingConfigContext): Promise<boolean>
/** Handle verification challenges before webhook lookup (Slack url_verification, WhatsApp hub.verify_token, Teams validationToken). */
handleChallenge?(
body: unknown,
request: NextRequest,
requestId: string,
path: string
): Promise<NextResponse | null> | NextResponse | null
}

View File

@@ -1,102 +0,0 @@
import type { Logger } from '@sim/logger'
import { createLogger } from '@sim/logger'
import { NextResponse } from 'next/server'
import { safeCompare } from '@/lib/core/security/encryption'
import type { AuthContext, EventFilterContext } from '@/lib/webhooks/providers/types'
const logger = createLogger('WebhookProviderAuth')
interface HmacVerifierOptions {
configKey: string
headerName: string
validateFn: (secret: string, signature: string, rawBody: string) => boolean | Promise<boolean>
providerLabel: string
}
/**
* Factory that creates a `verifyAuth` implementation for HMAC-signature-based providers.
* Covers the common pattern: get secret → check header → validate signature → return 401 or null.
*/
export function createHmacVerifier({
configKey,
headerName,
validateFn,
providerLabel,
}: HmacVerifierOptions) {
return async ({
request,
rawBody,
requestId,
providerConfig,
}: AuthContext): Promise<NextResponse | null> => {
const secret = providerConfig[configKey] as string | undefined
if (!secret) {
return null
}
const signature = request.headers.get(headerName)
if (!signature) {
logger.warn(`[${requestId}] ${providerLabel} webhook missing signature header`)
return new NextResponse(`Unauthorized - Missing ${providerLabel} signature`, { status: 401 })
}
const isValid = await validateFn(secret, signature, rawBody)
if (!isValid) {
logger.warn(`[${requestId}] ${providerLabel} signature verification failed`, {
signatureLength: signature.length,
secretLength: secret.length,
})
return new NextResponse(`Unauthorized - Invalid ${providerLabel} signature`, { status: 401 })
}
return null
}
}
/**
* Verify a bearer token or custom header token using timing-safe comparison.
* Used by generic webhooks, Google Forms, and the default handler.
*/
export function verifyTokenAuth(
request: Request,
expectedToken: string,
secretHeaderName?: string
): boolean {
if (secretHeaderName) {
const headerValue = request.headers.get(secretHeaderName.toLowerCase())
return !!headerValue && safeCompare(headerValue, expectedToken)
}
const authHeader = request.headers.get('authorization')
if (authHeader?.toLowerCase().startsWith('bearer ')) {
const token = authHeader.substring(7)
return safeCompare(token, expectedToken)
}
return false
}
/**
* Skip events whose `body.type` is not in the `providerConfig.eventTypes` allowlist.
* Shared by providers that use a simple event-type filter (Stripe, Grain, etc.).
*/
export function skipByEventTypes(
{ webhook, body, requestId, providerConfig }: EventFilterContext,
providerLabel: string,
eventLogger: Logger
): boolean {
const eventTypes = providerConfig.eventTypes
if (!eventTypes || !Array.isArray(eventTypes) || eventTypes.length === 0) {
return false
}
const eventType = (body as Record<string, unknown>)?.type as string | undefined
if (eventType && !eventTypes.includes(eventType)) {
eventLogger.info(
`[${requestId}] ${providerLabel} event type '${eventType}' not in allowed list for webhook ${webhook.id as string}, skipping`
)
return true
}
return false
}

View File

@@ -1,307 +0,0 @@
import { createLogger } from '@sim/logger'
import { validateAlphanumericId } from '@/lib/core/security/input-validation'
import { getBaseUrl } from '@/lib/core/utils/urls'
import { getCredentialOwner, getProviderConfig } from '@/lib/webhooks/providers/subscription-utils'
import type {
DeleteSubscriptionContext,
EventFilterContext,
FormatInputContext,
FormatInputResult,
SubscriptionContext,
SubscriptionResult,
WebhookProviderHandler,
} from '@/lib/webhooks/providers/types'
import { getOAuthToken, refreshAccessTokenIfNeeded } from '@/app/api/auth/oauth/utils'
const logger = createLogger('WebhookProvider:Webflow')
export const webflowHandler: WebhookProviderHandler = {
async createSubscription({
webhook: webhookRecord,
workflow,
userId,
requestId,
}: SubscriptionContext): Promise<SubscriptionResult | undefined> {
try {
const { path, providerConfig } = webhookRecord as Record<string, unknown>
const config = (providerConfig as Record<string, unknown>) || {}
const { siteId, triggerId, collectionId, formName, credentialId } = config as {
siteId?: string
triggerId?: string
collectionId?: string
formName?: string
credentialId?: string
}
if (!siteId) {
logger.warn(`[${requestId}] Missing siteId for Webflow webhook creation.`, {
webhookId: webhookRecord.id,
})
throw new Error('Site ID is required to create Webflow webhook')
}
const siteIdValidation = validateAlphanumericId(siteId, 'siteId', 100)
if (!siteIdValidation.isValid) {
throw new Error(siteIdValidation.error)
}
if (!triggerId) {
logger.warn(`[${requestId}] Missing triggerId for Webflow webhook creation.`, {
webhookId: webhookRecord.id,
})
throw new Error('Trigger type is required to create Webflow webhook')
}
const credentialOwner = credentialId
? await getCredentialOwner(credentialId, requestId)
: null
const accessToken = credentialId
? credentialOwner
? await refreshAccessTokenIfNeeded(
credentialOwner.accountId,
credentialOwner.userId,
requestId
)
: null
: await getOAuthToken(userId, 'webflow')
if (!accessToken) {
logger.warn(
`[${requestId}] Could not retrieve Webflow access token for user ${userId}. Cannot create webhook in Webflow.`
)
throw new Error(
'Webflow account connection required. Please connect your Webflow account in the trigger configuration and try again.'
)
}
const notificationUrl = `${getBaseUrl()}/api/webhooks/trigger/${path}`
const triggerTypeMap: Record<string, string> = {
webflow_collection_item_created: 'collection_item_created',
webflow_collection_item_changed: 'collection_item_changed',
webflow_collection_item_deleted: 'collection_item_deleted',
webflow_form_submission: 'form_submission',
}
const webflowTriggerType = triggerTypeMap[triggerId]
if (!webflowTriggerType) {
logger.warn(`[${requestId}] Invalid triggerId for Webflow: ${triggerId}`, {
webhookId: webhookRecord.id,
})
throw new Error(`Invalid Webflow trigger type: ${triggerId}`)
}
const webflowApiUrl = `https://api.webflow.com/v2/sites/${siteId}/webhooks`
const requestBody: Record<string, unknown> = {
triggerType: webflowTriggerType,
url: notificationUrl,
}
if (formName && webflowTriggerType === 'form_submission') {
requestBody.filter = {
name: formName,
}
}
const webflowResponse = await fetch(webflowApiUrl, {
method: 'POST',
headers: {
Authorization: `Bearer ${accessToken}`,
'Content-Type': 'application/json',
accept: 'application/json',
},
body: JSON.stringify(requestBody),
})
const responseBody = await webflowResponse.json()
if (!webflowResponse.ok || responseBody.error) {
const errorMessage =
responseBody.message || responseBody.error || 'Unknown Webflow API error'
logger.error(
`[${requestId}] Failed to create webhook in Webflow for webhook ${webhookRecord.id}. Status: ${webflowResponse.status}`,
{ message: errorMessage, response: responseBody }
)
throw new Error(errorMessage)
}
logger.info(
`[${requestId}] Successfully created webhook in Webflow for webhook ${webhookRecord.id}.`,
{
webflowWebhookId: responseBody.id || responseBody._id,
}
)
return { providerConfigUpdates: { externalId: responseBody.id || responseBody._id } }
} catch (error: unknown) {
const err = error as Error
logger.error(
`[${requestId}] Exception during Webflow webhook creation for webhook ${webhookRecord.id}.`,
{
message: err.message,
stack: err.stack,
}
)
throw error
}
},
async deleteSubscription({
webhook: webhookRecord,
workflow,
requestId,
}: DeleteSubscriptionContext): Promise<void> {
try {
const config = getProviderConfig(webhookRecord)
const siteId = config.siteId as string | undefined
const externalId = config.externalId as string | undefined
if (!siteId) {
logger.warn(
`[${requestId}] Missing siteId for Webflow webhook deletion ${webhookRecord.id}, skipping cleanup`
)
return
}
if (!externalId) {
logger.warn(
`[${requestId}] Missing externalId for Webflow webhook deletion ${webhookRecord.id}, skipping cleanup`
)
return
}
const siteIdValidation = validateAlphanumericId(siteId, 'siteId', 100)
if (!siteIdValidation.isValid) {
logger.warn(`[${requestId}] Invalid Webflow site ID format, skipping deletion`, {
webhookId: webhookRecord.id,
siteId: siteId.substring(0, 30),
})
return
}
const webhookIdValidation = validateAlphanumericId(externalId, 'webhookId', 100)
if (!webhookIdValidation.isValid) {
logger.warn(`[${requestId}] Invalid Webflow webhook ID format, skipping deletion`, {
webhookId: webhookRecord.id,
externalId: externalId.substring(0, 30),
})
return
}
const credentialId = config.credentialId as string | undefined
if (!credentialId) {
logger.warn(
`[${requestId}] Missing credentialId for Webflow webhook deletion ${webhookRecord.id}`
)
return
}
const credentialOwner = await getCredentialOwner(credentialId, requestId)
const accessToken = credentialOwner
? await refreshAccessTokenIfNeeded(
credentialOwner.accountId,
credentialOwner.userId,
requestId
)
: null
if (!accessToken) {
logger.warn(
`[${requestId}] Could not retrieve Webflow access token. Cannot delete webhook.`,
{ webhookId: webhookRecord.id }
)
return
}
const webflowApiUrl = `https://api.webflow.com/v2/sites/${siteId}/webhooks/${externalId}`
const webflowResponse = await fetch(webflowApiUrl, {
method: 'DELETE',
headers: {
Authorization: `Bearer ${accessToken}`,
accept: 'application/json',
},
})
if (!webflowResponse.ok && webflowResponse.status !== 404) {
const responseBody = await webflowResponse.json().catch(() => ({}))
logger.warn(
`[${requestId}] Failed to delete Webflow webhook (non-fatal): ${webflowResponse.status}`,
{ response: responseBody }
)
} else {
logger.info(`[${requestId}] Successfully deleted Webflow webhook ${externalId}`)
}
} catch (error) {
logger.warn(`[${requestId}] Error deleting Webflow webhook (non-fatal)`, error)
}
},
async formatInput({ body, webhook }: FormatInputContext): Promise<FormatInputResult> {
const b = body as Record<string, unknown>
const providerConfig = (webhook.providerConfig as Record<string, unknown>) || {}
const triggerId = providerConfig.triggerId as string | undefined
if (triggerId === 'webflow_form_submission') {
return {
input: {
siteId: b?.siteId || '',
formId: b?.formId || '',
name: b?.name || '',
id: b?.id || '',
submittedAt: b?.submittedAt || '',
data: b?.data || {},
schema: b?.schema || {},
formElementId: b?.formElementId || '',
},
}
}
const { _cid, _id, ...itemFields } = b || ({} as Record<string, unknown>)
return {
input: {
siteId: b?.siteId || '',
collectionId: (_cid || b?.collectionId || '') as string,
payload: {
id: (_id || '') as string,
cmsLocaleId: (itemFields as Record<string, unknown>)?.cmsLocaleId || '',
lastPublished:
(itemFields as Record<string, unknown>)?.lastPublished ||
(itemFields as Record<string, unknown>)?.['last-published'] ||
'',
lastUpdated:
(itemFields as Record<string, unknown>)?.lastUpdated ||
(itemFields as Record<string, unknown>)?.['last-updated'] ||
'',
createdOn:
(itemFields as Record<string, unknown>)?.createdOn ||
(itemFields as Record<string, unknown>)?.['created-on'] ||
'',
isArchived:
(itemFields as Record<string, unknown>)?.isArchived ||
(itemFields as Record<string, unknown>)?._archived ||
false,
isDraft:
(itemFields as Record<string, unknown>)?.isDraft ||
(itemFields as Record<string, unknown>)?._draft ||
false,
fieldData: itemFields,
},
},
}
},
shouldSkipEvent({ webhook, body, requestId, providerConfig }: EventFilterContext) {
const configuredCollectionId = providerConfig.collectionId as string | undefined
if (configuredCollectionId) {
const obj = body as Record<string, unknown>
const payload = obj.payload as Record<string, unknown> | undefined
const payloadCollectionId = (payload?.collectionId ?? obj.collectionId) as string | undefined
if (payloadCollectionId && payloadCollectionId !== configuredCollectionId) {
logger.info(
`[${requestId}] Webflow collection '${payloadCollectionId}' doesn't match configured collection '${configuredCollectionId}' for webhook ${webhook.id as string}, skipping`
)
return true
}
}
return false
},
}

View File

@@ -1,118 +0,0 @@
import { db, workflowDeploymentVersion } from '@sim/db'
import { webhook } from '@sim/db/schema'
import { createLogger } from '@sim/logger'
import { and, eq, isNull, or } from 'drizzle-orm'
import { type NextRequest, NextResponse } from 'next/server'
import type {
FormatInputContext,
FormatInputResult,
WebhookProviderHandler,
} from '@/lib/webhooks/providers/types'
const logger = createLogger('WebhookProvider:WhatsApp')
/**
* Handle WhatsApp verification requests
*/
export async function handleWhatsAppVerification(
requestId: string,
path: string,
mode: string | null,
token: string | null,
challenge: string | null
): Promise<NextResponse | null> {
if (mode && token && challenge) {
logger.info(`[${requestId}] WhatsApp verification request received for path: ${path}`)
if (mode !== 'subscribe') {
logger.warn(`[${requestId}] Invalid WhatsApp verification mode: ${mode}`)
return new NextResponse('Invalid mode', { status: 400 })
}
const webhooks = await db
.select({ webhook })
.from(webhook)
.leftJoin(
workflowDeploymentVersion,
and(
eq(workflowDeploymentVersion.workflowId, webhook.workflowId),
eq(workflowDeploymentVersion.isActive, true)
)
)
.where(
and(
eq(webhook.provider, 'whatsapp'),
eq(webhook.isActive, true),
or(
eq(webhook.deploymentVersionId, workflowDeploymentVersion.id),
and(isNull(workflowDeploymentVersion.id), isNull(webhook.deploymentVersionId))
)
)
)
for (const row of webhooks) {
const wh = row.webhook
const providerConfig = (wh.providerConfig as Record<string, unknown>) || {}
const verificationToken = providerConfig.verificationToken
if (!verificationToken) {
continue
}
if (token === verificationToken) {
logger.info(`[${requestId}] WhatsApp verification successful for webhook ${wh.id}`)
return new NextResponse(challenge, {
status: 200,
headers: {
'Content-Type': 'text/plain',
},
})
}
}
logger.warn(`[${requestId}] No matching WhatsApp verification token found`)
return new NextResponse('Verification failed', { status: 403 })
}
return null
}
export const whatsappHandler: WebhookProviderHandler = {
async handleChallenge(_body: unknown, request: NextRequest, requestId: string, path: string) {
const url = new URL(request.url)
const mode = url.searchParams.get('hub.mode')
const token = url.searchParams.get('hub.verify_token')
const challenge = url.searchParams.get('hub.challenge')
return handleWhatsAppVerification(requestId, path, mode, token, challenge)
},
async formatInput({ body }: FormatInputContext): Promise<FormatInputResult> {
const b = body as Record<string, unknown>
const entry = b?.entry as Array<Record<string, unknown>> | undefined
const changes = entry?.[0]?.changes as Array<Record<string, unknown>> | undefined
const data = changes?.[0]?.value as Record<string, unknown> | undefined
const messages = (data?.messages as Array<Record<string, unknown>>) || []
if (messages.length > 0) {
const message = messages[0]
const metadata = data?.metadata as Record<string, unknown> | undefined
const text = message.text as Record<string, unknown> | undefined
return {
input: {
messageId: message.id,
from: message.from,
phoneNumberId: metadata?.phone_number_id,
text: text?.body,
timestamp: message.timestamp,
raw: JSON.stringify(message),
},
}
}
return { input: null }
},
handleEmptyInput(requestId: string) {
logger.info(`[${requestId}] No messages in WhatsApp payload, skipping execution`)
return { message: 'No messages in WhatsApp payload' }
},
}

File diff suppressed because it is too large Load Diff