fix(webhook-payloads): fixed the variable resolution in webhooks (#1019)

* telegram webhook fix

* changed payloads

* test

* test

* test

* test

* fix github dropdown

* test

* reverted github changes

* fixed github var

* test

* bun run lint

* test

* test

* test

* test

* test

* test

* test

* test

* test

* test

* test

* test

* test

* test

* test

* test

* test

* test

* test

* test

* test

* test

* test

* test

* test

* test push

* test

* bun run lint

* edited airtable payload and webhook deletion

* Revert bun.lock and package.json to upstream/staging

* cleaned up

* test

* test

* resolving more cmments

* resolved comments, updated trigger

* cleaned up, resolved comments

* test

* test

* lint

---------

Co-authored-by: Adam Gough <adamgough@Mac.attlocal.net>
This commit is contained in:
Adam Gough
2025-08-21 20:03:04 -07:00
committed by GitHub
parent 9ea9f2d52e
commit 9dbd44e555
7 changed files with 315 additions and 143 deletions

View File

@@ -1,8 +1,10 @@
import { eq } from 'drizzle-orm'
import { type NextRequest, NextResponse } from 'next/server'
import { getSession } from '@/lib/auth'
import { env } from '@/lib/env'
import { createLogger } from '@/lib/logs/console/logger'
import { getUserEntityPermissions } from '@/lib/permissions/utils'
import { getOAuthToken } from '@/app/api/auth/oauth/utils'
import { db } from '@/db'
import { webhook, workflow } from '@/db/schema'
@@ -242,6 +244,167 @@ export async function DELETE(
const foundWebhook = webhookData.webhook
// If it's an Airtable webhook, delete it from Airtable first
if (foundWebhook.provider === 'airtable') {
try {
const { baseId, externalId } = (foundWebhook.providerConfig || {}) as {
baseId?: string
externalId?: string
}
if (!baseId) {
logger.warn(`[${requestId}] Missing baseId for Airtable webhook deletion.`, {
webhookId: id,
})
return NextResponse.json(
{ error: 'Missing baseId for Airtable webhook deletion' },
{ status: 400 }
)
}
// Get access token for the workflow owner
const userIdForToken = webhookData.workflow.userId
const accessToken = await getOAuthToken(userIdForToken, 'airtable')
if (!accessToken) {
logger.warn(
`[${requestId}] Could not retrieve Airtable access token for user ${userIdForToken}. Cannot delete webhook in Airtable.`,
{ webhookId: id }
)
return NextResponse.json(
{ error: 'Airtable access token not found for webhook deletion' },
{ status: 401 }
)
}
// Resolve externalId if missing by listing webhooks and matching our notificationUrl
let resolvedExternalId: string | undefined = externalId
if (!resolvedExternalId) {
try {
const requestOrigin = new URL(request.url).origin
const effectiveOrigin = requestOrigin.includes('localhost')
? env.NEXT_PUBLIC_APP_URL || requestOrigin
: requestOrigin
const expectedNotificationUrl = `${effectiveOrigin}/api/webhooks/trigger/${foundWebhook.path}`
const listUrl = `https://api.airtable.com/v0/bases/${baseId}/webhooks`
const listResp = await fetch(listUrl, {
headers: {
Authorization: `Bearer ${accessToken}`,
},
})
const listBody = await listResp.json().catch(() => null)
if (listResp.ok && listBody && Array.isArray(listBody.webhooks)) {
const match = listBody.webhooks.find((w: any) => {
const url: string | undefined = w?.notificationUrl
if (!url) return false
// Prefer exact match; fallback to suffix match to handle origin/host remaps
return (
url === expectedNotificationUrl ||
url.endsWith(`/api/webhooks/trigger/${foundWebhook.path}`)
)
})
if (match?.id) {
resolvedExternalId = match.id as string
// Persist resolved externalId for future operations
try {
await db
.update(webhook)
.set({
providerConfig: {
...(foundWebhook.providerConfig || {}),
externalId: resolvedExternalId,
},
updatedAt: new Date(),
})
.where(eq(webhook.id, id))
} catch {
// non-fatal persistence error
}
logger.info(`[${requestId}] Resolved Airtable externalId by listing webhooks`, {
baseId,
externalId: resolvedExternalId,
})
} else {
logger.warn(`[${requestId}] Could not resolve Airtable externalId from list`, {
baseId,
expectedNotificationUrl,
})
}
} else {
logger.warn(`[${requestId}] Failed to list Airtable webhooks to resolve externalId`, {
baseId,
status: listResp.status,
body: listBody,
})
}
} catch (e: any) {
logger.warn(`[${requestId}] Error attempting to resolve Airtable externalId`, {
error: e?.message,
})
}
}
// If still not resolvable, skip remote deletion but proceed with local delete
if (!resolvedExternalId) {
logger.info(
`[${requestId}] Airtable externalId not found; skipping remote deletion and proceeding to remove local record`,
{ baseId }
)
}
if (resolvedExternalId) {
const airtableDeleteUrl = `https://api.airtable.com/v0/bases/${baseId}/webhooks/${resolvedExternalId}`
const airtableResponse = await fetch(airtableDeleteUrl, {
method: 'DELETE',
headers: {
Authorization: `Bearer ${accessToken}`,
},
})
// Attempt to parse error body for better diagnostics
if (!airtableResponse.ok) {
let responseBody: any = null
try {
responseBody = await airtableResponse.json()
} catch {
// ignore parse errors
}
logger.error(
`[${requestId}] Failed to delete Airtable webhook in Airtable. Status: ${airtableResponse.status}`,
{ baseId, externalId: resolvedExternalId, response: responseBody }
)
return NextResponse.json(
{
error: 'Failed to delete webhook from Airtable',
details:
(responseBody && (responseBody.error?.message || responseBody.error)) ||
`Status ${airtableResponse.status}`,
},
{ status: 500 }
)
}
logger.info(`[${requestId}] Successfully deleted Airtable webhook in Airtable`, {
baseId,
externalId: resolvedExternalId,
})
}
} catch (error: any) {
logger.error(`[${requestId}] Error deleting Airtable webhook`, {
webhookId: id,
error: error.message,
stack: error.stack,
})
return NextResponse.json(
{ error: 'Failed to delete webhook from Airtable', details: error.message },
{ status: 500 }
)
}
}
// If it's a Telegram webhook, delete it from Telegram first
if (foundWebhook.provider === 'telegram') {
try {

View File

@@ -27,6 +27,13 @@ export class TriggerBlockHandler implements BlockHandler {
): Promise<any> {
logger.info(`Executing trigger block: ${block.id} (Type: ${block.metadata?.id})`)
// If this trigger block was initialized with a precomputed output in the execution context
// (e.g., webhook payload injected at init), return it as-is to preserve the raw shape.
const existingState = context.blockStates.get(block.id)
if (existingState?.output && Object.keys(existingState.output).length > 0) {
return existingState.output
}
// For trigger blocks, return the starter block's output which contains the workflow input
// This ensures webhook data like message, sender, chat, etc. are accessible
const starterBlock = context.workflow?.blocks?.find((b) => b.metadata?.id === 'starter')
@@ -36,9 +43,10 @@ export class TriggerBlockHandler implements BlockHandler {
const starterOutput = starterState.output
// Generic handling for webhook triggers - extract provider-specific data
// Check if this is a webhook execution with nested structure
// Check if this is a webhook execution
if (starterOutput.webhook?.data) {
const webhookData = starterOutput.webhook.data
const webhookData = starterOutput.webhook?.data || {}
const provider = webhookData.provider
logger.debug(`Processing webhook trigger for block ${block.id}`, {
@@ -46,7 +54,21 @@ export class TriggerBlockHandler implements BlockHandler {
blockType: block.metadata?.id,
})
// Extract the flattened properties that should be at root level
// Provider-specific early return for GitHub: expose raw payload at root
if (provider === 'github') {
const payloadSource = webhookData.payload || {}
return {
...payloadSource,
webhook: starterOutput.webhook,
}
}
// Provider-specific early return for Airtable: preserve raw shape entirely
if (provider === 'airtable') {
return starterOutput
}
// Extract the flattened properties that should be at root level (non-GitHub/Airtable)
const result: any = {
// Always keep the input at root level
input: starterOutput.input,
@@ -67,70 +89,17 @@ export class TriggerBlockHandler implements BlockHandler {
const providerData = starterOutput[provider]
for (const [key, value] of Object.entries(providerData)) {
// Special handling for GitHub provider - copy all properties
if (provider === 'github') {
// For GitHub, copy all properties (objects and primitives) to root level
// For other providers, keep existing logic (only copy objects)
if (typeof value === 'object' && value !== null) {
// Don't overwrite existing top-level properties
if (!result[key]) {
// Special handling for complex objects that might have enumeration issues
if (typeof value === 'object' && value !== null) {
try {
// Deep clone complex objects to avoid reference issues
result[key] = JSON.parse(JSON.stringify(value))
} catch (error) {
// If JSON serialization fails, try direct assignment
result[key] = value
}
} else {
result[key] = value
}
}
} else {
// For other providers, keep existing logic (only copy objects)
if (typeof value === 'object' && value !== null) {
// Don't overwrite existing top-level properties
if (!result[key]) {
result[key] = value
}
result[key] = value
}
}
}
// Keep nested structure for backwards compatibility
result[provider] = providerData
// Special handling for GitHub complex objects that might not be copied by the main loop
if (provider === 'github') {
// Comprehensive GitHub object extraction from multiple possible sources
const githubObjects = ['repository', 'sender', 'pusher', 'commits', 'head_commit']
for (const objName of githubObjects) {
// ALWAYS try to get the object, even if something exists (fix for conflicts)
let objectValue = null
// Source 1: Direct from provider data
if (providerData[objName]) {
objectValue = providerData[objName]
}
// Source 2: From webhook payload (raw GitHub webhook)
else if (starterOutput.webhook?.data?.payload?.[objName]) {
objectValue = starterOutput.webhook.data.payload[objName]
}
// Source 3: For commits, try parsing JSON string version if no object found
else if (objName === 'commits' && typeof result.commits === 'string') {
try {
objectValue = JSON.parse(result.commits)
} catch (e) {
// Keep as string if parsing fails
objectValue = result.commits
}
}
// FORCE the object to root level (removed the !result[objName] condition)
if (objectValue !== null && objectValue !== undefined) {
result[objName] = objectValue
}
}
}
}
// Pattern 2: Provider data directly in webhook.data (based on actual structure)

View File

@@ -607,19 +607,9 @@ export function formatWebhookInput(
}
return {
input, // Primary workflow input
// Top-level properties for backward compatibility
...githubData,
// GitHub data structured for trigger handler to extract
github: {
// Processed convenience variables
...githubData,
// Raw GitHub webhook payload for direct field access
...body,
},
// Expose raw GitHub payload at the root
...body,
// Include webhook metadata alongside
webhook: {
data: {
provider: 'github',
@@ -835,6 +825,8 @@ export async function fetchAndProcessAirtablePayloads(
let apiCallCount = 0
// Use a Map to consolidate changes per record ID
const consolidatedChangesMap = new Map<string, AirtableChange>()
// Capture raw payloads from Airtable for exposure to workflows
const allPayloads = []
const localProviderConfig = {
...((webhookData.providerConfig as Record<string, any>) || {}),
} // Local copy
@@ -1031,6 +1023,10 @@ export async function fetchAndProcessAirtablePayloads(
// --- Process and Consolidate Changes ---
if (receivedPayloads.length > 0) {
payloadsFetched += receivedPayloads.length
// Keep the raw payloads for later exposure to the workflow
for (const p of receivedPayloads) {
allPayloads.push(p)
}
let changeCount = 0
for (const payload of receivedPayloads) {
if (payload.changedTablesById) {
@@ -1196,10 +1192,25 @@ export async function fetchAndProcessAirtablePayloads(
)
// --- Execute Workflow if we have changes (simplified - no lock check) ---
if (finalConsolidatedChanges.length > 0) {
if (finalConsolidatedChanges.length > 0 || allPayloads.length > 0) {
try {
// Format the input for the executor using the consolidated changes
const input = { airtableChanges: finalConsolidatedChanges } // Use the consolidated array
// Build input exposing raw payloads and consolidated changes
const latestPayload = allPayloads.length > 0 ? allPayloads[allPayloads.length - 1] : null
const input: any = {
// Raw Airtable payloads as received from the API
payloads: allPayloads,
latestPayload,
// Consolidated, simplified changes for convenience
airtableChanges: finalConsolidatedChanges,
// Include webhook metadata for resolver fallbacks
webhook: {
data: {
provider: 'airtable',
providerConfig: webhookData.providerConfig,
payload: latestPayload,
},
},
}
// CRITICAL EXECUTION TRACE POINT
logger.info(
@@ -1216,6 +1227,7 @@ export async function fetchAndProcessAirtablePayloads(
logger.info(`[${requestId}] CRITICAL_TRACE: Airtable changes processed, returning input`, {
workflowId: workflowData.id,
recordCount: finalConsolidatedChanges.length,
rawPayloadCount: allPayloads.length,
timestamp: new Date().toISOString(),
})

View File

@@ -38,37 +38,43 @@ export const airtableWebhookTrigger: TriggerConfig = {
},
outputs: {
event_type: {
type: 'string',
description: 'Type of Airtable event (e.g., record.created, record.updated, record.deleted)',
payloads: {
type: 'array',
description: 'The payloads of the Airtable changes',
},
base_id: {
type: 'string',
description: 'Airtable base identifier',
latestPayload: {
timestamp: {
type: 'string',
description: 'The timestamp of the Airtable change',
},
payloadFormat: {
type: 'object',
description: 'The format of the Airtable change',
},
actionMetadata: {
source: {
type: 'string',
description: 'The source of the Airtable change',
},
sourceMetadata: {
pageId: {
type: 'string',
description: 'The ID of the page that triggered the Airtable change',
},
},
changedTablesById: {
type: 'object',
description: 'The tables that were changed',
},
baseTransactionNumber: {
type: 'number',
description: 'The transaction number of the Airtable change',
},
},
},
table_id: {
type: 'string',
description: 'Airtable table identifier',
},
record_id: {
type: 'string',
description: 'Record identifier that was modified',
},
record_data: {
type: 'string',
description: 'Complete record data (when Include Full Record Data is enabled)',
},
changed_fields: {
type: 'string',
description: 'Fields that were changed in the record',
},
webhook_id: {
type: 'string',
description: 'Unique webhook identifier',
},
timestamp: {
type: 'string',
description: 'Event timestamp',
airtableChanges: {
type: 'array',
description: 'Changes made to the Airtable table',
},
},

View File

@@ -37,7 +37,7 @@ export const githubWebhookTrigger: TriggerConfig = {
},
outputs: {
// GitHub webhook payload structure - maps 1:1 to actual GitHub webhook body
// GitHub webhook payload structure - now at root for direct access
ref: {
type: 'string',
description: 'Git reference (e.g., refs/heads/fix/telegram-wh)',

View File

@@ -30,6 +30,10 @@ export const microsoftTeamsWebhookTrigger: TriggerConfig = {
type: 'string',
description: 'Unique message identifier',
},
input: {
type: 'string',
description: 'Input message',
},
timestamp: {
type: 'string',
description: 'Message timestamp',

View File

@@ -21,55 +21,73 @@ export const telegramWebhookTrigger: TriggerConfig = {
},
outputs: {
// Matches the formatted payload built in `formatWebhookInput` for provider "telegram"
// Supports tags like <telegram.message.text> and deep paths like <telegram.message.raw.chat.id>
message: {
update_id: {
id: {
type: 'number',
description: 'Unique identifier for the update',
},
message_id: {
type: 'number',
description: 'Unique message identifier',
},
from_id: {
type: 'number',
description: 'User ID who sent the message',
},
from_username: {
type: 'string',
description: 'Username of the sender',
},
from_first_name: {
type: 'string',
description: 'First name of the sender',
},
from_last_name: {
type: 'string',
description: 'Last name of the sender',
},
chat_id: {
type: 'number',
description: 'Unique identifier for the chat',
},
chat_type: {
type: 'string',
description: 'Type of chat (private, group, supergroup, channel)',
},
chat_title: {
type: 'string',
description: 'Title of the chat (for groups and channels)',
description: 'Telegram message ID',
},
text: {
type: 'string',
description: 'Message text content',
description: 'Message text content (if present)',
},
date: {
type: 'number',
description: 'Date the message was sent (Unix timestamp)',
},
entities: {
messageType: {
type: 'string',
description: 'Special entities in the message (mentions, hashtags, etc.) as JSON string',
description:
'Detected content type: text, photo, document, audio, video, voice, sticker, location, contact, poll',
},
raw: {
message_id: {
type: 'number',
description: 'Original Telegram message_id',
},
date: {
type: 'number',
description: 'Original Telegram message date (Unix timestamp)',
},
text: {
type: 'string',
description: 'Original Telegram text (if present)',
},
caption: {
type: 'string',
description: 'Original Telegram caption (if present)',
},
chat: {
id: { type: 'number', description: 'Chat identifier' },
username: { type: 'string', description: 'Chat username (if available)' },
first_name: { type: 'string', description: 'First name (for private chats)' },
last_name: { type: 'string', description: 'Last name (for private chats)' },
},
from: {
id: { type: 'number', description: 'Sender user ID' },
is_bot: { type: 'boolean', description: 'Whether the sender is a bot' },
first_name: { type: 'string', description: 'Sender first name' },
last_name: { type: 'string', description: 'Sender last name' },
language_code: { type: 'string', description: 'Sender language code (if available)' },
},
},
},
sender: {
id: { type: 'number', description: 'Sender user ID' },
firstName: { type: 'string', description: 'Sender first name' },
lastName: { type: 'string', description: 'Sender last name' },
languageCode: { type: 'string', description: 'Sender language code (if available)' },
isBot: { type: 'boolean', description: 'Whether the sender is a bot' },
},
updateId: {
type: 'number',
description: 'Update ID for this webhook delivery',
},
updateType: {
type: 'string',
description:
'Type of update: message, edited_message, channel_post, edited_channel_post, unknown',
},
},