fix(generic-webhooks): idempotency simplification, generic webhook vars changes (#1384)

* fix(idempotency): simplify for deterministic provider based checks

* remove generic webhook outputs and allow body to be referenced via vars
This commit is contained in:
Vikhyath Mondreti
2025-09-19 10:03:45 -07:00
committed by GitHub
parent 8e70a61ba9
commit 04922fe5c9
8 changed files with 54 additions and 187 deletions

View File

@@ -1,6 +1,6 @@
import { type NextRequest, NextResponse } from 'next/server'
import { verifyCronAuth } from '@/lib/auth/internal'
import { cleanupExpiredIdempotencyKeys, getIdempotencyKeyStats } from '@/lib/idempotency/cleanup'
import { cleanupExpiredIdempotencyKeys, getIdempotencyKeyStats } from '@/lib/idempotency'
import { createLogger } from '@/lib/logs/console/logger'
import { generateRequestId } from '@/lib/utils'

View File

@@ -375,39 +375,40 @@ export async function POST(
const idempotencyKey = IdempotencyService.createWebhookIdempotencyKey(
foundWebhook.id,
body,
Object.fromEntries(request.headers.entries())
)
const runOperation = async () => {
const useTrigger = isTruthy(env.TRIGGER_DEV_ENABLED)
if (useTrigger) {
const handle = await tasks.trigger('webhook-execution', payload)
logger.info(
`[${requestId}] Queued webhook execution task ${handle.id} for ${foundWebhook.provider} webhook`
)
return {
method: 'trigger.dev',
taskId: handle.id,
status: 'queued',
}
}
// Fire-and-forget direct execution to avoid blocking webhook response
void executeWebhookJob(payload).catch((error) => {
logger.error(`[${requestId}] Direct webhook execution failed`, error)
})
logger.info(
`[${requestId}] Queued direct webhook execution for ${foundWebhook.provider} webhook (Trigger.dev disabled)`
)
return {
method: 'direct',
status: 'queued',
}
}
const result = await webhookIdempotency.executeWithIdempotency(
foundWebhook.provider,
idempotencyKey,
async () => {
const useTrigger = isTruthy(env.TRIGGER_DEV_ENABLED)
if (useTrigger) {
const handle = await tasks.trigger('webhook-execution', payload)
logger.info(
`[${requestId}] Queued webhook execution task ${handle.id} for ${foundWebhook.provider} webhook`
)
return {
method: 'trigger.dev',
taskId: handle.id,
status: 'queued',
}
}
// Fire-and-forget direct execution to avoid blocking webhook response
void executeWebhookJob(payload).catch((error) => {
logger.error(`[${requestId}] Direct webhook execution failed`, error)
})
logger.info(
`[${requestId}] Queued direct webhook execution for ${foundWebhook.provider} webhook (Trigger.dev disabled)`
)
return {
method: 'direct',
status: 'queued',
}
}
runOperation
)
logger.debug(`[${requestId}] Webhook execution result:`, result)

View File

@@ -44,16 +44,17 @@ export async function executeWebhookJob(payload: WebhookExecutionPayload) {
const idempotencyKey = IdempotencyService.createWebhookIdempotencyKey(
payload.webhookId,
payload.body,
payload.headers
)
const runOperation = async () => {
return await executeWebhookJobInternal(payload, executionId, requestId)
}
return await webhookIdempotency.executeWithIdempotency(
payload.provider,
idempotencyKey,
async () => {
return await executeWebhookJobInternal(payload, executionId, requestId)
}
runOperation
)
}

View File

@@ -27,18 +27,7 @@ export const GenericWebhookBlock: BlockConfig = {
inputs: {}, // No inputs - webhook triggers receive data externally
outputs: {
// Generic webhook outputs that can be used with any webhook payload
payload: { type: 'json', description: 'Complete webhook payload' },
headers: { type: 'json', description: 'Request headers' },
method: { type: 'string', description: 'HTTP method' },
url: { type: 'string', description: 'Request URL' },
timestamp: { type: 'string', description: 'Webhook received timestamp' },
// Common webhook fields that services often use
event: { type: 'string', description: 'Event type from payload' },
id: { type: 'string', description: 'Event ID from payload' },
data: { type: 'json', description: 'Event data from payload' },
},
outputs: {},
triggers: {
enabled: true,

View File

@@ -2,6 +2,5 @@ export * from './cleanup'
export * from './service'
export {
pollingIdempotency,
triggerIdempotency,
webhookIdempotency,
} from './service'

View File

@@ -1,4 +1,4 @@
import * as crypto from 'crypto'
import { randomUUID } from 'crypto'
import { db } from '@sim/db'
import { idempotencyKey } from '@sim/db/schema'
import { and, eq } from 'drizzle-orm'
@@ -451,110 +451,26 @@ export class IdempotencyService {
/**
* Create an idempotency key from a webhook payload following RFC best practices
* Priority order:
* 1. Standard webhook headers (webhook-id, x-webhook-id, etc.)
* 2. Event/message IDs from payload
* 3. Deterministic hash of stable payload fields (excluding timestamps)
* Standard webhook headers (webhook-id, x-webhook-id, etc.)
*/
static createWebhookIdempotencyKey(
webhookId: string,
payload: any,
headers?: Record<string, string>
): string {
// 1. Check for standard webhook headers (RFC compliant)
static createWebhookIdempotencyKey(webhookId: string, headers?: Record<string, string>): string {
const normalizedHeaders = headers
? Object.fromEntries(Object.entries(headers).map(([k, v]) => [k.toLowerCase(), v]))
: undefined
const webhookIdHeader =
headers?.['webhook-id'] || // Standard Webhooks spec
headers?.['x-webhook-id'] || // Legacy standard
headers?.['x-shopify-webhook-id'] ||
headers?.['x-github-delivery'] ||
headers?.['x-event-id'] // Generic event ID header
normalizedHeaders?.['webhook-id'] ||
normalizedHeaders?.['x-webhook-id'] ||
normalizedHeaders?.['x-shopify-webhook-id'] ||
normalizedHeaders?.['x-github-delivery'] ||
normalizedHeaders?.['x-event-id']
if (webhookIdHeader) {
return `${webhookId}:${webhookIdHeader}`
}
// 2. Extract event/message IDs from payload (most reliable)
const payloadId =
payload?.id ||
payload?.event_id ||
payload?.eventId ||
payload?.message?.id ||
payload?.data?.id ||
payload?.object?.id ||
payload?.event?.id
if (payloadId) {
return `${webhookId}:${payloadId}`
}
// 3. Create deterministic hash from stable payload fields (excluding timestamps)
const stablePayload = IdempotencyService.createStablePayloadForHashing(payload)
const payloadHash = crypto
.createHash('sha256')
.update(JSON.stringify(stablePayload))
.digest('hex')
.substring(0, 16)
return `${webhookId}:${payloadHash}`
}
/**
* Create a stable representation of the payload for hashing by removing
* timestamp and other volatile fields that change between requests
*/
private static createStablePayloadForHashing(payload: any): any {
if (!payload || typeof payload !== 'object') {
return payload
}
const volatileFields = [
'timestamp',
'created_at',
'updated_at',
'sent_at',
'received_at',
'processed_at',
'delivered_at',
'attempt',
'retry_count',
'request_id',
'trace_id',
'span_id',
'delivery_id',
'webhook_timestamp',
]
const cleanPayload = { ...payload }
const removeVolatileFields = (obj: any): any => {
if (!obj || typeof obj !== 'object') return obj
if (Array.isArray(obj)) {
return obj.map(removeVolatileFields)
}
const cleaned: any = {}
for (const [key, value] of Object.entries(obj)) {
const lowerKey = key.toLowerCase()
if (volatileFields.some((field) => lowerKey.includes(field))) {
continue
}
if (typeof value === 'string' && /^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}/.test(value)) {
continue
}
if (typeof value === 'number' && value > 1000000000 && value < 9999999999) {
continue
}
cleaned[key] = removeVolatileFields(value)
}
return cleaned
}
return removeVolatileFields(cleanPayload)
const uniqueId = randomUUID()
return `${webhookId}:${uniqueId}`
}
}
@@ -567,8 +483,3 @@ export const pollingIdempotency = new IdempotencyService({
namespace: 'polling',
ttlSeconds: 60 * 60 * 24 * 3, // 3 days
})
export const triggerIdempotency = new IdempotencyService({
namespace: 'trigger',
ttlSeconds: 60 * 60 * 24 * 1, // 1 day
})

View File

@@ -512,6 +512,10 @@ export function formatWebhookInput(
}
}
if (foundWebhook.provider === 'generic') {
return body
}
if (foundWebhook.provider === 'google_forms') {
const providerConfig = (foundWebhook.providerConfig as Record<string, any>) || {}

View File

@@ -34,45 +34,7 @@ export const genericWebhookTrigger: TriggerConfig = {
},
},
outputs: {
payload: {
type: 'json',
description: 'Complete webhook payload received',
},
headers: {
type: 'json',
description: 'HTTP request headers',
},
method: {
type: 'string',
description: 'HTTP method (GET, POST, PUT, etc.)',
},
url: {
type: 'string',
description: 'Request URL path',
},
query: {
type: 'json',
description: 'URL query parameters',
},
timestamp: {
type: 'string',
description: 'Webhook received timestamp',
},
// Common fields that many services use
event: {
type: 'string',
description: 'Event type (extracted from payload.event, payload.type, or payload.event_type)',
},
id: {
type: 'string',
description: 'Event ID (extracted from payload.id, payload.event_id, or payload.uuid)',
},
data: {
type: 'json',
description: 'Event data (extracted from payload.data or the full payload)',
},
},
outputs: {},
instructions: [
'Copy the webhook URL provided above and use it in your external service or API.',