feat(idempotency): add generalized idempotency service for all triggers/webhooks (#1330)

* update infra and remove railway

* feat(webhooks): add idempotency service for all triggers/webhooks

* Revert "update infra and remove railway"

This reverts commit abfa2f8d51.

* cleanup

* ack PR comments
This commit is contained in:
Waleed
2025-09-15 14:50:18 -07:00
committed by GitHub
parent f2ec43e4f9
commit d73a97ffa2
13 changed files with 7828 additions and 338 deletions

View File

@@ -0,0 +1,64 @@
import { type NextRequest, NextResponse } from 'next/server'
import { verifyCronAuth } from '@/lib/auth/internal'
import { cleanupExpiredIdempotencyKeys, getIdempotencyKeyStats } from '@/lib/idempotency/cleanup'
import { createLogger } from '@/lib/logs/console/logger'
import { generateRequestId } from '@/lib/utils'
const logger = createLogger('IdempotencyCleanupAPI')
// Force per-request execution so cron runs are never served a cached response.
export const dynamic = 'force-dynamic'
export const maxDuration = 300 // Allow up to 5 minutes for cleanup
/**
 * Cron-triggered endpoint that purges expired idempotency keys.
 *
 * Flow: authenticate the cron caller, snapshot key stats, run the batch
 * cleanup (7-day cutoff, 1000 rows per batch), snapshot stats again, and
 * return a before/after summary. Any failure is reported as a 500 with the
 * request id for log correlation.
 */
export async function GET(request: NextRequest) {
  const requestId = generateRequestId()
  logger.info(`Idempotency cleanup triggered (${requestId})`)

  try {
    // Reject callers that do not present valid cron credentials.
    const unauthorized = verifyCronAuth(request, 'Idempotency key cleanup')
    if (unauthorized) {
      return unauthorized
    }

    const preStats = await getIdempotencyKeyStats()
    logger.info(
      `Pre-cleanup stats: ${preStats.totalKeys} keys across ${Object.keys(preStats.keysByNamespace).length} namespaces`
    )

    const cleanup = await cleanupExpiredIdempotencyKeys({
      maxAgeSeconds: 7 * 24 * 60 * 60, // 7 days
      batchSize: 1000,
    })

    const postStats = await getIdempotencyKeyStats()
    logger.info(`Post-cleanup stats: ${postStats.totalKeys} keys remaining`)

    // Shape the before/after summary separately to keep the response literal small.
    const summary = {
      deleted: cleanup.deleted,
      errors: cleanup.errors,
      statsBefore: {
        totalKeys: preStats.totalKeys,
        keysByNamespace: preStats.keysByNamespace,
      },
      statsAfter: {
        totalKeys: postStats.totalKeys,
        keysByNamespace: postStats.keysByNamespace,
      },
    }

    return NextResponse.json({
      success: true,
      message: 'Idempotency key cleanup completed',
      requestId,
      result: summary,
    })
  } catch (error) {
    logger.error(`Error during idempotency cleanup (${requestId}):`, error)
    return NextResponse.json(
      {
        success: false,
        message: 'Idempotency cleanup failed',
        error: error instanceof Error ? error.message : 'Unknown error',
        requestId,
      },
      { status: 500 }
    )
  }
}

View File

@@ -4,6 +4,7 @@ import { type NextRequest, NextResponse } from 'next/server'
import { checkServerSideUsageLimits } from '@/lib/billing'
import { getHighestPrioritySubscription } from '@/lib/billing/core/subscription'
import { env, isTruthy } from '@/lib/env'
import { IdempotencyService, webhookIdempotency } from '@/lib/idempotency/service'
import { createLogger } from '@/lib/logs/console/logger'
import { generateRequestId } from '@/lib/utils'
import {
@@ -328,7 +329,7 @@ export async function POST(
// Continue processing - better to risk usage limit bypass than fail webhook
}
// --- PHASE 5: Queue webhook execution (trigger.dev or direct based on env) ---
// --- PHASE 5: Idempotent webhook execution ---
try {
const payload = {
webhookId: foundWebhook.id,
@@ -341,22 +342,44 @@ export async function POST(
blockId: foundWebhook.blockId,
}
const useTrigger = isTruthy(env.TRIGGER_DEV_ENABLED)
const idempotencyKey = IdempotencyService.createWebhookIdempotencyKey(
foundWebhook.id,
body,
Object.fromEntries(request.headers.entries())
)
if (useTrigger) {
const handle = await tasks.trigger('webhook-execution', payload)
logger.info(
`[${requestId}] Queued webhook execution task ${handle.id} for ${foundWebhook.provider} webhook`
)
} else {
// Fire-and-forget direct execution to avoid blocking webhook response
void executeWebhookJob(payload).catch((error) => {
logger.error(`[${requestId}] Direct webhook execution failed`, error)
})
logger.info(
`[${requestId}] Queued direct webhook execution for ${foundWebhook.provider} webhook (Trigger.dev disabled)`
)
}
const result = await webhookIdempotency.executeWithIdempotency(
foundWebhook.provider,
idempotencyKey,
async () => {
const useTrigger = isTruthy(env.TRIGGER_DEV_ENABLED)
if (useTrigger) {
const handle = await tasks.trigger('webhook-execution', payload)
logger.info(
`[${requestId}] Queued webhook execution task ${handle.id} for ${foundWebhook.provider} webhook`
)
return {
method: 'trigger.dev',
taskId: handle.id,
status: 'queued',
}
}
// Fire-and-forget direct execution to avoid blocking webhook response
void executeWebhookJob(payload).catch((error) => {
logger.error(`[${requestId}] Direct webhook execution failed`, error)
})
logger.info(
`[${requestId}] Queued direct webhook execution for ${foundWebhook.provider} webhook (Trigger.dev disabled)`
)
return {
method: 'direct',
status: 'queued',
}
}
)
logger.debug(`[${requestId}] Webhook execution result:`, result)
// Return immediate acknowledgment with provider-specific format
if (foundWebhook.provider === 'microsoftteams') {

View File

@@ -3,6 +3,7 @@ import { eq, sql } from 'drizzle-orm'
import { v4 as uuidv4 } from 'uuid'
import { checkServerSideUsageLimits } from '@/lib/billing'
import { getPersonalAndWorkspaceEnv } from '@/lib/environment/utils'
import { IdempotencyService, webhookIdempotency } from '@/lib/idempotency'
import { createLogger } from '@/lib/logs/console/logger'
import { LoggingSession } from '@/lib/logs/execution/logging-session'
import { buildTraceSpans } from '@/lib/logs/execution/trace-spans/trace-spans'
@@ -41,11 +42,29 @@ export async function executeWebhookJob(payload: WebhookExecutionPayload) {
executionId,
})
// Initialize logging session outside try block so it's available in catch
const idempotencyKey = IdempotencyService.createWebhookIdempotencyKey(
payload.webhookId,
payload.body,
payload.headers
)
return await webhookIdempotency.executeWithIdempotency(
payload.provider,
idempotencyKey,
async () => {
return await executeWebhookJobInternal(payload, executionId, requestId)
}
)
}
async function executeWebhookJobInternal(
payload: WebhookExecutionPayload,
executionId: string,
requestId: string
) {
const loggingSession = new LoggingSession(payload.workflowId, executionId, 'webhook', requestId)
try {
// Check usage limits first
const usageCheck = await checkServerSideUsageLimits(payload.userId)
if (usageCheck.isExceeded) {
logger.warn(
@@ -62,7 +81,6 @@ export async function executeWebhookJob(payload: WebhookExecutionPayload) {
)
}
// Load workflow from normalized tables
const workflowData = await loadWorkflowFromNormalizedTables(payload.workflowId)
if (!workflowData) {
throw new Error(`Workflow not found: ${payload.workflowId}`)
@@ -70,7 +88,6 @@ export async function executeWebhookJob(payload: WebhookExecutionPayload) {
const { blocks, edges, loops, parallels } = workflowData
// Get environment variables with workspace precedence
const wfRows = await db
.select({ workspaceId: workflowTable.workspaceId })
.from(workflowTable)

View File

@@ -0,0 +1,10 @@
-- Idempotency keys: persists the result of a processed webhook/trigger so a
-- duplicate delivery can be recognized and skipped. No PRIMARY KEY column;
-- (key, namespace) uniqueness is enforced by the unique index below.
CREATE TABLE "idempotency_key" (
	"key" text NOT NULL,
	"namespace" text DEFAULT 'default' NOT NULL,
	"result" json NOT NULL,
	"created_at" timestamp DEFAULT now() NOT NULL
);
--> statement-breakpoint
-- Logical primary key for upsert conflict targets.
CREATE UNIQUE INDEX "idempotency_key_namespace_unique" ON "idempotency_key" USING btree ("key","namespace");--> statement-breakpoint
-- Supports age-based cleanup scans.
CREATE INDEX "idempotency_key_created_at_idx" ON "idempotency_key" USING btree ("created_at");--> statement-breakpoint
-- Supports per-namespace lookups and stats.
CREATE INDEX "idempotency_key_namespace_idx" ON "idempotency_key" USING btree ("namespace");

File diff suppressed because it is too large Load Diff

View File

@@ -624,6 +624,13 @@
"when": 1757628623657,
"tag": "0089_amused_pete_wisdom",
"breakpoints": true
},
{
"idx": 90,
"version": "7",
"when": 1757805452908,
"tag": "0090_fearless_zaladane",
"breakpoints": true
}
]
}

View File

@@ -1333,6 +1333,27 @@ export const workflowDeploymentVersion = pgTable(
})
)
// Idempotency keys for preventing duplicate processing across all webhooks and triggers
export const idempotencyKey = pgTable(
  'idempotency_key',
  {
    // Normalized key text (the service already prefixes namespace/provider into it)
    key: text('key').notNull(),
    namespace: text('namespace').notNull().default('default'),
    // Serialized ProcessingResult from the first execution (success or failure)
    result: json('result').notNull(),
    // Used for TTL checks at read time and bulk cleanup by the cron job
    createdAt: timestamp('created_at').notNull().defaultNow(),
  },
  (table) => ({
    // Logical primary key is (key, namespace). NOTE: this is a UNIQUE INDEX,
    // not a real PRIMARY KEY constraint — the table has no PK column.
    keyNamespacePk: uniqueIndex('idempotency_key_namespace_unique').on(table.key, table.namespace),
    // Index for cleanup operations by creation time
    createdAtIdx: index('idempotency_key_created_at_idx').on(table.createdAt),
    // Index for namespace-based queries
    namespaceIdx: index('idempotency_key_namespace_idx').on(table.namespace),
  })
)
export const mcpServers = pgTable(
'mcp_servers',
{

View File

@@ -0,0 +1,175 @@
import { and, eq, lt, or } from 'drizzle-orm'
import { createLogger } from '@/lib/logs/console/logger'
import { db } from '@/db'
import { idempotencyKey } from '@/db/schema'
const logger = createLogger('IdempotencyCleanup')
/** Tuning options for cleanupExpiredIdempotencyKeys. */
export interface CleanupOptions {
  /**
   * Maximum age of idempotency keys in seconds before they're considered expired
   * Default: 7 days (604800 seconds)
   */
  maxAgeSeconds?: number
  /**
   * Maximum number of keys to delete in a single batch
   * Default: 1000
   */
  batchSize?: number
  /**
   * Specific namespace to clean up, or undefined to clean all namespaces
   */
  namespace?: string
}
/**
* Clean up expired idempotency keys from the database
*/
export async function cleanupExpiredIdempotencyKeys(
options: CleanupOptions = {}
): Promise<{ deleted: number; errors: string[] }> {
const {
maxAgeSeconds = 7 * 24 * 60 * 60, // 7 days
batchSize = 1000,
namespace,
} = options
const errors: string[] = []
let totalDeleted = 0
try {
const cutoffDate = new Date(Date.now() - maxAgeSeconds * 1000)
logger.info('Starting idempotency key cleanup', {
cutoffDate: cutoffDate.toISOString(),
namespace: namespace || 'all',
batchSize,
})
let hasMore = true
let batchCount = 0
while (hasMore) {
try {
const whereCondition = namespace
? and(lt(idempotencyKey.createdAt, cutoffDate), eq(idempotencyKey.namespace, namespace))
: lt(idempotencyKey.createdAt, cutoffDate)
// First, find IDs to delete with limit
const toDelete = await db
.select({ key: idempotencyKey.key, namespace: idempotencyKey.namespace })
.from(idempotencyKey)
.where(whereCondition)
.limit(batchSize)
if (toDelete.length === 0) {
break
}
// Delete the found records
const deleteResult = await db
.delete(idempotencyKey)
.where(
and(
...toDelete.map((item) =>
and(eq(idempotencyKey.key, item.key), eq(idempotencyKey.namespace, item.namespace))
)
)
)
.returning({ key: idempotencyKey.key })
const deletedCount = deleteResult.length
totalDeleted += deletedCount
batchCount++
if (deletedCount === 0) {
hasMore = false
logger.info('No more expired idempotency keys found')
} else if (deletedCount < batchSize) {
hasMore = false
logger.info(`Deleted final batch of ${deletedCount} expired idempotency keys`)
} else {
logger.info(`Deleted batch ${batchCount}: ${deletedCount} expired idempotency keys`)
await new Promise((resolve) => setTimeout(resolve, 100))
}
} catch (batchError) {
const errorMessage =
batchError instanceof Error ? batchError.message : 'Unknown batch error'
logger.error(`Error deleting batch ${batchCount + 1}:`, batchError)
errors.push(`Batch ${batchCount + 1}: ${errorMessage}`)
batchCount++
if (errors.length > 5) {
logger.error('Too many batch errors, stopping cleanup')
break
}
}
}
logger.info('Idempotency key cleanup completed', {
totalDeleted,
batchCount,
errors: errors.length,
})
} catch (error) {
const errorMessage = error instanceof Error ? error.message : 'Unknown error'
logger.error('Failed to cleanup expired idempotency keys:', error)
errors.push(`General error: ${errorMessage}`)
}
return { deleted: totalDeleted, errors }
}
/**
* Get statistics about idempotency key usage
*/
/**
 * Gather usage statistics for the idempotency key table: total row count,
 * per-namespace counts, and the oldest/newest creation timestamps.
 *
 * NOTE: loads every (namespace, createdAt) pair into memory; fine for
 * moderate table sizes, observe before running against very large tables.
 * On any database error a zeroed result is returned rather than throwing.
 */
export async function getIdempotencyKeyStats(): Promise<{
  totalKeys: number
  keysByNamespace: Record<string, number>
  oldestKey: Date | null
  newestKey: Date | null
}> {
  try {
    const rows = await db
      .select({
        namespace: idempotencyKey.namespace,
        createdAt: idempotencyKey.createdAt,
      })
      .from(idempotencyKey)

    const keysByNamespace: Record<string, number> = {}
    let oldestKey: Date | null = null
    let newestKey: Date | null = null

    for (const row of rows) {
      keysByNamespace[row.namespace] = (keysByNamespace[row.namespace] ?? 0) + 1
      if (oldestKey === null || row.createdAt < oldestKey) {
        oldestKey = row.createdAt
      }
      if (newestKey === null || row.createdAt > newestKey) {
        newestKey = row.createdAt
      }
    }

    return {
      totalKeys: rows.length,
      keysByNamespace,
      oldestKey,
      newestKey,
    }
  } catch (error) {
    logger.error('Failed to get idempotency key stats:', error)
    return {
      totalKeys: 0,
      keysByNamespace: {},
      oldestKey: null,
      newestKey: null,
    }
  }
}

View File

@@ -0,0 +1,7 @@
/**
 * Public entry point for the idempotency module.
 *
 * The previous explicit named re-export of pollingIdempotency,
 * triggerIdempotency, and webhookIdempotency was redundant:
 * `export * from './service'` already surfaces all of them.
 */
export * from './cleanup'
export * from './service'

View File

@@ -0,0 +1,406 @@
import * as crypto from 'crypto'
import { and, eq } from 'drizzle-orm'
import { createLogger } from '@/lib/logs/console/logger'
import { getRedisClient } from '@/lib/redis'
import { db } from '@/db'
import { idempotencyKey } from '@/db/schema'
const logger = createLogger('IdempotencyService')
export interface IdempotencyConfig {
  /**
   * Time-to-live for the idempotency key in seconds
   * Default: 7 days (604800 seconds)
   */
  ttlSeconds?: number
  /**
   * Namespace for the idempotency key (e.g., 'gmail', 'webhook', 'trigger')
   * Default: 'default'
   */
  namespace?: string
  /**
   * Enable database fallback when Redis is not available
   * Default: true
   */
  enableDatabaseFallback?: boolean
}

export interface IdempotencyResult {
  /**
   * Whether this is the first time processing this key
   */
  isFirstTime: boolean
  /**
   * The normalized idempotency key used for storage
   */
  normalizedKey: string
  /**
   * Previous result if this key was already processed
   */
  previousResult?: any
  /**
   * Storage method used ('redis', 'database', 'memory')
   */
  storageMethod: 'redis' | 'database' | 'memory'
}

/** Outcome of a guarded operation, persisted so duplicates replay it. */
export interface ProcessingResult {
  // True when the operation completed without throwing
  success: boolean
  // Operation return value (set only on success)
  result?: any
  // Error message captured from a failed operation
  error?: string
}

// Default TTL applied when IdempotencyConfig.ttlSeconds is omitted
const DEFAULT_TTL = 60 * 60 * 24 * 7 // 7 days
// Prefix for all Redis keys written by this service
const REDIS_KEY_PREFIX = 'idempotency:'
// Soft cap on the in-process fallback cache before eviction kicks in
const MEMORY_CACHE_SIZE = 1000

// Last-resort per-process cache used when both Redis and the database are
// unavailable. Not shared across instances, so dedup on this tier is
// best-effort only.
const memoryCache = new Map<
  string,
  {
    result: any
    timestamp: number
    ttl: number
  }
>()
/**
* Universal idempotency service for webhooks, triggers, and any other operations
* that need duplicate prevention.
*/
/**
 * Universal idempotency service for webhooks, triggers, and any other operations
 * that need duplicate prevention.
 *
 * Lookup/storage is tiered: Redis (fast, shared) -> database (durable fallback,
 * if enabled) -> per-process memory cache (last resort; not shared between
 * instances, so dedup on that tier is best-effort only).
 *
 * NOTE(review): checkIdempotency + storeResult is a check-then-act sequence,
 * not an atomic reservation (e.g. Redis SET NX). Two concurrent deliveries of
 * the same event can both observe "first time" and both run the operation —
 * confirm this best-effort guarantee is acceptable for every caller.
 */
export class IdempotencyService {
  private config: Required<IdempotencyConfig>

  constructor(config: IdempotencyConfig = {}) {
    this.config = {
      ttlSeconds: config.ttlSeconds ?? DEFAULT_TTL,
      namespace: config.namespace ?? 'default',
      enableDatabaseFallback: config.enableDatabaseFallback ?? true,
    }
  }

  /**
   * Generate a normalized idempotency key from various sources.
   *
   * @param provider logical event source (e.g. 'gmail', 'github')
   * @param identifier unique id for the event within that provider
   * @param additionalContext extra key/value pairs folded into the key
   */
  private normalizeKey(
    provider: string,
    identifier: string,
    additionalContext?: Record<string, any>
  ): string {
    const base = `${this.config.namespace}:${provider}:${identifier}`
    if (additionalContext && Object.keys(additionalContext).length > 0) {
      // Sort keys for consistent hashing
      const sortedKeys = Object.keys(additionalContext).sort()
      const contextStr = sortedKeys.map((key) => `${key}=${additionalContext[key]}`).join('&')
      return `${base}:${contextStr}`
    }
    return base
  }

  /**
   * Check if an operation has already been processed.
   *
   * Returns which tier answered ('redis' | 'database' | 'memory') so that
   * storeResult() writes the result back to the same tier.
   *
   * NOTE(review): a Redis miss is treated as authoritative and does NOT fall
   * through to the database — if Redis is flushed while the database still
   * holds the key, a duplicate slips through. Confirm this tradeoff is intended.
   */
  async checkIdempotency(
    provider: string,
    identifier: string,
    additionalContext?: Record<string, any>
  ): Promise<IdempotencyResult> {
    const normalizedKey = this.normalizeKey(provider, identifier, additionalContext)
    const redisKey = `${REDIS_KEY_PREFIX}${normalizedKey}`
    try {
      const redis = getRedisClient()
      if (redis) {
        const cachedResult = await redis.get(redisKey)
        if (cachedResult) {
          logger.debug(`Idempotency hit in Redis: ${normalizedKey}`)
          return {
            isFirstTime: false,
            normalizedKey,
            previousResult: JSON.parse(cachedResult),
            storageMethod: 'redis',
          }
        }
        logger.debug(`Idempotency miss in Redis: ${normalizedKey}`)
        return {
          isFirstTime: true,
          normalizedKey,
          storageMethod: 'redis',
        }
      }
    } catch (error) {
      logger.warn(`Redis idempotency check failed for ${normalizedKey}:`, error)
    }
    if (this.config.enableDatabaseFallback) {
      try {
        const existing = await db
          .select({ result: idempotencyKey.result, createdAt: idempotencyKey.createdAt })
          .from(idempotencyKey)
          .where(
            and(
              eq(idempotencyKey.key, normalizedKey),
              eq(idempotencyKey.namespace, this.config.namespace)
            )
          )
          .limit(1)
        if (existing.length > 0) {
          const item = existing[0]
          // TTL is enforced lazily at read time; the cleanup cron handles bulk removal.
          const isExpired = Date.now() - item.createdAt.getTime() > this.config.ttlSeconds * 1000
          if (!isExpired) {
            logger.debug(`Idempotency hit in database: ${normalizedKey}`)
            return {
              isFirstTime: false,
              normalizedKey,
              previousResult: item.result,
              storageMethod: 'database',
            }
          }
          // BUG FIX: scope the lazy delete to this namespace as well. Deleting
          // by key alone could remove another namespace's row that happens to
          // share the same key text.
          await db
            .delete(idempotencyKey)
            .where(
              and(
                eq(idempotencyKey.key, normalizedKey),
                eq(idempotencyKey.namespace, this.config.namespace)
              )
            )
            .catch((err) => logger.warn(`Failed to clean up expired key ${normalizedKey}:`, err))
        }
        logger.debug(`Idempotency miss in database: ${normalizedKey}`)
        return {
          isFirstTime: true,
          normalizedKey,
          storageMethod: 'database',
        }
      } catch (error) {
        logger.warn(`Database idempotency check failed for ${normalizedKey}:`, error)
      }
    }
    // Memory tier: only reached when Redis is down/absent and the database
    // fallback is disabled or also failing.
    const memoryEntry = memoryCache.get(normalizedKey)
    if (memoryEntry) {
      const isExpired = Date.now() - memoryEntry.timestamp > memoryEntry.ttl * 1000
      if (!isExpired) {
        logger.debug(`Idempotency hit in memory: ${normalizedKey}`)
        return {
          isFirstTime: false,
          normalizedKey,
          previousResult: memoryEntry.result,
          storageMethod: 'memory',
        }
      }
      memoryCache.delete(normalizedKey)
    }
    logger.debug(`Idempotency miss in memory: ${normalizedKey}`)
    return {
      isFirstTime: true,
      normalizedKey,
      storageMethod: 'memory',
    }
  }

  /**
   * Store the result of processing for future idempotency checks.
   *
   * Attempts the tier that answered the earlier check, degrading gracefully:
   * redis -> database (unless the check already fell through to memory) -> memory.
   */
  async storeResult(
    normalizedKey: string,
    result: ProcessingResult,
    storageMethod: 'redis' | 'database' | 'memory'
  ): Promise<void> {
    const serializedResult = JSON.stringify(result)
    try {
      if (storageMethod === 'redis') {
        const redis = getRedisClient()
        if (redis) {
          // SETEX applies the TTL atomically with the write.
          await redis.setex(
            `${REDIS_KEY_PREFIX}${normalizedKey}`,
            this.config.ttlSeconds,
            serializedResult
          )
          logger.debug(`Stored idempotency result in Redis: ${normalizedKey}`)
          return
        }
      }
    } catch (error) {
      logger.warn(`Failed to store result in Redis for ${normalizedKey}:`, error)
    }
    // 'memory' means the check already found both Redis and DB unusable —
    // don't retry the database here.
    if (this.config.enableDatabaseFallback && storageMethod !== 'memory') {
      try {
        // Upsert on (key, namespace); a concurrent writer simply refreshes the row.
        await db
          .insert(idempotencyKey)
          .values({
            key: normalizedKey,
            namespace: this.config.namespace,
            result: result,
            createdAt: new Date(),
          })
          .onConflictDoUpdate({
            target: [idempotencyKey.key, idempotencyKey.namespace],
            set: {
              result: result,
              createdAt: new Date(),
            },
          })
        logger.debug(`Stored idempotency result in database: ${normalizedKey}`)
        return
      } catch (error) {
        logger.warn(`Failed to store result in database for ${normalizedKey}:`, error)
      }
    }
    memoryCache.set(normalizedKey, {
      result,
      timestamp: Date.now(),
      ttl: this.config.ttlSeconds,
    })
    // Bounded eviction: drop expired entries first, then the oldest survivors.
    if (memoryCache.size > MEMORY_CACHE_SIZE) {
      const entries = Array.from(memoryCache.entries())
      const now = Date.now()
      entries.forEach(([key, entry]) => {
        if (now - entry.timestamp > entry.ttl * 1000) {
          memoryCache.delete(key)
        }
      })
      if (memoryCache.size > MEMORY_CACHE_SIZE) {
        const sortedEntries = entries
          .filter(([key]) => memoryCache.has(key))
          .sort((a, b) => a[1].timestamp - b[1].timestamp)
        const toRemove = sortedEntries.slice(0, memoryCache.size - MEMORY_CACHE_SIZE)
        toRemove.forEach(([key]) => memoryCache.delete(key))
      }
    }
    logger.debug(`Stored idempotency result in memory: ${normalizedKey}`)
  }

  /**
   * Execute an operation with idempotency protection.
   *
   * On a duplicate key, replays the stored outcome: re-throws if the original
   * attempt failed, otherwise returns its result.
   *
   * NOTE(review): failures are cached for the full TTL, so a transient error
   * blocks retries of the same key until the key expires — confirm this is
   * the desired retry semantics.
   */
  async executeWithIdempotency<T>(
    provider: string,
    identifier: string,
    operation: () => Promise<T>,
    additionalContext?: Record<string, any>
  ): Promise<T> {
    const idempotencyCheck = await this.checkIdempotency(provider, identifier, additionalContext)
    if (!idempotencyCheck.isFirstTime) {
      logger.info(`Skipping duplicate operation: ${idempotencyCheck.normalizedKey}`)
      if (idempotencyCheck.previousResult?.success === false) {
        throw new Error(idempotencyCheck.previousResult?.error || 'Previous operation failed')
      }
      return idempotencyCheck.previousResult?.result as T
    }
    try {
      logger.debug(`Executing new operation: ${idempotencyCheck.normalizedKey}`)
      const result = await operation()
      await this.storeResult(
        idempotencyCheck.normalizedKey,
        { success: true, result },
        idempotencyCheck.storageMethod
      )
      return result
    } catch (error) {
      const errorMessage = error instanceof Error ? error.message : 'Unknown error'
      await this.storeResult(
        idempotencyCheck.normalizedKey,
        { success: false, error: errorMessage },
        idempotencyCheck.storageMethod
      )
      throw error
    }
  }

  /**
   * Create an idempotency key from a webhook payload.
   *
   * Preference order: provider-supplied delivery id header, then a common id
   * field in the payload, then a SHA-256 digest of the whole payload.
   * Header keys are expected lowercased (Next's request.headers iterates
   * lowercase names).
   *
   * NOTE(review): the first segment of 'stripe-signature' is 't=<timestamp>',
   * not a unique delivery id — two distinct Stripe events in the same second
   * could collide. Consider the payload's event id for Stripe instead.
   */
  static createWebhookIdempotencyKey(
    webhookId: string,
    payload: any,
    headers?: Record<string, string>
  ): string {
    const webhookIdHeader =
      headers?.['x-webhook-id'] ||
      headers?.['x-shopify-webhook-id'] ||
      headers?.['x-github-delivery'] ||
      headers?.['stripe-signature']?.split(',')[0]
    if (webhookIdHeader) {
      return `${webhookId}:${webhookIdHeader}`
    }
    const payloadId = payload?.id || payload?.event_id || payload?.message?.id || payload?.data?.id
    if (payloadId) {
      return `${webhookId}:${payloadId}`
    }
    // Last resort: content-hash the payload (first 16 hex chars of SHA-256).
    const payloadHash = crypto
      .createHash('sha256')
      .update(JSON.stringify(payload))
      .digest('hex')
      .substring(0, 16)
    return `${webhookId}:${payloadHash}`
  }

  /**
   * Create an idempotency key for Gmail polling (webhook + Gmail message id).
   */
  static createGmailIdempotencyKey(webhookId: string, emailId: string): string {
    return `${webhookId}:${emailId}`
  }

  /**
   * Create an idempotency key for generic triggers, optionally folding in
   * extra context as sorted key=value pairs for deterministic output.
   */
  static createTriggerIdempotencyKey(
    triggerId: string,
    eventId: string,
    additionalContext?: Record<string, string>
  ): string {
    const base = `${triggerId}:${eventId}`
    if (additionalContext && Object.keys(additionalContext).length > 0) {
      const contextStr = Object.keys(additionalContext)
        .sort()
        .map((key) => `${key}=${additionalContext[key]}`)
        .join('&')
      return `${base}:${contextStr}`
    }
    return base
  }
}
// Shared instance guarding inbound webhook deliveries.
export const webhookIdempotency = new IdempotencyService({
  namespace: 'webhook',
  ttlSeconds: 60 * 60 * 24 * 7, // 7 days
})
// Shared instance guarding polling-based triggers (used by Gmail/Outlook pollers).
export const pollingIdempotency = new IdempotencyService({
  namespace: 'polling',
  ttlSeconds: 60 * 60 * 24 * 3, // 3 days
})
// Shared instance guarding generic trigger events.
export const triggerIdempotency = new IdempotencyService({
  namespace: 'trigger',
  ttlSeconds: 60 * 60 * 24 * 1, // 1 day
})

View File

@@ -1,7 +1,7 @@
import { and, eq } from 'drizzle-orm'
import { nanoid } from 'nanoid'
import { pollingIdempotency } from '@/lib/idempotency/service'
import { createLogger } from '@/lib/logs/console/logger'
import { hasProcessedMessage, markMessageAsProcessed } from '@/lib/redis'
import { getBaseUrl } from '@/lib/urls/utils'
import { getOAuthToken, refreshAccessTokenIfNeeded } from '@/app/api/auth/oauth/utils'
import { db } from '@/db'
@@ -16,7 +16,6 @@ interface GmailWebhookConfig {
maxEmailsPerPoll?: number
lastCheckedTimestamp?: string
historyId?: string
processedEmailIds?: string[]
pollingInterval?: number
includeRawEmail?: boolean
}
@@ -138,30 +137,10 @@ export async function pollGmailWebhooks() {
logger.info(`[${requestId}] Found ${emails.length} new emails for webhook ${webhookId}`)
// Get processed email IDs (to avoid duplicates)
const processedEmailIds = config.processedEmailIds || []
// Filter out emails that have already been processed
const newEmails = emails.filter((email) => !processedEmailIds.includes(email.id))
if (newEmails.length === 0) {
logger.info(
`[${requestId}] All emails have already been processed for webhook ${webhookId}`
)
await updateWebhookLastChecked(
webhookId,
now.toISOString(),
latestHistoryId || config.historyId
)
return { success: true, webhookId, status: 'already_processed' }
}
logger.info(
`[${requestId}] Processing ${newEmails.length} new emails for webhook ${webhookId}`
)
logger.info(`[${requestId}] Processing ${emails.length} emails for webhook ${webhookId}`)
// Process all emails (process each email as a separate workflow trigger)
const emailsToProcess = newEmails
const emailsToProcess = emails
// Process emails
const processed = await processEmails(
@@ -172,24 +151,13 @@ export async function pollGmailWebhooks() {
requestId
)
// Record which email IDs have been processed
const newProcessedIds = [...processedEmailIds, ...emailsToProcess.map((email) => email.id)]
// Keep only the most recent 100 IDs to prevent the list from growing too large
const trimmedProcessedIds = newProcessedIds.slice(-100)
// Update webhook with latest history ID, timestamp, and processed email IDs
await updateWebhookData(
webhookId,
now.toISOString(),
latestHistoryId || config.historyId,
trimmedProcessedIds
)
// Update webhook with latest history ID and timestamp
await updateWebhookData(webhookId, now.toISOString(), latestHistoryId || config.historyId)
return {
success: true,
webhookId,
emailsFound: emails.length,
newEmails: newEmails.length,
emailsProcessed: processed,
}
} catch (error) {
@@ -508,156 +476,157 @@ async function processEmails(
for (const email of emails) {
try {
// Deduplicate at Redis level (guards against races between cron runs)
const dedupeKey = `gmail:${webhookData.id}:${email.id}`
try {
const alreadyProcessed = await hasProcessedMessage(dedupeKey)
if (alreadyProcessed) {
logger.info(
`[${requestId}] Duplicate email ${email.id} for webhook ${webhookData.id} skipping`
const result = await pollingIdempotency.executeWithIdempotency(
'gmail',
`${webhookData.id}:${email.id}`,
async () => {
// Extract useful information from email to create a simplified payload
// First, extract headers into a map for easy access
const headers: Record<string, string> = {}
if (email.payload?.headers) {
for (const header of email.payload.headers) {
headers[header.name.toLowerCase()] = header.value
}
}
// Extract and decode email body content
let textContent = ''
let htmlContent = ''
// Function to extract content from parts recursively
const extractContent = (part: any) => {
if (!part) return
// Extract current part content if it exists
if (part.mimeType === 'text/plain' && part.body?.data) {
textContent = Buffer.from(part.body.data, 'base64').toString('utf-8')
} else if (part.mimeType === 'text/html' && part.body?.data) {
htmlContent = Buffer.from(part.body.data, 'base64').toString('utf-8')
}
// Process nested parts
if (part.parts && Array.isArray(part.parts)) {
for (const subPart of part.parts) {
extractContent(subPart)
}
}
}
// Extract content from the email payload
if (email.payload) {
extractContent(email.payload)
}
// Parse date into standard format
let date: string | null = null
if (headers.date) {
try {
date = new Date(headers.date).toISOString()
} catch (_e) {
// Keep date as null if parsing fails
}
} else if (email.internalDate) {
// Use internalDate as fallback (convert from timestamp to ISO string)
date = new Date(Number.parseInt(email.internalDate)).toISOString()
}
// Extract attachment information if present
const attachments: Array<{ filename: string; mimeType: string; size: number }> = []
const findAttachments = (part: any) => {
if (!part) return
if (part.filename && part.filename.length > 0) {
attachments.push({
filename: part.filename,
mimeType: part.mimeType || 'application/octet-stream',
size: part.body?.size || 0,
})
}
// Look for attachments in nested parts
if (part.parts && Array.isArray(part.parts)) {
for (const subPart of part.parts) {
findAttachments(subPart)
}
}
}
if (email.payload) {
findAttachments(email.payload)
}
// Create simplified email object
const simplifiedEmail: SimplifiedEmail = {
id: email.id,
threadId: email.threadId,
subject: headers.subject || '[No Subject]',
from: headers.from || '',
to: headers.to || '',
cc: headers.cc || '',
date: date,
bodyText: textContent,
bodyHtml: htmlContent,
labels: email.labelIds || [],
hasAttachments: attachments.length > 0,
attachments: attachments,
}
// Prepare webhook payload with simplified email and optionally raw email
const payload: GmailWebhookPayload = {
email: simplifiedEmail,
timestamp: new Date().toISOString(),
...(config.includeRawEmail ? { rawEmail: email } : {}),
}
logger.debug(
`[${requestId}] Sending ${config.includeRawEmail ? 'simplified + raw' : 'simplified'} email payload for ${email.id}`
)
continue
}
} catch (err) {
logger.warn(`[${requestId}] Redis check failed for ${email.id}, continuing`, err)
}
// Extract useful information from email to create a simplified payload
// First, extract headers into a map for easy access
const headers: Record<string, string> = {}
if (email.payload?.headers) {
for (const header of email.payload.headers) {
headers[header.name.toLowerCase()] = header.value
}
}
// Trigger the webhook
const webhookUrl = `${getBaseUrl()}/api/webhooks/trigger/${webhookData.path}`
// Extract and decode email body content
let textContent = ''
let htmlContent = ''
// Function to extract content from parts recursively
const extractContent = (part: any) => {
if (!part) return
// Extract current part content if it exists
if (part.mimeType === 'text/plain' && part.body?.data) {
textContent = Buffer.from(part.body.data, 'base64').toString('utf-8')
} else if (part.mimeType === 'text/html' && part.body?.data) {
htmlContent = Buffer.from(part.body.data, 'base64').toString('utf-8')
}
// Process nested parts
if (part.parts && Array.isArray(part.parts)) {
for (const subPart of part.parts) {
extractContent(subPart)
}
}
}
// Extract content from the email payload
if (email.payload) {
extractContent(email.payload)
}
// Parse date into standard format
let date: string | null = null
if (headers.date) {
try {
date = new Date(headers.date).toISOString()
} catch (_e) {
// Keep date as null if parsing fails
}
} else if (email.internalDate) {
// Use internalDate as fallback (convert from timestamp to ISO string)
date = new Date(Number.parseInt(email.internalDate)).toISOString()
}
// Extract attachment information if present
const attachments: Array<{ filename: string; mimeType: string; size: number }> = []
const findAttachments = (part: any) => {
if (!part) return
if (part.filename && part.filename.length > 0) {
attachments.push({
filename: part.filename,
mimeType: part.mimeType || 'application/octet-stream',
size: part.body?.size || 0,
const response = await fetch(webhookUrl, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'X-Webhook-Secret': webhookData.secret || '',
'User-Agent': 'SimStudio/1.0',
},
body: JSON.stringify(payload),
})
}
// Look for attachments in nested parts
if (part.parts && Array.isArray(part.parts)) {
for (const subPart of part.parts) {
findAttachments(subPart)
if (!response.ok) {
const errorText = await response.text()
logger.error(
`[${requestId}] Failed to trigger webhook for email ${email.id}:`,
response.status,
errorText
)
throw new Error(`Webhook request failed: ${response.status} - ${errorText}`)
}
// Mark email as read if configured
if (config.markAsRead) {
await markEmailAsRead(accessToken, email.id)
}
return {
emailId: email.id,
webhookStatus: response.status,
processed: true,
}
}
}
if (email.payload) {
findAttachments(email.payload)
}
// Create simplified email object
const simplifiedEmail: SimplifiedEmail = {
id: email.id,
threadId: email.threadId,
subject: headers.subject || '[No Subject]',
from: headers.from || '',
to: headers.to || '',
cc: headers.cc || '',
date: date,
bodyText: textContent,
bodyHtml: htmlContent,
labels: email.labelIds || [],
hasAttachments: attachments.length > 0,
attachments: attachments,
}
// Prepare webhook payload with simplified email and optionally raw email
const payload: GmailWebhookPayload = {
email: simplifiedEmail,
timestamp: new Date().toISOString(),
...(config.includeRawEmail ? { rawEmail: email } : {}),
}
logger.debug(
`[${requestId}] Sending ${config.includeRawEmail ? 'simplified + raw' : 'simplified'} email payload for ${email.id}`
)
// Trigger the webhook
const webhookUrl = `${getBaseUrl()}/api/webhooks/trigger/${webhookData.path}`
const response = await fetch(webhookUrl, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'X-Webhook-Secret': webhookData.secret || '',
'User-Agent': 'SimStudio/1.0',
},
body: JSON.stringify(payload),
})
if (!response.ok) {
logger.error(
`[${requestId}] Failed to trigger webhook for email ${email.id}:`,
response.status,
await response.text()
)
continue
}
// Mark email as read if configured
if (config.markAsRead) {
await markEmailAsRead(accessToken, email.id)
}
logger.info(
`[${requestId}] Successfully processed email ${email.id} for webhook ${webhookData.id}`
)
processedCount++
await markMessageAsProcessed(dedupeKey)
} catch (error) {
const errorMessage = error instanceof Error ? error.message : 'Unknown error'
logger.error(`[${requestId}] Error processing email ${email.id}:`, errorMessage)
// Continue processing other emails even if one fails
}
}
@@ -706,12 +675,7 @@ async function updateWebhookLastChecked(webhookId: string, timestamp: string, hi
.where(eq(webhook.id, webhookId))
}
async function updateWebhookData(
webhookId: string,
timestamp: string,
historyId?: string,
processedEmailIds?: string[]
) {
async function updateWebhookData(webhookId: string, timestamp: string, historyId?: string) {
const existingConfig =
(await db.select().from(webhook).where(eq(webhook.id, webhookId)))[0]?.providerConfig || {}
@@ -722,7 +686,6 @@ async function updateWebhookData(
...existingConfig,
lastCheckedTimestamp: timestamp,
...(historyId ? { historyId } : {}),
...(processedEmailIds ? { processedEmailIds } : {}),
},
updatedAt: new Date(),
})

View File

@@ -1,8 +1,8 @@
import { and, eq } from 'drizzle-orm'
import { htmlToText } from 'html-to-text'
import { nanoid } from 'nanoid'
import { pollingIdempotency } from '@/lib/idempotency'
import { createLogger } from '@/lib/logs/console/logger'
import { hasProcessedMessage, markMessageAsProcessed } from '@/lib/redis'
import { getBaseUrl } from '@/lib/urls/utils'
import { getOAuthToken, refreshAccessTokenIfNeeded } from '@/app/api/auth/oauth/utils'
import { db } from '@/db'
@@ -17,7 +17,6 @@ interface OutlookWebhookConfig {
markAsRead?: boolean
maxEmailsPerPoll?: number
lastCheckedTimestamp?: string
processedEmailIds?: string[]
pollingInterval?: number
includeRawEmail?: boolean
}
@@ -179,42 +178,24 @@ export async function pollOutlookWebhooks() {
logger.info(`[${requestId}] Found ${emails.length} emails for webhook ${webhookId}`)
// Filter out already processed emails
const processedEmailIds = config.processedEmailIds || []
const newEmails = emails.filter((email) => !processedEmailIds.includes(email.id))
if (!newEmails.length) {
logger.info(`[${requestId}] All emails already processed for webhook ${webhookId}`)
await updateWebhookLastChecked(webhookId, now.toISOString())
return { success: true, webhookId, status: 'all_processed' }
}
logger.info(
`[${requestId}] Processing ${newEmails.length} new emails for webhook ${webhookId}`
)
logger.info(`[${requestId}] Processing ${emails.length} emails for webhook ${webhookId}`)
// Process emails
const processed = await processOutlookEmails(
newEmails,
emails,
webhookData,
config,
accessToken,
requestId
)
// Record which email IDs have been processed
const newProcessedIds = [...processedEmailIds, ...newEmails.map((email) => email.id)]
// Keep only the most recent 100 IDs to prevent the list from growing too large
const trimmedProcessedIds = newProcessedIds.slice(-100)
// Update webhook with latest timestamp and processed email IDs
await updateWebhookData(webhookId, now.toISOString(), trimmedProcessedIds)
// Update webhook with latest timestamp
await updateWebhookLastChecked(webhookId, now.toISOString())
return {
success: true,
webhookId,
emailsFound: emails.length,
newEmails: newEmails.length,
emailsProcessed: processed,
}
} catch (error) {
@@ -358,91 +339,94 @@ async function processOutlookEmails(
for (const email of emails) {
try {
// Check if we've already processed this email (Redis-based deduplication)
const redisKey = `outlook-email-${email.id}`
const alreadyProcessed = await hasProcessedMessage(redisKey)
if (alreadyProcessed) {
logger.debug(`[${requestId}] Email ${email.id} already processed, skipping`)
continue
}
// Convert to simplified format
const simplifiedEmail: SimplifiedOutlookEmail = {
id: email.id,
conversationId: email.conversationId,
subject: email.subject || '(No Subject)',
from: email.from?.emailAddress?.address || '',
to: email.toRecipients?.map((r) => r.emailAddress.address).join(', ') || '',
cc: email.ccRecipients?.map((r) => r.emailAddress.address).join(', ') || '',
date: email.receivedDateTime,
bodyText: (() => {
const content = email.body?.content || ''
const type = (email.body?.contentType || '').toLowerCase()
if (!content) {
return email.bodyPreview || ''
const result = await pollingIdempotency.executeWithIdempotency(
'outlook',
`${webhookData.id}:${email.id}`,
async () => {
// Convert to simplified format
const simplifiedEmail: SimplifiedOutlookEmail = {
id: email.id,
conversationId: email.conversationId,
subject: email.subject || '(No Subject)',
from: email.from?.emailAddress?.address || '',
to: email.toRecipients?.map((r) => r.emailAddress.address).join(', ') || '',
cc: email.ccRecipients?.map((r) => r.emailAddress.address).join(', ') || '',
date: email.receivedDateTime,
bodyText: (() => {
const content = email.body?.content || ''
const type = (email.body?.contentType || '').toLowerCase()
if (!content) {
return email.bodyPreview || ''
}
if (type === 'text' || type === 'text/plain') {
return content
}
return convertHtmlToPlainText(content)
})(),
bodyHtml: email.body?.content || '',
hasAttachments: email.hasAttachments,
isRead: email.isRead,
folderId: email.parentFolderId,
// Thread support fields
messageId: email.id,
threadId: email.conversationId,
}
if (type === 'text' || type === 'text/plain') {
return content
// Create webhook payload
const payload: OutlookWebhookPayload = {
email: simplifiedEmail,
timestamp: new Date().toISOString(),
}
return convertHtmlToPlainText(content)
})(),
bodyHtml: email.body?.content || '',
hasAttachments: email.hasAttachments,
isRead: email.isRead,
folderId: email.parentFolderId,
// Thread support fields
messageId: email.id,
threadId: email.conversationId,
}
// Create webhook payload
const payload: OutlookWebhookPayload = {
email: simplifiedEmail,
timestamp: new Date().toISOString(),
}
// Include raw email if configured
if (config.includeRawEmail) {
payload.rawEmail = email
}
// Include raw email if configured
if (config.includeRawEmail) {
payload.rawEmail = email
}
logger.info(
`[${requestId}] Processing email: ${email.subject} from ${email.from?.emailAddress?.address}`
)
logger.info(
`[${requestId}] Processing email: ${email.subject} from ${email.from?.emailAddress?.address}`
// Trigger the webhook
const webhookUrl = `${getBaseUrl()}/api/webhooks/trigger/${webhookData.path}`
const response = await fetch(webhookUrl, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'X-Webhook-Secret': webhookData.secret || '',
'User-Agent': 'SimStudio/1.0',
},
body: JSON.stringify(payload),
})
if (!response.ok) {
const errorText = await response.text()
logger.error(
`[${requestId}] Failed to trigger webhook for email ${email.id}:`,
response.status,
errorText
)
throw new Error(`Webhook request failed: ${response.status} - ${errorText}`)
}
// Mark email as read if configured
if (config.markAsRead) {
await markOutlookEmailAsRead(accessToken, email.id)
}
return {
emailId: email.id,
webhookStatus: response.status,
processed: true,
}
}
)
// Trigger the webhook
const webhookUrl = `${getBaseUrl()}/api/webhooks/trigger/${webhookData.path}`
const response = await fetch(webhookUrl, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'X-Webhook-Secret': webhookData.secret || '',
'User-Agent': 'SimStudio/1.0',
},
body: JSON.stringify(payload),
})
if (!response.ok) {
logger.error(
`[${requestId}] Failed to trigger webhook for email ${email.id}:`,
response.status,
await response.text()
)
continue
}
// Mark email as read if configured
if (config.markAsRead) {
await markOutlookEmailAsRead(accessToken, email.id)
}
// Mark as processed in Redis (expires after 7 days)
await markMessageAsProcessed(redisKey, 7 * 24 * 60 * 60)
logger.info(
`[${requestId}] Successfully processed email ${email.id} for webhook ${webhookData.id}`
)
processedCount++
logger.info(`[${requestId}] Successfully processed email ${email.id}`)
} catch (error) {
logger.error(`[${requestId}] Error processing email ${email.id}:`, error)
}
@@ -507,39 +491,3 @@ async function updateWebhookLastChecked(webhookId: string, timestamp: string) {
logger.error(`Error updating webhook ${webhookId} last checked timestamp:`, error)
}
}
async function updateWebhookData(
webhookId: string,
timestamp: string,
processedEmailIds: string[]
) {
try {
const currentWebhook = await db
.select({ providerConfig: webhook.providerConfig })
.from(webhook)
.where(eq(webhook.id, webhookId))
.limit(1)
if (!currentWebhook.length) {
logger.error(`Webhook ${webhookId} not found`)
return
}
const currentConfig = (currentWebhook[0].providerConfig as any) || {}
const updatedConfig = {
...currentConfig, // Preserve ALL existing config including userId
lastCheckedTimestamp: timestamp,
processedEmailIds,
}
await db
.update(webhook)
.set({
providerConfig: updatedConfig,
updatedAt: new Date(),
})
.where(eq(webhook.id, webhookId))
} catch (error) {
logger.error(`Error updating webhook ${webhookId} data:`, error)
}
}

View File

@@ -15,6 +15,10 @@
{
"path": "/api/logs/cleanup",
"schedule": "0 0 * * *"
},
{
"path": "/api/webhooks/cleanup/idempotency",
"schedule": "0 2 * * *"
}
]
}