refactor(polling): consolidate polling services into provider handler pattern (#4035)

* refactor(polling): consolidate polling services into provider handler pattern

Eliminate self-POST anti-pattern and extract shared boilerplate from 4 polling
services into a clean handler registry mirroring lib/webhooks/providers/.

- Add processPolledWebhookEvent() to processor.ts for direct in-process webhook
  execution, removing HTTP round-trips that caused Lambda 403/timeout errors
- Extract shared utilities (markWebhookFailed/Success, fetchActiveWebhooks,
  runWithConcurrency, resolveOAuthCredential, updateWebhookProviderConfig)
- Create PollingProviderHandler interface with per-provider implementations
- Consolidate 4 identical route files into single dynamic [provider] route
- Standardize concurrency to 10 across all providers
- No infra changes needed — Helm cron paths resolve via dynamic route

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* polish(polling): extract lock TTL constant and remove unnecessary type casts

- Widen processPolledWebhookEvent body param to accept object, eliminating
  `as unknown as Record<string, unknown>` double casts in all 4 handlers
- Extract LOCK_TTL_SECONDS constant in route, tying maxDuration and lock TTL
  to a single value

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* fix(polling): address PR review feedback

- Add archivedAt filters to fetchActiveWebhooks query, matching
  findWebhookAndWorkflow in processor.ts to prevent polling archived
  webhooks/workflows
- Move provider validation after auth check to prevent provider
  enumeration by unauthenticated callers
- Fix inconsistent pollingIdempotency import path in outlook.ts to
  match other handlers

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* fix(polling): use literal for maxDuration segment config

Next.js requires segment config exports to be statically analyzable
literals. Using a variable reference caused build failure.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Waleed
2026-04-07 20:55:20 -07:00
committed by GitHub
parent 2760b4bff1
commit 086b7d9ca1
16 changed files with 1716 additions and 2112 deletions

View File

@@ -3,31 +3,36 @@ import { type NextRequest, NextResponse } from 'next/server'
import { verifyCronAuth } from '@/lib/auth/internal'
import { acquireLock, releaseLock } from '@/lib/core/config/redis'
import { generateShortId } from '@/lib/core/utils/uuid'
import { pollRssWebhooks } from '@/lib/webhooks/rss-polling-service'
import { pollProvider, VALID_POLLING_PROVIDERS } from '@/lib/webhooks/polling'
const logger = createLogger('RssPollingAPI')
const logger = createLogger('PollingAPI')
/** Lock TTL in seconds — must match maxDuration so the lock auto-expires if the function times out. */
const LOCK_TTL_SECONDS = 180
export const dynamic = 'force-dynamic'
export const maxDuration = 180 // Allow up to 3 minutes for polling to complete
export const maxDuration = 180
const LOCK_KEY = 'rss-polling-lock'
const LOCK_TTL_SECONDS = 180 // Same as maxDuration (3 min)
export async function GET(request: NextRequest) {
export async function GET(
request: NextRequest,
{ params }: { params: Promise<{ provider: string }> }
) {
const { provider } = await params
const requestId = generateShortId()
logger.info(`RSS webhook polling triggered (${requestId})`)
const LOCK_KEY = `${provider}-polling-lock`
let lockValue: string | undefined
try {
const authError = verifyCronAuth(request, 'RSS webhook polling')
if (authError) {
return authError
const authError = verifyCronAuth(request, `${provider} webhook polling`)
if (authError) return authError
if (!VALID_POLLING_PROVIDERS.has(provider)) {
return NextResponse.json({ error: `Unknown polling provider: ${provider}` }, { status: 404 })
}
lockValue = requestId
const locked = await acquireLock(LOCK_KEY, lockValue, LOCK_TTL_SECONDS)
if (!locked) {
return NextResponse.json(
{
@@ -40,21 +45,21 @@ export async function GET(request: NextRequest) {
)
}
const results = await pollRssWebhooks()
const results = await pollProvider(provider)
return NextResponse.json({
success: true,
message: 'RSS polling completed',
message: `${provider} polling completed`,
requestId,
status: 'completed',
...results,
})
} catch (error) {
logger.error(`Error during RSS polling (${requestId}):`, error)
logger.error(`Error during ${provider} polling (${requestId}):`, error)
return NextResponse.json(
{
success: false,
message: 'RSS polling failed',
message: `${provider} polling failed`,
error: error instanceof Error ? error.message : 'Unknown error',
requestId,
},

View File

@@ -1,68 +0,0 @@
import { createLogger } from '@sim/logger'
import { type NextRequest, NextResponse } from 'next/server'
import { verifyCronAuth } from '@/lib/auth/internal'
import { acquireLock, releaseLock } from '@/lib/core/config/redis'
import { generateShortId } from '@/lib/core/utils/uuid'
import { pollGmailWebhooks } from '@/lib/webhooks/gmail-polling-service'
// Logger scoped to the Gmail polling cron endpoint.
const logger = createLogger('GmailPollingAPI')
// Next.js route segment config: always run this handler dynamically.
export const dynamic = 'force-dynamic'
export const maxDuration = 180 // Allow up to 3 minutes for polling to complete
// Key passed to acquireLock/releaseLock so only one Gmail poll runs at a time.
const LOCK_KEY = 'gmail-polling-lock'
const LOCK_TTL_SECONDS = 180 // Same as maxDuration (3 min)
/**
 * Cron-triggered endpoint that runs one Gmail polling pass.
 *
 * Flow: authenticate the cron caller, take the polling lock (answering 202 with
 * `status: 'skip'` when another run holds it), run the poll, and always attempt
 * to release the lock afterwards.
 *
 * @param request - Incoming request, checked by `verifyCronAuth`.
 * @returns The auth error response, a 202 skip response, the poll summary, or a 500 error payload.
 */
export async function GET(request: NextRequest) {
  const requestId = generateShortId()
  logger.info(`Gmail webhook polling triggered (${requestId})`)

  // Stays undefined until auth has passed, so the finally block never touches
  // the lock for rejected callers.
  let lockValue: string | undefined

  try {
    const authError = verifyCronAuth(request, 'Gmail webhook polling')
    if (authError) return authError

    // The request id doubles as the holder token for the lock.
    lockValue = requestId
    const acquired = await acquireLock(LOCK_KEY, lockValue, LOCK_TTL_SECONDS)

    if (!acquired) {
      const skippedBody = {
        success: true,
        message: 'Polling already in progress skipped',
        requestId,
        status: 'skip',
      }
      return NextResponse.json(skippedBody, { status: 202 })
    }

    const pollSummary = await pollGmailWebhooks()
    const completedBody = {
      success: true,
      message: 'Gmail polling completed',
      requestId,
      status: 'completed',
      ...pollSummary,
    }
    return NextResponse.json(completedBody)
  } catch (error) {
    logger.error(`Error during Gmail polling (${requestId}):`, error)
    const reason = error instanceof Error ? error.message : 'Unknown error'
    const failureBody = {
      success: false,
      message: 'Gmail polling failed',
      error: reason,
      requestId,
    }
    return NextResponse.json(failureBody, { status: 500 })
  } finally {
    // Best-effort release; a failure here must not mask the response.
    if (lockValue) {
      await releaseLock(LOCK_KEY, lockValue).catch(() => {})
    }
  }
}

View File

@@ -1,68 +0,0 @@
import { createLogger } from '@sim/logger'
import { type NextRequest, NextResponse } from 'next/server'
import { verifyCronAuth } from '@/lib/auth/internal'
import { acquireLock, releaseLock } from '@/lib/core/config/redis'
import { generateShortId } from '@/lib/core/utils/uuid'
import { pollImapWebhooks } from '@/lib/webhooks/imap-polling-service'
// Logger scoped to the IMAP polling cron endpoint.
const logger = createLogger('ImapPollingAPI')
// Next.js route segment config: always run this handler dynamically.
export const dynamic = 'force-dynamic'
export const maxDuration = 180 // Allow up to 3 minutes for polling to complete
// Key passed to acquireLock/releaseLock so only one IMAP poll runs at a time.
const LOCK_KEY = 'imap-polling-lock'
const LOCK_TTL_SECONDS = 180 // Same as maxDuration (3 min)
/**
 * Cron-triggered endpoint that runs one IMAP polling pass.
 *
 * Authenticates the cron caller, takes the polling lock (responding 202 with
 * `status: 'skip'` if it is already held), runs the poll and finally attempts
 * to release the lock.
 *
 * @param request - Incoming request, checked by `verifyCronAuth`.
 * @returns The auth error response, a 202 skip response, the poll summary, or a 500 error payload.
 */
export async function GET(request: NextRequest) {
  const requestId = generateShortId()
  logger.info(`IMAP webhook polling triggered (${requestId})`)

  // Only assigned once auth succeeds; gates the release in `finally`.
  let lockValue: string | undefined

  try {
    const authError = verifyCronAuth(request, 'IMAP webhook polling')
    if (authError) return authError

    // The request id is used as the unique holder value.
    lockValue = requestId
    const gotLock = await acquireLock(LOCK_KEY, lockValue, LOCK_TTL_SECONDS)

    if (gotLock) {
      const pollSummary = await pollImapWebhooks()
      return NextResponse.json({
        success: true,
        message: 'IMAP polling completed',
        requestId,
        status: 'completed',
        ...pollSummary,
      })
    }

    // Another invocation holds the lock: report a skip instead of polling twice.
    return NextResponse.json(
      {
        success: true,
        message: 'Polling already in progress skipped',
        requestId,
        status: 'skip',
      },
      { status: 202 }
    )
  } catch (error) {
    logger.error(`Error during IMAP polling (${requestId}):`, error)
    const reason = error instanceof Error ? error.message : 'Unknown error'
    return NextResponse.json(
      { success: false, message: 'IMAP polling failed', error: reason, requestId },
      { status: 500 }
    )
  } finally {
    // Best-effort release; errors while releasing are intentionally ignored.
    if (lockValue) {
      await releaseLock(LOCK_KEY, lockValue).catch(() => {})
    }
  }
}

View File

@@ -1,68 +0,0 @@
import { createLogger } from '@sim/logger'
import { type NextRequest, NextResponse } from 'next/server'
import { verifyCronAuth } from '@/lib/auth/internal'
import { acquireLock, releaseLock } from '@/lib/core/config/redis'
import { generateShortId } from '@/lib/core/utils/uuid'
import { pollOutlookWebhooks } from '@/lib/webhooks/outlook-polling-service'
// Logger scoped to the Outlook polling cron endpoint.
const logger = createLogger('OutlookPollingAPI')
// Next.js route segment config: always run this handler dynamically.
export const dynamic = 'force-dynamic'
export const maxDuration = 180 // Allow up to 3 minutes for polling to complete
// Key passed to acquireLock/releaseLock so only one Outlook poll runs at a time.
const LOCK_KEY = 'outlook-polling-lock'
const LOCK_TTL_SECONDS = 180 // Same as maxDuration (3 min)
/**
 * Cron-triggered endpoint that runs one Outlook polling pass.
 *
 * Steps: verify the cron caller, try to take the polling lock (202 with
 * `status: 'skip'` when it is held elsewhere), run the poll, and attempt to
 * release the lock on the way out.
 *
 * @param request - Incoming request, checked by `verifyCronAuth`.
 * @returns The auth error response, a 202 skip response, the poll summary, or a 500 error payload.
 */
export async function GET(request: NextRequest) {
  const requestId = generateShortId()
  logger.info(`Outlook webhook polling triggered (${requestId})`)

  // Remains undefined for unauthenticated calls so no release is attempted for them.
  let lockValue: string | undefined

  try {
    const authError = verifyCronAuth(request, 'Outlook webhook polling')
    if (authError) return authError

    // Unique per-request value identifying this invocation as the holder.
    lockValue = requestId
    const lockAcquired = await acquireLock(LOCK_KEY, lockValue, LOCK_TTL_SECONDS)

    if (!lockAcquired) {
      const skipPayload = {
        success: true,
        message: 'Polling already in progress skipped',
        requestId,
        status: 'skip',
      }
      return NextResponse.json(skipPayload, { status: 202 })
    }

    const pollSummary = await pollOutlookWebhooks()
    return NextResponse.json({
      success: true,
      message: 'Outlook polling completed',
      requestId,
      status: 'completed',
      ...pollSummary,
    })
  } catch (error) {
    logger.error(`Error during Outlook polling (${requestId}):`, error)
    const reason = error instanceof Error ? error.message : 'Unknown error'
    const errorPayload = {
      success: false,
      message: 'Outlook polling failed',
      error: reason,
      requestId,
    }
    return NextResponse.json(errorPayload, { status: 500 })
  } finally {
    // Best-effort release; a failing release is swallowed on purpose.
    if (lockValue) {
      await releaseLock(LOCK_KEY, lockValue).catch(() => {})
    }
  }
}

View File

@@ -1,791 +0,0 @@
import { db } from '@sim/db'
import {
account,
credentialSet,
webhook,
workflow,
workflowDeploymentVersion,
} from '@sim/db/schema'
import { createLogger } from '@sim/logger'
import { and, eq, isNull, or, sql } from 'drizzle-orm'
import { isOrganizationOnTeamOrEnterprisePlan } from '@/lib/billing'
import { pollingIdempotency } from '@/lib/core/idempotency/service'
import { getInternalApiBaseUrl } from '@/lib/core/utils/urls'
import { generateShortId } from '@/lib/core/utils/uuid'
import {
getOAuthToken,
refreshAccessTokenIfNeeded,
resolveOAuthAccountId,
} from '@/app/api/auth/oauth/utils'
import type { GmailAttachment } from '@/tools/gmail/types'
import { downloadAttachments, extractAttachmentInfo } from '@/tools/gmail/utils'
import { MAX_CONSECUTIVE_FAILURES } from '@/triggers/constants'
// Module-wide logger; every function in this service logs through it.
const logger = createLogger('GmailPollingService')
/** Shape the polling code expects of a Gmail webhook's `providerConfig`. */
interface GmailWebhookConfig {
// Gmail label IDs used both in the search query and when filtering History API results.
labelIds: string[]
// 'INCLUDE' keeps messages carrying one of `labelIds`; any other value drops them.
labelFilterBehavior: 'INCLUDE' | 'EXCLUDE'
// When true, the UNREAD label is removed from a message after its webhook was triggered.
markAsRead: boolean
// Optional extra Gmail search expression combined with the label filter.
searchQuery?: string
// Upper bound of messages fetched per poll; the code falls back to 25 when unset.
maxEmailsPerPoll?: number
// ISO timestamp of the previous poll, written back after every poll.
lastCheckedTimestamp?: string
// Cursor passed as `startHistoryId` to the Gmail History API; absent means search is used.
historyId?: string
// When true, attachments of matching emails are downloaded into the payload.
includeAttachments?: boolean
// When true, the raw Gmail message is added to the payload as `rawEmail`.
includeRawEmail?: boolean
}
/** Fields of the Gmail message JSON (fetched with `format=full`) that this service reads. */
interface GmailEmail {
id: string
threadId: string
historyId?: string
labelIds?: string[]
// Message payload tree; headers, body parts and attachments are read from it.
payload?: any
snippet?: string
// Parsed with Number.parseInt and fed to `new Date`, i.e. treated as epoch milliseconds.
internalDate?: string
}
/** Flattened email representation sent to the workflow trigger. */
export interface SimplifiedEmail {
id: string
threadId: string
// Falls back to '[No Subject]' when the message has no subject header.
subject: string
from: string
to: string
cc: string
// ISO string derived from the Date header or `internalDate`; null when neither is usable.
date: string | null
// Decoded text/plain content ('' when none was found).
bodyText: string
// Decoded text/html content ('' when none was found).
bodyHtml: string
labels: string[]
// True when the payload lists at least one attachment, even if none were downloaded.
hasAttachments: boolean
// Populated only when `includeAttachments` is set and the download succeeded.
attachments: GmailAttachment[]
}
/** JSON body POSTed to the webhook trigger endpoint for each new email. */
export interface GmailWebhookPayload {
email: SimplifiedEmail
// ISO timestamp taken when the payload was built.
timestamp: string
rawEmail?: GmailEmail // Only included when includeRawEmail is true
}
/**
 * Records one more failure for a webhook and deactivates it once the counter
 * reaches `MAX_CONSECUTIVE_FAILURES`.
 *
 * Database errors are logged and swallowed so that failure bookkeeping never
 * interrupts the polling loop.
 *
 * @param webhookId - Id of the webhook row to update.
 */
async function markWebhookFailed(webhookId: string) {
  try {
    const [updatedRow] = await db
      .update(webhook)
      .set({
        // Increment in SQL; COALESCE treats a NULL counter as 0.
        failedCount: sql`COALESCE(${webhook.failedCount}, 0) + 1`,
        lastFailedAt: new Date(),
        updatedAt: new Date(),
      })
      .where(eq(webhook.id, webhookId))
      .returning({ failedCount: webhook.failedCount })

    const consecutiveFailures = updatedRow?.failedCount || 0
    if (consecutiveFailures < MAX_CONSECUTIVE_FAILURES) {
      return
    }

    // Threshold reached: switch the webhook off so it is no longer polled.
    await db
      .update(webhook)
      .set({ isActive: false, updatedAt: new Date() })
      .where(eq(webhook.id, webhookId))

    logger.warn(
      `Webhook ${webhookId} auto-disabled after ${MAX_CONSECUTIVE_FAILURES} consecutive failures`
    )
  } catch (err) {
    logger.error(`Failed to mark webhook ${webhookId} as failed:`, err)
  }
}
/**
 * Resets a webhook's failure counter after a successful poll.
 *
 * Database errors are logged and swallowed; this is best-effort bookkeeping.
 *
 * @param webhookId - Id of the webhook row to update.
 */
async function markWebhookSuccess(webhookId: string) {
  try {
    // A success ends the failure streak, so the counter goes back to zero.
    const resetFields = { failedCount: 0, updatedAt: new Date() }
    await db.update(webhook).set(resetFields).where(eq(webhook.id, webhookId))
  } catch (err) {
    logger.error(`Failed to mark webhook ${webhookId} as successful:`, err)
  }
}
/**
 * Runs one polling pass over every active Gmail webhook of a deployed workflow.
 *
 * For each webhook: resolve an access token, fetch new emails, process them,
 * store the new `lastCheckedTimestamp`/`historyId`, and update the webhook's
 * failure counter. At most `CONCURRENCY` webhooks are processed at a time.
 *
 * @returns Counts of webhooks found (`total`), `successful` and `failed`, plus a `details` array that is always empty.
 * @throws Rethrows errors raised outside the per-webhook handling (e.g. the initial query).
 */
export async function pollGmailWebhooks() {
logger.info('Starting Gmail webhook polling')
try {
// Select gmail webhooks that are active, belong to a deployed workflow, and either
// point at the workflow's active deployment version or have no version on both sides.
const activeWebhooksResult = await db
.select({ webhook })
.from(webhook)
.innerJoin(workflow, eq(webhook.workflowId, workflow.id))
.leftJoin(
workflowDeploymentVersion,
and(
eq(workflowDeploymentVersion.workflowId, workflow.id),
eq(workflowDeploymentVersion.isActive, true)
)
)
.where(
and(
eq(webhook.provider, 'gmail'),
eq(webhook.isActive, true),
eq(workflow.isDeployed, true),
or(
eq(webhook.deploymentVersionId, workflowDeploymentVersion.id),
and(isNull(workflowDeploymentVersion.id), isNull(webhook.deploymentVersionId))
)
)
)
const activeWebhooks = activeWebhooksResult.map((r) => r.webhook)
if (!activeWebhooks.length) {
logger.info('No active Gmail webhooks found')
return { total: 0, successful: 0, failed: 0, details: [] }
}
logger.info(`Found ${activeWebhooks.length} active Gmail webhooks`)
// Limit the number of webhooks processed in parallel to avoid
// exhausting Postgres or Gmail API connections when many users exist.
const CONCURRENCY = 10
// In-flight per-webhook promises; each removes itself when it settles.
const running: Promise<void>[] = []
let successCount = 0
let failureCount = 0
// Processes a single webhook end to end and bumps exactly one of the two counters.
const enqueue = async (webhookData: (typeof activeWebhooks)[number]) => {
const webhookId = webhookData.id
const requestId = generateShortId()
try {
const metadata = webhookData.providerConfig as any
const credentialId: string | undefined = metadata?.credentialId
const userId: string | undefined = metadata?.userId
const credentialSetId: string | undefined = webhookData.credentialSetId ?? undefined
// Without a credential id or a legacy user id there is no way to obtain a token.
if (!credentialId && !userId) {
logger.error(`[${requestId}] Missing credential info for webhook ${webhookId}`)
await markWebhookFailed(webhookId)
failureCount++
return
}
// Webhooks tied to a credential set are only polled when the owning
// organization passes the Team/Enterprise plan check.
if (credentialSetId) {
const [cs] = await db
.select({ organizationId: credentialSet.organizationId })
.from(credentialSet)
.where(eq(credentialSet.id, credentialSetId))
.limit(1)
if (cs?.organizationId) {
const hasAccess = await isOrganizationOnTeamOrEnterprisePlan(cs.organizationId)
if (!hasAccess) {
logger.error(
`[${requestId}] Polling Group plan restriction: Your current plan does not support Polling Groups. Upgrade to Team or Enterprise to use this feature.`,
{
webhookId,
credentialSetId,
organizationId: cs.organizationId,
}
)
await markWebhookFailed(webhookId)
failureCount++
return
}
}
}
let accessToken: string | null = null
if (credentialId) {
// Preferred path: resolve the credential to an account row, then obtain a
// token via refreshAccessTokenIfNeeded on behalf of the account's owner.
const resolved = await resolveOAuthAccountId(credentialId)
if (!resolved) {
logger.error(
`[${requestId}] Failed to resolve OAuth account for credential ${credentialId}, webhook ${webhookId}`
)
await markWebhookFailed(webhookId)
failureCount++
return
}
const rows = await db
.select()
.from(account)
.where(eq(account.id, resolved.accountId))
.limit(1)
if (rows.length === 0) {
logger.error(
`[${requestId}] Credential ${credentialId} not found for webhook ${webhookId}`
)
await markWebhookFailed(webhookId)
failureCount++
return
}
const ownerUserId = rows[0].userId
accessToken = await refreshAccessTokenIfNeeded(resolved.accountId, ownerUserId, requestId)
} else if (userId) {
// Legacy fallback for webhooks without credentialId
accessToken = await getOAuthToken(userId, 'google-email')
}
if (!accessToken) {
logger.error(`[${requestId}] Failed to get Gmail access token for webhook ${webhookId}`)
await markWebhookFailed(webhookId)
failureCount++
return
}
const config = webhookData.providerConfig as unknown as GmailWebhookConfig
// Captured before fetching; this instant is what gets stored as lastCheckedTimestamp.
const now = new Date()
const fetchResult = await fetchNewEmails(accessToken, config, requestId)
const { emails, latestHistoryId } = fetchResult
if (!emails || !emails.length) {
// Nothing new: still advance the checkpoint and count the poll as a success.
await updateWebhookLastChecked(
webhookId,
now.toISOString(),
latestHistoryId || config.historyId
)
await markWebhookSuccess(webhookId)
logger.info(`[${requestId}] No new emails found for webhook ${webhookId}`)
successCount++
return
}
logger.info(`[${requestId}] Found ${emails.length} new emails for webhook ${webhookId}`)
logger.info(`[${requestId}] Processing ${emails.length} emails for webhook ${webhookId}`)
const emailsToProcess = emails
const { processedCount, failedCount } = await processEmails(
emailsToProcess,
webhookData,
config,
accessToken,
requestId
)
// The checkpoint is advanced regardless of how many emails failed to process.
await updateWebhookLastChecked(
webhookId,
now.toISOString(),
latestHistoryId || config.historyId
)
// Only a poll in which every email failed counts as a webhook failure.
if (failedCount > 0 && processedCount === 0) {
await markWebhookFailed(webhookId)
failureCount++
logger.warn(
`[${requestId}] All ${failedCount} emails failed to process for webhook ${webhookId}`
)
} else {
await markWebhookSuccess(webhookId)
successCount++
logger.info(
`[${requestId}] Successfully processed ${processedCount} emails for webhook ${webhookId}${failedCount > 0 ? ` (${failedCount} failed)` : ''}`
)
}
} catch (error) {
logger.error(`[${requestId}] Error processing Gmail webhook ${webhookId}:`, error)
await markWebhookFailed(webhookId)
failureCount++
}
}
// Bounded pool: start a task per webhook and, once CONCURRENCY tasks are in flight,
// wait for any one of them to settle before starting the next.
for (const webhookData of activeWebhooks) {
const promise: Promise<void> = enqueue(webhookData)
.catch((err) => {
logger.error('Unexpected error in webhook processing:', err)
failureCount++
})
.finally(() => {
const idx = running.indexOf(promise)
if (idx !== -1) running.splice(idx, 1)
})
running.push(promise)
if (running.length >= CONCURRENCY) {
await Promise.race(running)
}
}
// Drain whatever is still in flight before building the summary.
await Promise.allSettled(running)
const summary = {
total: activeWebhooks.length,
successful: successCount,
failed: failureCount,
details: [],
}
logger.info('Gmail polling completed', {
total: summary.total,
successful: summary.successful,
failed: summary.failed,
})
return summary
} catch (error) {
const errorMessage = error instanceof Error ? error.message : 'Unknown error'
logger.error('Error in Gmail polling service:', errorMessage)
throw error
}
}
/**
 * Fetches emails that arrived since the last poll.
 *
 * When the config carries a `historyId`, the Gmail History API is queried from
 * that cursor and the added messages are loaded and label-filtered. Without a
 * cursor, or when the History API answers with an error status, the search
 * API is used instead.
 *
 * @param accessToken - OAuth bearer token for the Gmail API.
 * @param config - Webhook configuration (labels, limits, history cursor).
 * @param requestId - Correlation id used in log lines.
 * @returns The new emails and the history id to store for the next poll.
 * @throws Rethrows network errors and errors from the search fallback after logging them.
 */
async function fetchNewEmails(accessToken: string, config: GmailWebhookConfig, requestId: string) {
  try {
    // No stored cursor: there is nothing to diff against, so search instead.
    if (!config.historyId) {
      return await searchEmails(accessToken, config, requestId)
    }

    let latestHistoryId = config.historyId
    const historyUrl = `https://gmail.googleapis.com/gmail/v1/users/me/history?startHistoryId=${config.historyId}`
    const historyResponse = await fetch(historyUrl, {
      headers: {
        Authorization: `Bearer ${accessToken}`,
      },
    })

    if (!historyResponse.ok) {
      // The error body is only used for logging. Tolerate non-JSON bodies (e.g. an
      // HTML gateway error) so a parse failure cannot prevent the search fallback.
      const errorData = await historyResponse.json().catch(() => ({}))
      logger.error(`[${requestId}] Gmail history API error:`, {
        status: historyResponse.status,
        statusText: historyResponse.statusText,
        error: errorData,
      })
      logger.info(`[${requestId}] Falling back to search API after history API failure`)
      const searchResult = await searchEmails(accessToken, config, requestId)
      return {
        emails: searchResult.emails,
        latestHistoryId: searchResult.latestHistoryId,
      }
    }

    const historyData = await historyResponse.json()
    if (!historyData.history || !historyData.history.length) {
      return { emails: [], latestHistoryId }
    }
    if (historyData.historyId) {
      latestHistoryId = historyData.historyId
    }

    // Collect ids of added messages; the Set removes ids repeated across history records.
    const messageIds = new Set<string>()
    for (const history of historyData.history) {
      if (history.messagesAdded) {
        for (const messageAdded of history.messagesAdded) {
          messageIds.add(messageAdded.message.id)
        }
      }
    }
    if (messageIds.size === 0) {
      return { emails: [], latestHistoryId }
    }

    // Descending id order, capped at the per-poll limit (25 when unset or 0).
    const sortedIds = [...messageIds].sort().reverse()
    const idsToFetch = sortedIds.slice(0, config.maxEmailsPerPoll || 25)
    logger.info(`[${requestId}] Processing ${idsToFetch.length} emails from history API`)

    // Load all messages in parallel; individual failures are counted, not fatal.
    const emailResults = await Promise.allSettled(
      idsToFetch.map((messageId) => getEmailDetails(accessToken, messageId))
    )
    const rejected = emailResults.filter((r) => r.status === 'rejected')
    if (rejected.length > 0) {
      logger.warn(`[${requestId}] Failed to fetch ${rejected.length} email details`)
    }
    const fetched: GmailEmail[] = emailResults
      .filter(
        (result): result is PromiseFulfilledResult<GmailEmail> => result.status === 'fulfilled'
      )
      .map((result) => result.value)

    // The History API does not apply the label filter, so apply it locally.
    const emails = filterEmailsByLabels(fetched, config)
    return { emails, latestHistoryId }
  } catch (error) {
    const errorMessage = error instanceof Error ? error.message : 'Unknown error'
    logger.error(`[${requestId}] Error fetching new emails:`, errorMessage)
    throw error
  }
}
/**
 * Assembles the Gmail search expression for a webhook.
 *
 * The label clause ORs all configured labels (parenthesised when there is more
 * than one) and is negated with `-` unless the behavior is 'INCLUDE'. The
 * free-form search query is trimmed and parenthesised when it contains
 * ' OR ' or ' AND '. Both clauses are joined with a space; when neither is
 * present the query defaults to 'in:inbox'.
 *
 * @param config - Label ids, label filter behavior and optional search query.
 * @returns The combined Gmail search query.
 */
function buildGmailSearchQuery(config: {
  labelIds?: string[]
  labelFilterBehavior?: 'INCLUDE' | 'EXCLUDE'
  searchQuery?: string
}): string {
  const labelIds = config.labelIds ?? []

  let labelClause = ''
  if (labelIds.length > 0) {
    const joined = labelIds.map((id) => `label:${id}`).join(' OR ')
    const grouped = labelIds.length > 1 ? `(${joined})` : joined
    labelClause = config.labelFilterBehavior === 'INCLUDE' ? grouped : `-${grouped}`
  }

  const trimmedSearch = config.searchQuery?.trim() ?? ''
  const hasBooleanOperator = trimmedSearch.includes(' OR ') || trimmedSearch.includes(' AND ')
  const searchClause = hasBooleanOperator ? `(${trimmedSearch})` : trimmedSearch

  const clauses = [labelClause, searchClause].filter((clause) => clause.length > 0)
  return clauses.length > 0 ? clauses.join(' ') : 'in:inbox'
}
/**
 * Finds recent emails through the Gmail search API and loads their details.
 *
 * The query combines the label/search filter with a time window derived from
 * `lastCheckedTimestamp`: an absolute `after:` cutoff for polls less than an
 * hour old, `newer_than:` in hours for polls less than a day old, and
 * `newer_than:` in days (at most 8) beyond that. Without a previous poll the
 * window is one day.
 *
 * @param accessToken - OAuth bearer token for the Gmail API.
 * @param config - Webhook configuration (labels, search query, limits, last poll time).
 * @param requestId - Correlation id used in log lines.
 * @returns The matching emails and the history id to store for the next poll.
 * @throws Error with the HTTP status when the search request fails; other errors are rethrown after logging.
 */
async function searchEmails(accessToken: string, config: GmailWebhookConfig, requestId: string) {
  // Overlap subtracted from the last poll time so messages near the boundary are not missed.
  const RECENT_POLL_BUFFER_SECONDS = 180

  try {
    const baseQuery = buildGmailSearchQuery(config)

    let timeConstraint = ''
    if (config.lastCheckedTimestamp) {
      const lastCheckedTime = new Date(config.lastCheckedTimestamp)
      const now = new Date()
      const minutesSinceLastCheck = (now.getTime() - lastCheckedTime.getTime()) / (60 * 1000)
      if (minutesSinceLastCheck < 60) {
        const cutoffTime = new Date(lastCheckedTime.getTime() - RECENT_POLL_BUFFER_SECONDS * 1000)
        const timestamp = Math.floor(cutoffTime.getTime() / 1000)
        timeConstraint = ` after:${timestamp}`
      } else if (minutesSinceLastCheck < 24 * 60) {
        const hours = Math.ceil(minutesSinceLastCheck / 60) + 1 // Round up and add 1 hour buffer
        timeConstraint = ` newer_than:${hours}h`
      } else {
        // Look back at most 7 days, plus one day of buffer.
        const days = Math.min(Math.ceil(minutesSinceLastCheck / (24 * 60)), 7) + 1
        timeConstraint = ` newer_than:${days}d`
      }
    } else {
      timeConstraint = ' newer_than:1d'
    }

    const query = `${baseQuery}${timeConstraint}`
    const searchUrl = `https://gmail.googleapis.com/gmail/v1/users/me/messages?q=${encodeURIComponent(query)}&maxResults=${config.maxEmailsPerPoll || 25}`
    const searchResponse = await fetch(searchUrl, {
      headers: {
        Authorization: `Bearer ${accessToken}`,
      },
    })

    if (!searchResponse.ok) {
      // Tolerate non-JSON error bodies so the caller still gets the status-bearing
      // error below instead of a body-parsing error.
      const errorData = await searchResponse.json().catch(() => ({}))
      logger.error(`[${requestId}] Gmail search API error:`, {
        status: searchResponse.status,
        statusText: searchResponse.statusText,
        query: query,
        error: errorData,
      })
      throw new Error(
        `Gmail API error: ${searchResponse.status} ${searchResponse.statusText} - ${JSON.stringify(errorData)}`
      )
    }

    const searchData = await searchResponse.json()
    if (!searchData.messages || !searchData.messages.length) {
      logger.info(`[${requestId}] No emails found matching query: ${query}`)
      return { emails: [], latestHistoryId: config.historyId }
    }

    const idsToFetch = searchData.messages.slice(0, config.maxEmailsPerPoll || 25)
    let latestHistoryId = config.historyId
    logger.info(
      `[${requestId}] Processing ${idsToFetch.length} emails from search API (total matches: ${searchData.messages.length})`
    )

    // Load all messages in parallel; individual failures are counted, not fatal.
    const emailResults = await Promise.allSettled(
      idsToFetch.map((message: { id: string }) => getEmailDetails(accessToken, message.id))
    )
    const rejected = emailResults.filter((r) => r.status === 'rejected')
    if (rejected.length > 0) {
      logger.warn(`[${requestId}] Failed to fetch ${rejected.length} email details`)
    }
    const emails = emailResults
      .filter(
        (result): result is PromiseFulfilledResult<GmailEmail> => result.status === 'fulfilled'
      )
      .map((result) => result.value)

    // The first returned message's history id becomes the cursor for the next poll.
    if (emails.length > 0 && emails[0].historyId) {
      latestHistoryId = emails[0].historyId
    }
    return { emails, latestHistoryId }
  } catch (error) {
    const errorMessage = error instanceof Error ? error.message : 'Unknown error'
    logger.error(`[${requestId}] Error searching emails:`, errorMessage)
    throw error
  }
}
/**
 * Loads a single Gmail message in `full` format.
 *
 * @param accessToken - OAuth bearer token for the Gmail API.
 * @param messageId - Id of the message to load.
 * @returns The parsed message JSON.
 * @throws Error carrying the HTTP status and error body when the request is not successful.
 */
async function getEmailDetails(accessToken: string, messageId: string): Promise<GmailEmail> {
  const res = await fetch(
    `https://gmail.googleapis.com/gmail/v1/users/me/messages/${messageId}?format=full`,
    { headers: { Authorization: `Bearer ${accessToken}` } }
  )

  if (res.ok) {
    return await res.json()
  }

  // The error body is optional context; fall back to {} when it is not valid JSON.
  const errorData = await res.json().catch(() => ({}))
  throw new Error(
    `Failed to fetch email details for message ${messageId}: ${res.status} ${res.statusText} - ${JSON.stringify(errorData)}`
  )
}
/**
 * Applies the webhook's label filter to a list of emails.
 *
 * With no configured labels the input array is returned unchanged. Otherwise
 * 'INCLUDE' keeps emails carrying at least one configured label and any other
 * behavior keeps only emails carrying none of them.
 *
 * @param emails - Emails to filter.
 * @param config - Webhook configuration providing `labelIds` and `labelFilterBehavior`.
 * @returns The emails that pass the label filter.
 */
function filterEmailsByLabels(emails: GmailEmail[], config: GmailWebhookConfig): GmailEmail[] {
  // The config is cast from stored JSON, so `labelIds` may be missing at runtime
  // despite its type; treat that the same as an empty list instead of throwing.
  const configuredLabels = config.labelIds ?? []
  if (configuredLabels.length === 0) {
    return emails
  }

  const keepMatching = config.labelFilterBehavior === 'INCLUDE'
  return emails.filter((email) => {
    const emailLabels = email.labelIds ?? []
    const hasMatchingLabel = configuredLabels.some((label) => emailLabels.includes(label))
    return keepMatching ? hasMatchingLabel : !hasMatchingLabel
  })
}
/**
 * Turns each fetched Gmail message into a webhook payload and POSTs it to the
 * workflow's trigger endpoint, one email at a time.
 *
 * Each email runs inside `pollingIdempotency.executeWithIdempotency` keyed by
 * webhook id and email id (presumably so an already-handled email is not
 * triggered twice — confirm in the idempotency service). A failure on one
 * email is logged and counted; it does not stop the remaining emails.
 *
 * @param emails - Raw Gmail messages to process.
 * @param webhookData - Webhook row; `id` and `path` are read here.
 * @param config - Webhook configuration (attachments, raw email, mark-as-read flags).
 * @param accessToken - OAuth bearer token used for attachment download and mark-as-read.
 * @param requestId - Correlation id used in log lines.
 * @returns How many emails were processed and how many failed.
 */
async function processEmails(
emails: any[],
webhookData: any,
config: GmailWebhookConfig,
accessToken: string,
requestId: string
) {
let processedCount = 0
let failedCount = 0
for (const email of emails) {
try {
await pollingIdempotency.executeWithIdempotency(
'gmail',
`${webhookData.id}:${email.id}`,
async () => {
// Index headers by lower-cased name; a repeated header keeps its last value.
const headers: Record<string, string> = {}
if (email.payload?.headers) {
for (const header of email.payload.headers) {
headers[header.name.toLowerCase()] = header.value
}
}
let textContent = ''
let htmlContent = ''
// Depth-first walk over the MIME tree; the last text/plain and text/html
// parts encountered win.
const extractContent = (part: any) => {
if (!part) return
if (part.mimeType === 'text/plain' && part.body?.data) {
textContent = Buffer.from(part.body.data, 'base64').toString('utf-8')
} else if (part.mimeType === 'text/html' && part.body?.data) {
htmlContent = Buffer.from(part.body.data, 'base64').toString('utf-8')
}
if (part.parts && Array.isArray(part.parts)) {
for (const subPart of part.parts) {
extractContent(subPart)
}
}
}
if (email.payload) {
extractContent(email.payload)
}
// Prefer the Date header; internalDate is only consulted when the header is absent.
let date: string | null = null
if (headers.date) {
try {
date = new Date(headers.date).toISOString()
} catch (_e) {
// Keep date as null if parsing fails
}
} else if (email.internalDate) {
date = new Date(Number.parseInt(email.internalDate)).toISOString()
}
let attachments: GmailAttachment[] = []
const hasAttachments = email.payload
? extractAttachmentInfo(email.payload).length > 0
: false
// Attachment download is best-effort: on error the email is still delivered
// with an empty attachments list.
if (config.includeAttachments && hasAttachments && email.payload) {
try {
const attachmentInfo = extractAttachmentInfo(email.payload)
attachments = await downloadAttachments(email.id, attachmentInfo, accessToken)
} catch (error) {
logger.error(
`[${requestId}] Error downloading attachments for email ${email.id}:`,
error
)
}
}
const simplifiedEmail: SimplifiedEmail = {
id: email.id,
threadId: email.threadId,
subject: headers.subject || '[No Subject]',
from: headers.from || '',
to: headers.to || '',
cc: headers.cc || '',
date: date,
bodyText: textContent,
bodyHtml: htmlContent,
labels: email.labelIds || [],
hasAttachments,
attachments,
}
const payload: GmailWebhookPayload = {
email: simplifiedEmail,
timestamp: new Date().toISOString(),
...(config.includeRawEmail ? { rawEmail: email } : {}),
}
// Deliver the payload by POSTing to this app's own webhook trigger route.
const webhookUrl = `${getInternalApiBaseUrl()}/api/webhooks/trigger/${webhookData.path}`
const response = await fetch(webhookUrl, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'User-Agent': 'Sim/1.0',
},
body: JSON.stringify(payload),
})
if (!response.ok) {
const errorText = await response.text()
logger.error(
`[${requestId}] Failed to trigger webhook for email ${email.id}:`,
response.status,
errorText
)
throw new Error(`Webhook request failed: ${response.status} - ${errorText}`)
}
// Mark as read only after the trigger succeeded; a failure here throws and
// makes the whole email count as failed.
if (config.markAsRead) {
await markEmailAsRead(accessToken, email.id)
}
return {
emailId: email.id,
webhookStatus: response.status,
processed: true,
}
}
)
logger.info(
`[${requestId}] Successfully processed email ${email.id} for webhook ${webhookData.id}`
)
processedCount++
} catch (error) {
const errorMessage = error instanceof Error ? error.message : 'Unknown error'
logger.error(`[${requestId}] Error processing email ${email.id}:`, errorMessage)
failedCount++
}
}
return { processedCount, failedCount }
}
/**
 * Removes the UNREAD label from a Gmail message.
 *
 * @param accessToken - OAuth bearer token for the Gmail API.
 * @param messageId - Id of the message to modify.
 * @throws Rethrows request errors and non-successful responses after logging them.
 */
async function markEmailAsRead(accessToken: string, messageId: string) {
  const endpoint = `https://gmail.googleapis.com/gmail/v1/users/me/messages/${messageId}/modify`
  const requestInit = {
    method: 'POST',
    headers: {
      Authorization: `Bearer ${accessToken}`,
      'Content-Type': 'application/json',
    },
    body: JSON.stringify({
      removeLabelIds: ['UNREAD'],
    }),
  }

  try {
    const res = await fetch(endpoint, requestInit)
    if (res.ok) {
      return
    }
    // Discard the unread error body before raising.
    await res.body?.cancel().catch(() => {})
    throw new Error(`Failed to mark email ${messageId} as read: ${res.status} ${res.statusText}`)
  } catch (error) {
    logger.error(`Error marking email ${messageId} as read:`, error)
    throw error
  }
}
/**
 * Stores the poll checkpoint in the webhook's `providerConfig`.
 *
 * The current config is re-read and merged so unrelated keys are kept.
 * `historyId` is only written when a truthy value is supplied. Database
 * errors are logged and swallowed.
 *
 * @param webhookId - Id of the webhook row to update.
 * @param timestamp - ISO timestamp to store as `lastCheckedTimestamp`.
 * @param historyId - Optional Gmail history cursor to store.
 */
async function updateWebhookLastChecked(webhookId: string, timestamp: string, historyId?: string) {
  try {
    const rows = await db.select().from(webhook).where(eq(webhook.id, webhookId))
    const currentConfig = (rows[0]?.providerConfig as Record<string, any>) || {}

    const nextConfig: Record<string, any> = {
      ...currentConfig,
      lastCheckedTimestamp: timestamp,
    }
    if (historyId) {
      nextConfig.historyId = historyId
    }

    await db
      .update(webhook)
      .set({ providerConfig: nextConfig as any, updatedAt: new Date() })
      .where(eq(webhook.id, webhookId))
  } catch (error) {
    logger.error(`Error updating webhook ${webhookId} last checked timestamp:`, error)
  }
}

View File

@@ -0,0 +1,553 @@
import { pollingIdempotency } from '@/lib/core/idempotency/service'
import type { PollingProviderHandler, PollWebhookContext } from '@/lib/webhooks/polling/types'
import {
markWebhookFailed,
markWebhookSuccess,
resolveOAuthCredential,
updateWebhookProviderConfig,
} from '@/lib/webhooks/polling/utils'
import { processPolledWebhookEvent } from '@/lib/webhooks/processor'
import type { GmailAttachment } from '@/tools/gmail/types'
import { downloadAttachments, extractAttachmentInfo } from '@/tools/gmail/utils'
/** Shape the Gmail polling handler expects of a webhook's `providerConfig`. */
interface GmailWebhookConfig {
// Label IDs for message filtering — presumably Gmail label ids; confirm against the fetch helpers.
labelIds: string[]
labelFilterBehavior: 'INCLUDE' | 'EXCLUDE'
markAsRead: boolean
searchQuery?: string
maxEmailsPerPoll?: number
// ISO timestamp written back by the handler after every poll.
lastCheckedTimestamp?: string
// Gmail History API cursor; carried forward when a poll yields no newer value.
historyId?: string
includeAttachments?: boolean
includeRawEmail?: boolean
}
/** Gmail message fields used by the polling handler. */
interface GmailEmail {
id: string
threadId: string
historyId?: string
labelIds?: string[]
// Message payload, kept loosely typed as an arbitrary record.
payload?: Record<string, unknown>
snippet?: string
internalDate?: string
}
/** Flattened email representation exposed to workflows. */
export interface SimplifiedEmail {
id: string
threadId: string
subject: string
from: string
to: string
cc: string
// Null when no date is available for the message.
date: string | null
bodyText: string
bodyHtml: string
labels: string[]
hasAttachments: boolean
attachments: GmailAttachment[]
}
/** Event payload produced for each polled Gmail message. */
export interface GmailWebhookPayload {
email: SimplifiedEmail
timestamp: string
// Optional raw message — presumably set only when `includeRawEmail` is enabled; confirm in processEmails.
rawEmail?: GmailEmail
}
/**
 * Polling handler for the `gmail` provider.
 *
 * `pollWebhook` resolves an access token, fetches new emails, processes them,
 * stores the new poll checkpoint and updates the webhook's failure counter.
 * It resolves to 'failure' when an error is thrown or every fetched email
 * failed to process, and to 'success' otherwise.
 */
export const gmailPollingHandler: PollingProviderHandler = {
  provider: 'gmail',
  label: 'Gmail',

  async pollWebhook(ctx: PollWebhookContext): Promise<'success' | 'failure'> {
    const { webhookData, workflowData, requestId, logger } = ctx
    const webhookId = webhookData.id

    try {
      const accessToken = await resolveOAuthCredential(
        webhookData,
        'google-email',
        requestId,
        logger
      )
      const config = webhookData.providerConfig as unknown as GmailWebhookConfig
      // Captured before fetching; this instant is what gets stored as the checkpoint.
      const now = new Date()

      const { emails, latestHistoryId } = await fetchNewEmails(
        accessToken,
        config,
        requestId,
        logger
      )

      // Builds a fresh checkpoint patch; `historyId` is included only when a
      // new or previous value exists.
      const buildCheckpoint = () => {
        const nextHistoryId = latestHistoryId || config.historyId
        return {
          lastCheckedTimestamp: now.toISOString(),
          ...(nextHistoryId ? { historyId: nextHistoryId } : {}),
        }
      }

      if (!emails || !emails.length) {
        // Nothing new: still advance the checkpoint and report success.
        await updateWebhookProviderConfig(webhookId, buildCheckpoint(), logger)
        await markWebhookSuccess(webhookId, logger)
        logger.info(`[${requestId}] No new emails found for webhook ${webhookId}`)
        return 'success'
      }

      logger.info(`[${requestId}] Found ${emails.length} new emails for webhook ${webhookId}`)

      const { processedCount, failedCount } = await processEmails(
        emails,
        webhookData,
        workflowData,
        config,
        accessToken,
        requestId,
        logger
      )

      // The checkpoint advances regardless of per-email failures.
      await updateWebhookProviderConfig(webhookId, buildCheckpoint(), logger)

      const everyEmailFailed = failedCount > 0 && processedCount === 0
      if (everyEmailFailed) {
        await markWebhookFailed(webhookId, logger)
        logger.warn(
          `[${requestId}] All ${failedCount} emails failed to process for webhook ${webhookId}`
        )
        return 'failure'
      }

      await markWebhookSuccess(webhookId, logger)
      logger.info(
        `[${requestId}] Successfully processed ${processedCount} emails for webhook ${webhookId}${failedCount > 0 ? ` (${failedCount} failed)` : ''}`
      )
      return 'success'
    } catch (error) {
      logger.error(`[${requestId}] Error processing Gmail webhook ${webhookId}:`, error)
      await markWebhookFailed(webhookId, logger)
      return 'failure'
    }
  },
}
/**
 * Fetch the messages that arrived since the previous poll.
 *
 * When `config.historyId` is set, the Gmail history endpoint is queried from
 * that id and every `messagesAdded` entry is collected. Without a history id,
 * or when the history request is rejected, the work is delegated to
 * `searchEmails`.
 *
 * @param accessToken - OAuth bearer token for the Gmail API.
 * @param config - Webhook configuration (history id, label filter, poll size).
 * @param requestId - Id used to prefix log lines.
 * @param logger - Logger of the current poll.
 * @returns The fetched messages (history results are passed through
 *   `filterEmailsByLabels`) and the history id to persist for the next poll.
 * @throws Re-throws any error raised on the history path after logging it.
 */
async function fetchNewEmails(
  accessToken: string,
  config: GmailWebhookConfig,
  requestId: string,
  logger: ReturnType<typeof import('@sim/logger').createLogger>
) {
  try {
    if (!config.historyId) {
      return searchEmails(accessToken, config, requestId, logger)
    }

    let latestHistoryId = config.historyId

    const historyUrl = `https://gmail.googleapis.com/gmail/v1/users/me/history?startHistoryId=${config.historyId}`
    const historyResponse = await fetch(historyUrl, {
      headers: { Authorization: `Bearer ${accessToken}` },
    })

    if (!historyResponse.ok) {
      // The error body is diagnostic only: a non-JSON body must not prevent the fallback.
      const errorData = await historyResponse.json().catch(() => ({}))
      logger.error(`[${requestId}] Gmail history API error:`, {
        status: historyResponse.status,
        statusText: historyResponse.statusText,
        error: errorData,
      })
      logger.info(`[${requestId}] Falling back to search API after history API failure`)
      return searchEmails(accessToken, config, requestId, logger)
    }

    const historyData = await historyResponse.json()

    // Adopt the id reported by the API before any early return, so the stored
    // cursor also moves forward on polls that find nothing new.
    if (historyData.historyId) {
      latestHistoryId = historyData.historyId
    }

    // NOTE(review): only this single response is read; a `nextPageToken` is not
    // followed, and ids beyond `maxEmailsPerPoll` are dropped — confirm this is intended.
    if (!historyData.history || !historyData.history.length) {
      return { emails: [], latestHistoryId }
    }

    const messageIds = new Set<string>()
    for (const history of historyData.history) {
      for (const messageAdded of history.messagesAdded ?? []) {
        const addedId = messageAdded?.message?.id
        if (addedId) {
          messageIds.add(addedId)
        }
      }
    }

    if (messageIds.size === 0) {
      return { emails: [], latestHistoryId }
    }

    // Descending string order of the ids, capped at the configured poll size.
    const sortedIds = [...messageIds].sort().reverse()
    const idsToFetch = sortedIds.slice(0, config.maxEmailsPerPoll || 25)

    logger.info(`[${requestId}] Processing ${idsToFetch.length} emails from history API`)

    const emailResults = await Promise.allSettled(
      idsToFetch.map((messageId) => getEmailDetails(accessToken, messageId))
    )

    const rejected = emailResults.filter((r) => r.status === 'rejected')
    if (rejected.length > 0) {
      logger.warn(`[${requestId}] Failed to fetch ${rejected.length} email details`)
    }

    const fetched = emailResults
      .filter(
        (result): result is PromiseFulfilledResult<GmailEmail> => result.status === 'fulfilled'
      )
      .map((result) => result.value)

    // The history API is not label-aware here, so the label filter is applied locally.
    const emails = filterEmailsByLabels(fetched, config)

    return { emails, latestHistoryId }
  } catch (error) {
    const errorMessage = error instanceof Error ? error.message : 'Unknown error'
    logger.error(`[${requestId}] Error fetching new emails:`, errorMessage)
    throw error
  }
}
/**
 * Build the label/search part of a Gmail search query.
 *
 * Label ids become `label:` terms joined with OR (parenthesised when there is
 * more than one) and are negated with a leading `-` unless the behavior is
 * 'INCLUDE'. A user query containing ` OR ` or ` AND ` is parenthesised before
 * being appended. With neither labels nor a query the result is `in:inbox`.
 *
 * @param config - Label ids, label filter behavior and optional user query.
 * @returns The query string, without any time constraint.
 */
function buildGmailSearchQuery(config: {
  labelIds?: string[]
  labelFilterBehavior?: 'INCLUDE' | 'EXCLUDE'
  searchQuery?: string
}): string {
  const terms: string[] = []

  const labelIds = config.labelIds ?? []
  if (labelIds.length > 0) {
    const joined = labelIds.map((label) => `label:${label}`).join(' OR ')
    const grouped = labelIds.length > 1 ? `(${joined})` : joined
    terms.push(config.labelFilterBehavior === 'INCLUDE' ? grouped : `-${grouped}`)
  }

  const userQuery = config.searchQuery?.trim()
  if (userQuery) {
    const isCompound = [' OR ', ' AND '].some((operator) => userQuery.includes(operator))
    terms.push(isCompound ? `(${userQuery})` : userQuery)
  }

  return terms.length > 0 ? terms.join(' ') : 'in:inbox'
}
/**
 * Find recent messages through the Gmail search API.
 *
 * The query from `buildGmailSearchQuery` is combined with a time constraint
 * derived from `config.lastCheckedTimestamp`:
 * - checked less than an hour ago: `after:` the last check minus a 180 second overlap,
 * - checked less than a day ago: `newer_than:` the elapsed hours plus one,
 * - checked longer ago: `newer_than:` the elapsed days (capped at 7) plus one,
 * - never checked, or the stored timestamp cannot be parsed: `newer_than:1d`.
 *
 * @param accessToken - OAuth bearer token for the Gmail API.
 * @param config - Webhook configuration (labels, query, poll size, last check).
 * @param requestId - Id used to prefix log lines.
 * @param logger - Logger of the current poll.
 * @returns The fetched messages and the history id to persist (the first
 *   fetched message's id when available, otherwise the configured one).
 * @throws Error when the search request is rejected by the API; other errors
 *   are logged and re-thrown.
 */
async function searchEmails(
  accessToken: string,
  config: GmailWebhookConfig,
  requestId: string,
  logger: ReturnType<typeof import('@sim/logger').createLogger>
) {
  try {
    const baseQuery = buildGmailSearchQuery(config)
    const maxResults = config.maxEmailsPerPoll || 25

    // Default window; also used when the stored timestamp is not a valid date,
    // which would otherwise produce a `newer_than:NaNd` query.
    let timeConstraint = ' newer_than:1d'

    const lastCheckedMs = config.lastCheckedTimestamp
      ? new Date(config.lastCheckedTimestamp).getTime()
      : Number.NaN

    if (!Number.isNaN(lastCheckedMs)) {
      const minutesSinceLastCheck = (Date.now() - lastCheckedMs) / (60 * 1000)

      if (minutesSinceLastCheck < 60) {
        // Look slightly further back than the last check so nothing is missed at the boundary.
        const overlapSeconds = 180
        const cutoffSeconds = Math.floor((lastCheckedMs - overlapSeconds * 1000) / 1000)
        timeConstraint = ` after:${cutoffSeconds}`
      } else if (minutesSinceLastCheck < 24 * 60) {
        const hours = Math.ceil(minutesSinceLastCheck / 60) + 1
        timeConstraint = ` newer_than:${hours}h`
      } else {
        const days = Math.min(Math.ceil(minutesSinceLastCheck / (24 * 60)), 7) + 1
        timeConstraint = ` newer_than:${days}d`
      }
    }

    const query = `${baseQuery}${timeConstraint}`
    const searchUrl = `https://gmail.googleapis.com/gmail/v1/users/me/messages?q=${encodeURIComponent(query)}&maxResults=${maxResults}`

    const searchResponse = await fetch(searchUrl, {
      headers: { Authorization: `Bearer ${accessToken}` },
    })

    if (!searchResponse.ok) {
      // Keep the status-based error even when the error body is not JSON.
      const errorData = await searchResponse.json().catch(() => ({}))
      logger.error(`[${requestId}] Gmail search API error:`, {
        status: searchResponse.status,
        statusText: searchResponse.statusText,
        query,
        error: errorData,
      })
      throw new Error(
        `Gmail API error: ${searchResponse.status} ${searchResponse.statusText} - ${JSON.stringify(errorData)}`
      )
    }

    const searchData = await searchResponse.json()

    if (!searchData.messages || !searchData.messages.length) {
      logger.info(`[${requestId}] No emails found matching query: ${query}`)
      return { emails: [], latestHistoryId: config.historyId }
    }

    const idsToFetch = searchData.messages.slice(0, maxResults)
    let latestHistoryId = config.historyId

    logger.info(
      `[${requestId}] Processing ${idsToFetch.length} emails from search API (total matches: ${searchData.messages.length})`
    )

    const emailResults = await Promise.allSettled(
      idsToFetch.map((message: { id: string }) => getEmailDetails(accessToken, message.id))
    )

    const rejected = emailResults.filter((r) => r.status === 'rejected')
    if (rejected.length > 0) {
      logger.warn(`[${requestId}] Failed to fetch ${rejected.length} email details`)
    }

    const emails = emailResults
      .filter(
        (result): result is PromiseFulfilledResult<GmailEmail> => result.status === 'fulfilled'
      )
      .map((result) => result.value)

    // The first fetched message supplies the history id for subsequent polls.
    if (emails.length > 0 && emails[0].historyId) {
      latestHistoryId = emails[0].historyId
    }

    return { emails, latestHistoryId }
  } catch (error) {
    const errorMessage = error instanceof Error ? error.message : 'Unknown error'
    logger.error(`[${requestId}] Error searching emails:`, errorMessage)
    throw error
  }
}
/**
 * Load one Gmail message in `full` format.
 *
 * @param accessToken - OAuth bearer token for the Gmail API.
 * @param messageId - Id of the message to load.
 * @returns The parsed JSON body of the response.
 * @throws Error carrying status, status text and the (best-effort parsed)
 *   error body when the request is not successful.
 */
async function getEmailDetails(accessToken: string, messageId: string): Promise<GmailEmail> {
  const response = await fetch(
    `https://gmail.googleapis.com/gmail/v1/users/me/messages/${messageId}?format=full`,
    { headers: { Authorization: `Bearer ${accessToken}` } }
  )

  if (response.ok) {
    return await response.json()
  }

  // The body is only used for the message; fall back to `{}` when it is not JSON.
  const errorBody = await response.json().catch(() => ({}))
  throw new Error(
    `Failed to fetch email details for message ${messageId}: ${response.status} ${response.statusText} - ${JSON.stringify(errorBody)}`
  )
}
/**
 * Apply the webhook's label filter to a list of messages.
 *
 * @param emails - Messages to filter.
 * @param config - Supplies `labelIds` and `labelFilterBehavior`.
 * @returns The input array itself when no labels are configured. Otherwise a
 *   new array with the messages that carry at least one configured label
 *   ('INCLUDE'), or those that carry none of them (any other behavior).
 */
function filterEmailsByLabels(emails: GmailEmail[], config: GmailWebhookConfig): GmailEmail[] {
  // The config comes from stored JSON, so `labelIds` may be absent at runtime
  // despite its declared type; treat that like an empty label list.
  const configuredLabels = config.labelIds ?? []
  if (configuredLabels.length === 0) {
    return emails
  }

  const keepMatching = config.labelFilterBehavior === 'INCLUDE'

  return emails.filter((email) => {
    const emailLabels = email.labelIds ?? []
    const hasMatchingLabel = configuredLabels.some((configLabel) =>
      emailLabels.includes(configLabel)
    )
    return keepMatching ? hasMatchingLabel : !hasMatchingLabel
  })
}
/**
 * Turn each fetched Gmail message into a webhook payload and run the workflow.
 *
 * Every message is handled inside `pollingIdempotency.executeWithIdempotency`,
 * keyed by webhook id and message id. A failure of one message is logged and
 * counted; it does not stop the remaining messages.
 *
 * @param emails - Messages to process, in the given order.
 * @param webhookData - Webhook record the messages belong to.
 * @param workflowData - Workflow record executed for each message.
 * @param config - Controls attachment download, raw-email inclusion and mark-as-read.
 * @param accessToken - OAuth bearer token used for attachments and mark-as-read.
 * @param requestId - Id used to prefix log lines.
 * @param logger - Logger of the current poll.
 * @returns How many messages were processed and how many failed.
 */
async function processEmails(
  emails: GmailEmail[],
  webhookData: PollWebhookContext['webhookData'],
  workflowData: PollWebhookContext['workflowData'],
  config: GmailWebhookConfig,
  accessToken: string,
  requestId: string,
  logger: ReturnType<typeof import('@sim/logger').createLogger>
) {
  let processedCount = 0
  let failedCount = 0

  for (const email of emails) {
    try {
      await pollingIdempotency.executeWithIdempotency(
        'gmail',
        `${webhookData.id}:${email.id}`,
        async () => {
          const payload = email.payload as Record<string, unknown> | undefined

          // Header names are lower-cased so lookups below are case-insensitive.
          const headers: Record<string, string> = {}
          if (payload?.headers && Array.isArray(payload.headers)) {
            for (const header of payload.headers as { name: string; value: string }[]) {
              headers[header.name.toLowerCase()] = header.value
            }
          }

          let textContent = ''
          let htmlContent = ''

          // Walk the MIME tree depth-first; a later matching part overwrites an earlier one.
          const extractContent = (part: Record<string, unknown>) => {
            if (!part) return

            if (part.mimeType === 'text/plain') {
              const body = part.body as { data?: string } | undefined
              if (body?.data) {
                textContent = Buffer.from(body.data, 'base64').toString('utf-8')
              }
            } else if (part.mimeType === 'text/html') {
              const body = part.body as { data?: string } | undefined
              if (body?.data) {
                htmlContent = Buffer.from(body.data, 'base64').toString('utf-8')
              }
            }

            if (part.parts && Array.isArray(part.parts)) {
              for (const subPart of part.parts) {
                extractContent(subPart as Record<string, unknown>)
              }
            }
          }

          if (payload) {
            extractContent(payload)
          }

          // Prefer the Date header; fall back to `internalDate` when the header is
          // missing or unparsable. An invalid value never throws: it leaves `date` null.
          let date: string | null = null
          if (headers.date) {
            const fromHeader = new Date(headers.date)
            if (!Number.isNaN(fromHeader.getTime())) {
              date = fromHeader.toISOString()
            }
          }
          if (date === null && email.internalDate) {
            const fromInternal = new Date(Number.parseInt(email.internalDate, 10))
            if (!Number.isNaN(fromInternal.getTime())) {
              date = fromInternal.toISOString()
            }
          }

          const attachmentInfo = payload ? extractAttachmentInfo(payload) : []
          const hasAttachments = attachmentInfo.length > 0

          let attachments: GmailAttachment[] = []
          if (config.includeAttachments && hasAttachments) {
            try {
              attachments = await downloadAttachments(email.id, attachmentInfo, accessToken)
            } catch (error) {
              // Best effort: the email is still delivered, just without attachments.
              logger.error(
                `[${requestId}] Error downloading attachments for email ${email.id}:`,
                error
              )
            }
          }

          const simplifiedEmail: SimplifiedEmail = {
            id: email.id,
            threadId: email.threadId,
            subject: headers.subject || '[No Subject]',
            from: headers.from || '',
            to: headers.to || '',
            cc: headers.cc || '',
            date,
            bodyText: textContent,
            bodyHtml: htmlContent,
            labels: email.labelIds || [],
            hasAttachments,
            attachments,
          }

          const webhookPayload: GmailWebhookPayload = {
            email: simplifiedEmail,
            timestamp: new Date().toISOString(),
            ...(config.includeRawEmail ? { rawEmail: email } : {}),
          }

          const result = await processPolledWebhookEvent(
            webhookData,
            workflowData,
            webhookPayload,
            requestId
          )

          if (!result.success) {
            logger.error(
              `[${requestId}] Failed to process webhook for email ${email.id}:`,
              result.statusCode,
              result.error
            )
            throw new Error(`Webhook processing failed (${result.statusCode}): ${result.error}`)
          }

          if (config.markAsRead) {
            await markEmailAsRead(accessToken, email.id, logger)
          }

          return { emailId: email.id, processed: true }
        }
      )

      logger.info(
        `[${requestId}] Successfully processed email ${email.id} for webhook ${webhookData.id}`
      )
      processedCount++
    } catch (error) {
      const errorMessage = error instanceof Error ? error.message : 'Unknown error'
      logger.error(`[${requestId}] Error processing email ${email.id}:`, errorMessage)
      failedCount++
    }
  }

  return { processedCount, failedCount }
}
/**
 * Remove the UNREAD label from a Gmail message.
 *
 * @param accessToken - OAuth bearer token for the Gmail API.
 * @param messageId - Id of the message to modify.
 * @param logger - Receives the error before it is re-thrown.
 * @throws Error when the request fails or the API answers with a non-OK status.
 */
async function markEmailAsRead(
  accessToken: string,
  messageId: string,
  logger: ReturnType<typeof import('@sim/logger').createLogger>
) {
  const endpoint = `https://gmail.googleapis.com/gmail/v1/users/me/messages/${messageId}/modify`
  const request = {
    method: 'POST',
    headers: {
      Authorization: `Bearer ${accessToken}`,
      'Content-Type': 'application/json',
    },
    body: JSON.stringify({ removeLabelIds: ['UNREAD'] }),
  }

  try {
    const res = await fetch(endpoint, request)
    if (res.ok) {
      return
    }

    // Discard the unread body before reporting the failure.
    await res.body?.cancel().catch(() => {})
    throw new Error(`Failed to mark email ${messageId} as read: ${res.status} ${res.statusText}`)
  } catch (err) {
    logger.error(`Error marking email ${messageId} as read:`, err)
    throw err
  }
}

View File

@@ -1,19 +1,14 @@
import { db } from '@sim/db'
import { webhook, workflow, workflowDeploymentVersion } from '@sim/db/schema'
import { createLogger } from '@sim/logger'
import type { InferSelectModel } from 'drizzle-orm'
import { and, eq, isNull, or, sql } from 'drizzle-orm'
import type { FetchMessageObject, MailboxLockObject } from 'imapflow'
import { ImapFlow } from 'imapflow'
import { pollingIdempotency } from '@/lib/core/idempotency/service'
import { validateDatabaseHost } from '@/lib/core/security/input-validation.server'
import { getInternalApiBaseUrl } from '@/lib/core/utils/urls'
import { generateShortId } from '@/lib/core/utils/uuid'
import { MAX_CONSECUTIVE_FAILURES } from '@/triggers/constants'
const logger = createLogger('ImapPollingService')
type WebhookRecord = InferSelectModel<typeof webhook>
import type { PollingProviderHandler, PollWebhookContext } from '@/lib/webhooks/polling/types'
import {
markWebhookFailed,
markWebhookSuccess,
updateWebhookProviderConfig,
} from '@/lib/webhooks/polling/utils'
import { processPolledWebhookEvent } from '@/lib/webhooks/processor'
interface ImapWebhookConfig {
host: string
@@ -21,13 +16,13 @@ interface ImapWebhookConfig {
secure: boolean
username: string
password: string
mailbox: string | string[] // Can be single mailbox or array of mailboxes
mailbox: string | string[]
searchCriteria: string
markAsRead: boolean
includeAttachments: boolean
lastProcessedUid?: number
lastProcessedUidByMailbox?: Record<string, number> // Track UID per mailbox for multi-mailbox
lastCheckedTimestamp?: string // ISO timestamp of last successful poll
lastProcessedUidByMailbox?: Record<string, number>
lastCheckedTimestamp?: string
maxEmailsPerPoll?: number
}
@@ -69,206 +64,112 @@ export interface ImapWebhookPayload {
timestamp: string
}
async function markWebhookFailed(webhookId: string) {
try {
const result = await db
.update(webhook)
.set({
failedCount: sql`COALESCE(${webhook.failedCount}, 0) + 1`,
lastFailedAt: new Date(),
updatedAt: new Date(),
})
.where(eq(webhook.id, webhookId))
.returning({ failedCount: webhook.failedCount })
export const imapPollingHandler: PollingProviderHandler = {
provider: 'imap',
label: 'IMAP',
const newFailedCount = result[0]?.failedCount || 0
const shouldDisable = newFailedCount >= MAX_CONSECUTIVE_FAILURES
async pollWebhook(ctx: PollWebhookContext): Promise<'success' | 'failure'> {
const { webhookData, workflowData, requestId, logger } = ctx
const webhookId = webhookData.id
if (shouldDisable) {
await db
.update(webhook)
.set({
isActive: false,
updatedAt: new Date(),
})
.where(eq(webhook.id, webhookId))
try {
const config = webhookData.providerConfig as unknown as ImapWebhookConfig
logger.warn(
`Webhook ${webhookId} auto-disabled after ${MAX_CONSECUTIVE_FAILURES} consecutive failures`
)
}
} catch (err) {
logger.error(`Failed to mark webhook ${webhookId} as failed:`, err)
}
}
async function markWebhookSuccess(webhookId: string) {
try {
await db
.update(webhook)
.set({
failedCount: 0,
updatedAt: new Date(),
})
.where(eq(webhook.id, webhookId))
} catch (err) {
logger.error(`Failed to mark webhook ${webhookId} as successful:`, err)
}
}
export async function pollImapWebhooks() {
logger.info('Starting IMAP webhook polling')
try {
const activeWebhooksResult = await db
.select({ webhook })
.from(webhook)
.innerJoin(workflow, eq(webhook.workflowId, workflow.id))
.leftJoin(
workflowDeploymentVersion,
and(
eq(workflowDeploymentVersion.workflowId, workflow.id),
eq(workflowDeploymentVersion.isActive, true)
)
)
.where(
and(
eq(webhook.provider, 'imap'),
eq(webhook.isActive, true),
eq(workflow.isDeployed, true),
or(
eq(webhook.deploymentVersionId, workflowDeploymentVersion.id),
and(isNull(workflowDeploymentVersion.id), isNull(webhook.deploymentVersionId))
)
)
)
const activeWebhooks = activeWebhooksResult.map((r) => r.webhook)
if (!activeWebhooks.length) {
logger.info('No active IMAP webhooks found')
return { total: 0, successful: 0, failed: 0, details: [] }
}
logger.info(`Found ${activeWebhooks.length} active IMAP webhooks`)
const CONCURRENCY = 5
const running: Promise<void>[] = []
let successCount = 0
let failureCount = 0
const enqueue = async (webhookData: (typeof activeWebhooks)[number]) => {
const webhookId = webhookData.id
const requestId = generateShortId()
try {
const config = webhookData.providerConfig as unknown as ImapWebhookConfig
if (!config.host || !config.username || !config.password) {
logger.error(`[${requestId}] Missing IMAP credentials for webhook ${webhookId}`)
await markWebhookFailed(webhookId)
failureCount++
return
}
const hostValidation = await validateDatabaseHost(config.host, 'host')
if (!hostValidation.isValid) {
logger.error(
`[${requestId}] IMAP host validation failed for webhook ${webhookId}: ${hostValidation.error}`
)
await markWebhookFailed(webhookId)
failureCount++
return
}
const fetchResult = await fetchNewEmails(config, requestId, hostValidation.resolvedIP!)
const { emails, latestUidByMailbox } = fetchResult
const pollTimestamp = new Date().toISOString()
if (!emails || !emails.length) {
await updateWebhookLastProcessedUids(webhookId, latestUidByMailbox, pollTimestamp)
await markWebhookSuccess(webhookId)
logger.info(`[${requestId}] No new emails found for webhook ${webhookId}`)
successCount++
return
}
logger.info(`[${requestId}] Found ${emails.length} new emails for webhook ${webhookId}`)
const { processedCount, failedCount: emailFailedCount } = await processEmails(
emails,
webhookData,
config,
requestId,
hostValidation.resolvedIP!
)
await updateWebhookLastProcessedUids(webhookId, latestUidByMailbox, pollTimestamp)
if (emailFailedCount > 0 && processedCount === 0) {
await markWebhookFailed(webhookId)
failureCount++
logger.warn(
`[${requestId}] All ${emailFailedCount} emails failed to process for webhook ${webhookId}`
)
} else {
await markWebhookSuccess(webhookId)
successCount++
logger.info(
`[${requestId}] Successfully processed ${processedCount} emails for webhook ${webhookId}${emailFailedCount > 0 ? ` (${emailFailedCount} failed)` : ''}`
)
}
} catch (error) {
logger.error(`[${requestId}] Error processing IMAP webhook ${webhookId}:`, error)
await markWebhookFailed(webhookId)
failureCount++
if (!config.host || !config.username || !config.password) {
logger.error(`[${requestId}] Missing IMAP credentials for webhook ${webhookId}`)
await markWebhookFailed(webhookId, logger)
return 'failure'
}
}
for (const webhookData of activeWebhooks) {
const promise: Promise<void> = enqueue(webhookData)
.catch((err) => {
logger.error('Unexpected error in webhook processing:', err)
failureCount++
})
.finally(() => {
// Self-remove from running array when completed
const idx = running.indexOf(promise)
if (idx !== -1) running.splice(idx, 1)
})
running.push(promise)
if (running.length >= CONCURRENCY) {
await Promise.race(running)
const hostValidation = await validateDatabaseHost(config.host, 'host')
if (!hostValidation.isValid) {
logger.error(
`[${requestId}] IMAP host validation failed for webhook ${webhookId}: ${hostValidation.error}`
)
await markWebhookFailed(webhookId, logger)
return 'failure'
}
const { emails, latestUidByMailbox } = await fetchNewEmails(
config,
requestId,
hostValidation.resolvedIP!,
logger
)
const pollTimestamp = new Date().toISOString()
if (!emails || !emails.length) {
await updateImapState(webhookId, latestUidByMailbox, pollTimestamp, config, logger)
await markWebhookSuccess(webhookId, logger)
logger.info(`[${requestId}] No new emails found for webhook ${webhookId}`)
return 'success'
}
logger.info(`[${requestId}] Found ${emails.length} new emails for webhook ${webhookId}`)
const { processedCount, failedCount } = await processEmails(
emails,
webhookData,
workflowData,
config,
requestId,
hostValidation.resolvedIP!,
logger
)
await updateImapState(webhookId, latestUidByMailbox, pollTimestamp, config, logger)
if (failedCount > 0 && processedCount === 0) {
await markWebhookFailed(webhookId, logger)
logger.warn(
`[${requestId}] All ${failedCount} emails failed to process for webhook ${webhookId}`
)
return 'failure'
}
await markWebhookSuccess(webhookId, logger)
logger.info(
`[${requestId}] Successfully processed ${processedCount} emails for webhook ${webhookId}${failedCount > 0 ? ` (${failedCount} failed)` : ''}`
)
return 'success'
} catch (error) {
logger.error(`[${requestId}] Error processing IMAP webhook ${webhookId}:`, error)
await markWebhookFailed(webhookId, logger)
return 'failure'
}
await Promise.allSettled(running)
const summary = {
total: activeWebhooks.length,
successful: successCount,
failed: failureCount,
details: [],
}
logger.info('IMAP polling completed', {
total: summary.total,
successful: summary.successful,
failed: summary.failed,
})
return summary
} catch (error) {
const errorMessage = error instanceof Error ? error.message : 'Unknown error'
logger.error('Error in IMAP polling service:', errorMessage)
throw error
}
},
}
async function fetchNewEmails(config: ImapWebhookConfig, requestId: string, resolvedIP: string) {
/**
 * Persist the IMAP poll cursor of a webhook.
 *
 * The UIDs seen in this poll are merged into the per-mailbox UIDs already in
 * `config`, keeping the larger value for each mailbox, and stored together
 * with the poll timestamp through `updateWebhookProviderConfig`.
 *
 * @param webhookId - Webhook whose provider config is updated.
 * @param uidByMailbox - Latest UID per mailbox observed in this poll.
 * @param timestamp - Value stored as `lastCheckedTimestamp`.
 * @param config - Config snapshot supplying the previously stored UIDs.
 * @param logger - Passed through to `updateWebhookProviderConfig`.
 */
async function updateImapState(
  webhookId: string,
  uidByMailbox: Record<string, number>,
  timestamp: string,
  config: ImapWebhookConfig,
  logger: ReturnType<typeof import('@sim/logger').createLogger>
) {
  const nextUidByMailbox: Record<string, number> = {
    ...(config.lastProcessedUidByMailbox || {}),
  }

  Object.entries(uidByMailbox).forEach(([mailboxPath, latestUid]) => {
    // Never move a mailbox cursor backwards.
    nextUidByMailbox[mailboxPath] = Math.max(latestUid, nextUidByMailbox[mailboxPath] || 0)
  })

  await updateWebhookProviderConfig(
    webhookId,
    {
      lastProcessedUidByMailbox: nextUidByMailbox,
      lastCheckedTimestamp: timestamp,
    },
    logger
  )
}
async function fetchNewEmails(
config: ImapWebhookConfig,
requestId: string,
resolvedIP: string,
logger: ReturnType<typeof import('@sim/logger').createLogger>
) {
const client = new ImapFlow({
host: resolvedIP,
servername: config.host,
@@ -278,15 +179,13 @@ async function fetchNewEmails(config: ImapWebhookConfig, requestId: string, reso
user: config.username,
pass: config.password,
},
tls: {
rejectUnauthorized: true,
},
tls: { rejectUnauthorized: true },
logger: false,
})
const emails: Array<{
uid: number
mailboxPath: string // Track which mailbox this email came from
mailboxPath: string
envelope: FetchMessageObject['envelope']
bodyStructure: FetchMessageObject['bodyStructure']
source?: Buffer
@@ -305,13 +204,12 @@ async function fetchNewEmails(config: ImapWebhookConfig, requestId: string, reso
if (totalEmailsCollected >= maxEmails) break
try {
const mailbox = await client.mailboxOpen(mailboxPath)
await client.mailboxOpen(mailboxPath)
// Parse search criteria - expects JSON object from UI
let searchCriteria: any = { unseen: true }
let searchCriteria: Record<string, unknown> = { unseen: true }
if (config.searchCriteria) {
if (typeof config.searchCriteria === 'object') {
searchCriteria = config.searchCriteria
searchCriteria = config.searchCriteria as unknown as Record<string, unknown>
} else if (typeof config.searchCriteria === 'string') {
try {
searchCriteria = JSON.parse(config.searchCriteria)
@@ -327,15 +225,11 @@ async function fetchNewEmails(config: ImapWebhookConfig, requestId: string, reso
searchCriteria = { ...searchCriteria, uid: `${lastUidForMailbox + 1}:*` }
}
// Add time-based filtering similar to Gmail
// If lastCheckedTimestamp exists, use it with 1 minute buffer
// If first poll (no timestamp), default to last 24 hours to avoid processing ALL unseen emails
if (config.lastCheckedTimestamp) {
const lastChecked = new Date(config.lastCheckedTimestamp)
const bufferTime = new Date(lastChecked.getTime() - 60000)
searchCriteria = { ...searchCriteria, since: bufferTime }
} else {
// First poll: only get emails from last 24 hours to avoid overwhelming first run
const oneDayAgo = new Date(Date.now() - 24 * 60 * 60 * 1000)
searchCriteria = { ...searchCriteria, since: oneDayAgo }
}
@@ -344,15 +238,13 @@ async function fetchNewEmails(config: ImapWebhookConfig, requestId: string, reso
try {
const searchResult = await client.search(searchCriteria, { uid: true })
messageUids = searchResult === false ? [] : searchResult
} catch (searchError) {
} catch {
continue
}
if (messageUids.length === 0) {
continue
}
if (messageUids.length === 0) continue
messageUids.sort((a, b) => a - b) // Sort ascending to process oldest first
messageUids.sort((a, b) => a - b)
const remainingSlots = maxEmails - totalEmailsCollected
const uidsToProcess = messageUids.slice(0, remainingSlots)
@@ -365,12 +257,7 @@ async function fetchNewEmails(config: ImapWebhookConfig, requestId: string, reso
for await (const msg of client.fetch(
uidsToProcess,
{
uid: true,
envelope: true,
bodyStructure: true,
source: true,
},
{ uid: true, envelope: true, bodyStructure: true, source: true },
{ uid: true }
)) {
emails.push({
@@ -388,7 +275,6 @@ async function fetchNewEmails(config: ImapWebhookConfig, requestId: string, reso
}
await client.logout()
return { emails, latestUidByMailbox }
} catch (error) {
try {
@@ -400,9 +286,6 @@ async function fetchNewEmails(config: ImapWebhookConfig, requestId: string, reso
}
}
/**
* Get the list of mailboxes to check based on config
*/
function getMailboxesToCheck(config: ImapWebhookConfig): string[] {
if (!config.mailbox || (Array.isArray(config.mailbox) && config.mailbox.length === 0)) {
return ['INBOX']
@@ -488,7 +371,6 @@ function extractAttachmentsFromSource(
bodyStructure: FetchMessageObject['bodyStructure']
): ImapAttachment[] {
const attachments: ImapAttachment[] = []
if (!bodyStructure) return attachments
const content = source.toString('utf-8')
@@ -534,24 +416,13 @@ function extractAttachmentsFromSource(
return attachments
}
/**
* Checks if a body structure contains attachments by examining disposition
*/
function hasAttachmentsInBodyStructure(structure: FetchMessageObject['bodyStructure']): boolean {
if (!structure) return false
if (structure.disposition === 'attachment') {
return true
}
if (structure.disposition === 'inline' && structure.dispositionParameters?.filename) {
return true
}
if (structure.disposition === 'attachment') return true
if (structure.disposition === 'inline' && structure.dispositionParameters?.filename) return true
if (structure.childNodes && Array.isArray(structure.childNodes)) {
return structure.childNodes.some((child) => hasAttachmentsInBodyStructure(child))
}
return false
}
@@ -563,10 +434,12 @@ async function processEmails(
bodyStructure: FetchMessageObject['bodyStructure']
source?: Buffer
}>,
webhookData: WebhookRecord,
webhookData: PollWebhookContext['webhookData'],
workflowData: PollWebhookContext['workflowData'],
config: ImapWebhookConfig,
requestId: string,
resolvedIP: string
resolvedIP: string,
logger: ReturnType<typeof import('@sim/logger').createLogger>
) {
let processedCount = 0
let failedCount = 0
@@ -580,9 +453,7 @@ async function processEmails(
user: config.username,
pass: config.password,
},
tls: {
rejectUnauthorized: true,
},
tls: { rejectUnauthorized: true },
logger: false,
})
@@ -644,25 +515,20 @@ async function processEmails(
timestamp: new Date().toISOString(),
}
const webhookUrl = `${getInternalApiBaseUrl()}/api/webhooks/trigger/${webhookData.path}`
const result = await processPolledWebhookEvent(
webhookData,
workflowData,
payload,
requestId
)
const response = await fetch(webhookUrl, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'User-Agent': 'Sim/1.0',
},
body: JSON.stringify(payload),
})
if (!response.ok) {
const errorText = await response.text()
if (!result.success) {
logger.error(
`[${requestId}] Failed to trigger webhook for email ${email.uid}:`,
response.status,
errorText
`[${requestId}] Failed to process webhook for email ${email.uid}:`,
result.statusCode,
result.error
)
throw new Error(`Webhook request failed: ${response.status} - ${errorText}`)
throw new Error(`Webhook processing failed (${result.statusCode}): ${result.error}`)
}
if (config.markAsRead) {
@@ -684,11 +550,7 @@ async function processEmails(
}
}
return {
emailUid: email.uid,
webhookStatus: response.status,
processed: true,
}
return { emailUid: email.uid, processed: true }
}
)
@@ -717,31 +579,3 @@ async function processEmails(
return { processedCount, failedCount }
}
/**
 * Store the latest processed UID per mailbox and the poll timestamp on a webhook.
 *
 * The webhook's current provider config is read first; the new UIDs are merged
 * into its `lastProcessedUidByMailbox`, keeping the larger value per mailbox,
 * and the merged config is written back.
 *
 * @param webhookId - Webhook whose provider config is updated.
 * @param uidByMailbox - Latest UID per mailbox observed in this poll.
 * @param timestamp - Value stored as `lastCheckedTimestamp`.
 */
async function updateWebhookLastProcessedUids(
  webhookId: string,
  uidByMailbox: Record<string, number>,
  timestamp: string
) {
  const rows = await db.select().from(webhook).where(eq(webhook.id, webhookId))
  const storedConfig = (rows[0]?.providerConfig as Record<string, any>) || {}

  const nextUidByMailbox = { ...(storedConfig.lastProcessedUidByMailbox || {}) }
  Object.entries(uidByMailbox).forEach(([mailboxPath, latestUid]) => {
    // Never move a mailbox cursor backwards.
    nextUidByMailbox[mailboxPath] = Math.max(latestUid, nextUidByMailbox[mailboxPath] || 0)
  })

  const nextConfig = {
    ...storedConfig,
    lastProcessedUidByMailbox: nextUidByMailbox,
    lastCheckedTimestamp: timestamp,
  } as any

  await db
    .update(webhook)
    .set({ providerConfig: nextConfig, updatedAt: new Date() })
    .where(eq(webhook.id, webhookId))
}

View File

@@ -0,0 +1,9 @@
export { pollProvider } from '@/lib/webhooks/polling/orchestrator'
export { getPollingHandler, VALID_POLLING_PROVIDERS } from '@/lib/webhooks/polling/registry'
export type {
PollingProviderHandler,
PollSummary,
PollWebhookContext,
WebhookRecord,
WorkflowRecord,
} from '@/lib/webhooks/polling/types'

View File

@@ -0,0 +1,46 @@
import { createLogger } from '@sim/logger'
import { generateShortId } from '@/lib/core/utils/uuid'
import { getPollingHandler } from '@/lib/webhooks/polling/registry'
import type { PollSummary, WebhookRecord, WorkflowRecord } from '@/lib/webhooks/polling/types'
import { fetchActiveWebhooks, runWithConcurrency } from '@/lib/webhooks/polling/utils'
/**
 * Poll every active webhook of one provider.
 *
 * Looks up the provider's handler, loads its active webhooks and runs the
 * handler's `pollWebhook` for each of them through `runWithConcurrency`, with
 * a fresh request id per webhook.
 *
 * @param providerName - Key of the polling provider to run.
 * @returns Totals of polled, successful and failed webhooks.
 * @throws Error when no handler is registered for `providerName`.
 */
export async function pollProvider(providerName: string): Promise<PollSummary> {
  const handler = getPollingHandler(providerName)
  if (!handler) {
    throw new Error(`Unknown polling provider: ${providerName}`)
  }

  const { label, provider } = handler
  const logger = createLogger(`${label}PollingService`)
  logger.info(`Starting ${label} webhook polling`)

  const activeWebhooks = await fetchActiveWebhooks(provider)
  if (activeWebhooks.length === 0) {
    logger.info(`No active ${label} webhooks found`)
    return { total: 0, successful: 0, failed: 0 }
  }

  logger.info(`Found ${activeWebhooks.length} active ${label} webhooks`)

  // Each webhook gets its own request id so its log lines can be correlated.
  const pollEntry = async (entry: (typeof activeWebhooks)[number]) =>
    handler.pollWebhook({
      webhookData: entry.webhook as WebhookRecord,
      workflowData: entry.workflow as WorkflowRecord,
      requestId: generateShortId(),
      logger,
    })

  const counts = await runWithConcurrency(activeWebhooks, pollEntry, logger)

  const summary: PollSummary = {
    total: activeWebhooks.length,
    successful: counts.successCount,
    failed: counts.failureCount,
  }

  logger.info(`${label} polling completed`, summary)
  return summary
}

View File

@@ -1,77 +1,17 @@
import { db } from '@sim/db'
import {
account,
credentialSet,
webhook,
workflow,
workflowDeploymentVersion,
} from '@sim/db/schema'
import { createLogger } from '@sim/logger'
import { and, eq, isNull, or, sql } from 'drizzle-orm'
import { htmlToText } from 'html-to-text'
import { isOrganizationOnTeamOrEnterprisePlan } from '@/lib/billing'
import { pollingIdempotency } from '@/lib/core/idempotency'
import { getInternalApiBaseUrl } from '@/lib/core/utils/urls'
import { generateShortId } from '@/lib/core/utils/uuid'
import { pollingIdempotency } from '@/lib/core/idempotency/service'
import type { PollingProviderHandler, PollWebhookContext } from '@/lib/webhooks/polling/types'
import {
getOAuthToken,
refreshAccessTokenIfNeeded,
resolveOAuthAccountId,
} from '@/app/api/auth/oauth/utils'
import { MAX_CONSECUTIVE_FAILURES } from '@/triggers/constants'
const logger = createLogger('OutlookPollingService')
/**
 * Record one failed poll for a webhook.
 *
 * Increments `failedCount` (treating NULL as 0) and stamps `lastFailedAt`.
 * When the resulting count reaches MAX_CONSECUTIVE_FAILURES a second update
 * sets `isActive` to false. Database errors are logged, never rethrown.
 */
async function markWebhookFailed(webhookId: string) {
try {
// Increment in SQL and read the new count back from the same statement.
const result = await db
.update(webhook)
.set({
failedCount: sql`COALESCE(${webhook.failedCount}, 0) + 1`,
lastFailedAt: new Date(),
updatedAt: new Date(),
})
.where(eq(webhook.id, webhookId))
.returning({ failedCount: webhook.failedCount })
// No returned row (unknown id) is treated as a count of 0.
const newFailedCount = result[0]?.failedCount || 0
const shouldDisable = newFailedCount >= MAX_CONSECUTIVE_FAILURES
if (shouldDisable) {
await db
.update(webhook)
.set({
isActive: false,
updatedAt: new Date(),
})
.where(eq(webhook.id, webhookId))
logger.warn(
`Webhook ${webhookId} auto-disabled after ${MAX_CONSECUTIVE_FAILURES} consecutive failures`
)
}
} catch (err) {
// Best effort: a bookkeeping failure must not abort the caller.
logger.error(`Failed to mark webhook ${webhookId} as failed:`, err)
}
}
/**
 * Record a successful poll by resetting the webhook's `failedCount` to 0.
 * Database errors are logged, never rethrown.
 */
async function markWebhookSuccess(webhookId: string) {
try {
await db
.update(webhook)
.set({
failedCount: 0, // Reset on success
updatedAt: new Date(),
})
.where(eq(webhook.id, webhookId))
} catch (err) {
// Best effort: a bookkeeping failure must not abort the caller.
logger.error(`Failed to mark webhook ${webhookId} as successful:`, err)
}
}
markWebhookFailed,
markWebhookSuccess,
resolveOAuthCredential,
updateWebhookProviderConfig,
} from '@/lib/webhooks/polling/utils'
import { processPolledWebhookEvent } from '@/lib/webhooks/processor'
interface OutlookWebhookConfig {
credentialId: string
folderIds?: string[] // e.g., ['inbox', 'sent']
folderIds?: string[]
folderFilterBehavior?: 'INCLUDE' | 'EXCLUDE'
markAsRead?: boolean
maxEmailsPerPoll?: number
@@ -145,10 +85,6 @@ export interface OutlookWebhookPayload {
rawEmail?: OutlookEmail
}
/**
* Convert HTML content to a readable plain-text representation.
* Keeps reasonable newlines and decodes common HTML entities.
*/
function convertHtmlToPlainText(html: string): string {
if (!html) return ''
return htmlToText(html, {
@@ -163,217 +99,78 @@ function convertHtmlToPlainText(html: string): string {
})
}
export async function pollOutlookWebhooks() {
logger.info('Starting Outlook webhook polling')
export const outlookPollingHandler: PollingProviderHandler = {
provider: 'outlook',
label: 'Outlook',
try {
const activeWebhooksResult = await db
.select({ webhook })
.from(webhook)
.innerJoin(workflow, eq(webhook.workflowId, workflow.id))
.leftJoin(
workflowDeploymentVersion,
and(
eq(workflowDeploymentVersion.workflowId, workflow.id),
eq(workflowDeploymentVersion.isActive, true)
)
)
.where(
and(
eq(webhook.provider, 'outlook'),
eq(webhook.isActive, true),
eq(workflow.isDeployed, true),
or(
eq(webhook.deploymentVersionId, workflowDeploymentVersion.id),
and(isNull(workflowDeploymentVersion.id), isNull(webhook.deploymentVersionId))
)
async pollWebhook(ctx: PollWebhookContext): Promise<'success' | 'failure'> {
const { webhookData, workflowData, requestId, logger } = ctx
const webhookId = webhookData.id
try {
logger.info(`[${requestId}] Processing Outlook webhook: ${webhookId}`)
const accessToken = await resolveOAuthCredential(webhookData, 'outlook', requestId, logger)
const config = webhookData.providerConfig as unknown as OutlookWebhookConfig
const now = new Date()
const { emails } = await fetchNewOutlookEmails(accessToken, config, requestId, logger)
if (!emails || !emails.length) {
await updateWebhookProviderConfig(
webhookId,
{ lastCheckedTimestamp: now.toISOString() },
logger
)
await markWebhookSuccess(webhookId, logger)
logger.info(`[${requestId}] No new emails found for webhook ${webhookId}`)
return 'success'
}
logger.info(`[${requestId}] Found ${emails.length} emails for webhook ${webhookId}`)
const { processedCount, failedCount } = await processOutlookEmails(
emails,
webhookData,
workflowData,
config,
accessToken,
requestId,
logger
)
const activeWebhooks = activeWebhooksResult.map((r) => r.webhook)
await updateWebhookProviderConfig(
webhookId,
{ lastCheckedTimestamp: now.toISOString() },
logger
)
if (!activeWebhooks.length) {
logger.info('No active Outlook webhooks found')
return { total: 0, successful: 0, failed: 0, details: [] }
}
logger.info(`Found ${activeWebhooks.length} active Outlook webhooks`)
const CONCURRENCY = 10
const running: Promise<void>[] = []
let successCount = 0
let failureCount = 0
const enqueue = async (webhookData: (typeof activeWebhooks)[number]) => {
const webhookId = webhookData.id
const requestId = generateShortId()
try {
logger.info(`[${requestId}] Processing Outlook webhook: ${webhookId}`)
const metadata = webhookData.providerConfig as any
const credentialId: string | undefined = metadata?.credentialId
const userId: string | undefined = metadata?.userId
const credentialSetId: string | undefined = webhookData.credentialSetId ?? undefined
if (!credentialId && !userId) {
logger.error(`[${requestId}] Missing credentialId and userId for webhook ${webhookId}`)
await markWebhookFailed(webhookId)
failureCount++
return
}
if (credentialSetId) {
const [cs] = await db
.select({ organizationId: credentialSet.organizationId })
.from(credentialSet)
.where(eq(credentialSet.id, credentialSetId))
.limit(1)
if (cs?.organizationId) {
const hasAccess = await isOrganizationOnTeamOrEnterprisePlan(cs.organizationId)
if (!hasAccess) {
logger.error(
`[${requestId}] Polling Group plan restriction: Your current plan does not support Polling Groups. Upgrade to Team or Enterprise to use this feature.`,
{
webhookId,
credentialSetId,
organizationId: cs.organizationId,
}
)
await markWebhookFailed(webhookId)
failureCount++
return
}
}
}
let accessToken: string | null = null
if (credentialId) {
const resolved = await resolveOAuthAccountId(credentialId)
if (!resolved) {
logger.error(
`[${requestId}] Failed to resolve OAuth account for credential ${credentialId}, webhook ${webhookId}`
)
await markWebhookFailed(webhookId)
failureCount++
return
}
const rows = await db
.select()
.from(account)
.where(eq(account.id, resolved.accountId))
.limit(1)
if (!rows.length) {
logger.error(
`[${requestId}] Credential ${credentialId} not found for webhook ${webhookId}`
)
await markWebhookFailed(webhookId)
failureCount++
return
}
const ownerUserId = rows[0].userId
accessToken = await refreshAccessTokenIfNeeded(resolved.accountId, ownerUserId, requestId)
} else if (userId) {
accessToken = await getOAuthToken(userId, 'outlook')
}
if (!accessToken) {
logger.error(
`[${requestId}] Failed to get Outlook access token for webhook ${webhookId} (cred or fallback)`
)
await markWebhookFailed(webhookId)
failureCount++
return
}
const config = webhookData.providerConfig as unknown as OutlookWebhookConfig
const now = new Date()
const fetchResult = await fetchNewOutlookEmails(accessToken, config, requestId)
const { emails } = fetchResult
if (!emails || !emails.length) {
await updateWebhookLastChecked(webhookId, now.toISOString())
await markWebhookSuccess(webhookId)
logger.info(`[${requestId}] No new emails found for webhook ${webhookId}`)
successCount++
return
}
logger.info(`[${requestId}] Found ${emails.length} emails for webhook ${webhookId}`)
logger.info(`[${requestId}] Processing ${emails.length} emails for webhook ${webhookId}`)
const { processedCount, failedCount } = await processOutlookEmails(
emails,
webhookData,
config,
accessToken,
requestId
if (failedCount > 0 && processedCount === 0) {
await markWebhookFailed(webhookId, logger)
logger.warn(
`[${requestId}] All ${failedCount} emails failed to process for webhook ${webhookId}`
)
await updateWebhookLastChecked(webhookId, now.toISOString())
if (failedCount > 0 && processedCount === 0) {
await markWebhookFailed(webhookId)
failureCount++
logger.warn(
`[${requestId}] All ${failedCount} emails failed to process for webhook ${webhookId}`
)
} else {
await markWebhookSuccess(webhookId)
successCount++
logger.info(
`[${requestId}] Successfully processed ${processedCount} emails for webhook ${webhookId}${failedCount > 0 ? ` (${failedCount} failed)` : ''}`
)
}
} catch (error) {
logger.error(`[${requestId}] Error processing Outlook webhook ${webhookId}:`, error)
await markWebhookFailed(webhookId)
failureCount++
return 'failure'
}
await markWebhookSuccess(webhookId, logger)
logger.info(
`[${requestId}] Successfully processed ${processedCount} emails for webhook ${webhookId}${failedCount > 0 ? ` (${failedCount} failed)` : ''}`
)
return 'success'
} catch (error) {
logger.error(`[${requestId}] Error processing Outlook webhook ${webhookId}:`, error)
await markWebhookFailed(webhookId, logger)
return 'failure'
}
for (const webhookData of activeWebhooks) {
const promise: Promise<void> = enqueue(webhookData)
.catch((err) => {
logger.error('Unexpected error in webhook processing:', err)
failureCount++
})
.finally(() => {
const idx = running.indexOf(promise)
if (idx !== -1) running.splice(idx, 1)
})
running.push(promise)
if (running.length >= CONCURRENCY) {
await Promise.race(running)
}
}
await Promise.allSettled(running)
logger.info(`Outlook polling completed: ${successCount} successful, ${failureCount} failed`)
return {
total: activeWebhooks.length,
successful: successCount,
failed: failureCount,
details: [],
}
} catch (error) {
logger.error('Error during Outlook webhook polling:', error)
throw error
}
},
}
async function fetchNewOutlookEmails(
accessToken: string,
config: OutlookWebhookConfig,
requestId: string
requestId: string,
logger: ReturnType<typeof import('@sim/logger').createLogger>
) {
try {
const apiUrl = 'https://graph.microsoft.com/v1.0/me/messages'
@@ -383,9 +180,7 @@ async function fetchNewOutlookEmails(
'$select',
'id,conversationId,subject,bodyPreview,body,from,toRecipients,ccRecipients,receivedDateTime,sentDateTime,hasAttachments,isRead,parentFolderId'
)
params.append('$orderby', 'receivedDateTime desc')
params.append('$top', (config.maxEmailsPerPoll || 25).toString())
if (config.lastCheckedTimestamp) {
@@ -395,7 +190,6 @@ async function fetchNewOutlookEmails(
}
const fullUrl = `${apiUrl}?${params.toString()}`
logger.info(`[${requestId}] Fetching emails from: ${fullUrl}`)
const response = await fetch(fullUrl, {
@@ -427,7 +221,8 @@ async function fetchNewOutlookEmails(
resolvedFolderIds = await resolveWellKnownFolderIds(
accessToken,
config.folderIds,
requestId
requestId,
logger
)
}
}
@@ -463,7 +258,8 @@ function isWellKnownFolderName(folderId: string): boolean {
async function resolveWellKnownFolderId(
accessToken: string,
folderName: string,
requestId: string
requestId: string,
logger: ReturnType<typeof import('@sim/logger').createLogger>
): Promise<string | null> {
try {
const response = await fetch(`https://graph.microsoft.com/v1.0/me/mailFolders/${folderName}`, {
@@ -491,18 +287,16 @@ async function resolveWellKnownFolderId(
async function resolveWellKnownFolderIds(
accessToken: string,
folderIds: string[],
requestId: string
requestId: string,
logger: ReturnType<typeof import('@sim/logger').createLogger>
): Promise<Map<string, string>> {
const resolvedIds = new Map<string, string>()
const wellKnownFolders = folderIds.filter(isWellKnownFolderName)
if (wellKnownFolders.length === 0) {
return resolvedIds
}
if (wellKnownFolders.length === 0) return resolvedIds
const resolutions = await Promise.all(
wellKnownFolders.map(async (folderName) => {
const actualId = await resolveWellKnownFolderId(accessToken, folderName, requestId)
const actualId = await resolveWellKnownFolderId(accessToken, folderName, requestId, logger)
return { folderName, actualId }
})
)
@@ -516,7 +310,6 @@ async function resolveWellKnownFolderIds(
logger.info(
`[${requestId}] Resolved ${resolvedIds.size}/${wellKnownFolders.length} well-known folders`
)
return resolvedIds
}
@@ -525,16 +318,12 @@ function filterEmailsByFolder(
config: OutlookWebhookConfig,
resolvedFolderIds?: Map<string, string>
): OutlookEmail[] {
if (!config.folderIds || !config.folderIds.length) {
return emails
}
if (!config.folderIds || !config.folderIds.length) return emails
const actualFolderIds = config.folderIds.map((configFolder) => {
if (resolvedFolderIds && isWellKnownFolderName(configFolder)) {
const resolvedId = resolvedFolderIds.get(configFolder.toLowerCase())
if (resolvedId) {
return resolvedId
}
if (resolvedId) return resolvedId
}
return configFolder
})
@@ -544,17 +333,18 @@ function filterEmailsByFolder(
const hasMatchingFolder = actualFolderIds.some(
(folderId) => emailFolderId.toLowerCase() === folderId.toLowerCase()
)
return config.folderFilterBehavior === 'INCLUDE' ? hasMatchingFolder : !hasMatchingFolder
})
}
async function processOutlookEmails(
emails: OutlookEmail[],
webhookData: any,
webhookData: PollWebhookContext['webhookData'],
workflowData: PollWebhookContext['workflowData'],
config: OutlookWebhookConfig,
accessToken: string,
requestId: string
requestId: string,
logger: ReturnType<typeof import('@sim/logger').createLogger>
) {
let processedCount = 0
let failedCount = 0
@@ -568,7 +358,12 @@ async function processOutlookEmails(
let attachments: OutlookAttachment[] = []
if (config.includeAttachments && email.hasAttachments) {
try {
attachments = await downloadOutlookAttachments(accessToken, email.id, requestId)
attachments = await downloadOutlookAttachments(
accessToken,
email.id,
requestId,
logger
)
} catch (error) {
logger.error(
`[${requestId}] Error downloading attachments for email ${email.id}:`,
@@ -588,12 +383,8 @@ async function processOutlookEmails(
bodyText: (() => {
const content = email.body?.content || ''
const type = (email.body?.contentType || '').toLowerCase()
if (!content) {
return email.bodyPreview || ''
}
if (type === 'text' || type === 'text/plain') {
return content
}
if (!content) return email.bodyPreview || ''
if (type === 'text' || type === 'text/plain') return content
return convertHtmlToPlainText(content)
})(),
bodyHtml: email.body?.content || '',
@@ -618,36 +409,27 @@ async function processOutlookEmails(
`[${requestId}] Processing email: ${email.subject} from ${email.from?.emailAddress?.address}`
)
const webhookUrl = `${getInternalApiBaseUrl()}/api/webhooks/trigger/${webhookData.path}`
const result = await processPolledWebhookEvent(
webhookData,
workflowData,
payload,
requestId
)
const response = await fetch(webhookUrl, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'User-Agent': 'Sim/1.0',
},
body: JSON.stringify(payload),
})
if (!response.ok) {
const errorText = await response.text()
if (!result.success) {
logger.error(
`[${requestId}] Failed to trigger webhook for email ${email.id}:`,
response.status,
errorText
`[${requestId}] Failed to process webhook for email ${email.id}:`,
result.statusCode,
result.error
)
throw new Error(`Webhook request failed: ${response.status} - ${errorText}`)
throw new Error(`Webhook processing failed (${result.statusCode}): ${result.error}`)
}
if (config.markAsRead) {
await markOutlookEmailAsRead(accessToken, email.id)
await markOutlookEmailAsRead(accessToken, email.id, logger)
}
return {
emailId: email.id,
webhookStatus: response.status,
processed: true,
}
return { emailId: email.id, processed: true }
}
)
@@ -667,7 +449,8 @@ async function processOutlookEmails(
async function downloadOutlookAttachments(
accessToken: string,
messageId: string,
requestId: string
requestId: string,
logger: ReturnType<typeof import('@sim/logger').createLogger>
): Promise<OutlookAttachment[]> {
const attachments: OutlookAttachment[] = []
@@ -722,7 +505,11 @@ async function downloadOutlookAttachments(
return attachments
}
async function markOutlookEmailAsRead(accessToken: string, messageId: string) {
async function markOutlookEmailAsRead(
accessToken: string,
messageId: string,
logger: ReturnType<typeof import('@sim/logger').createLogger>
) {
try {
const response = await fetch(`https://graph.microsoft.com/v1.0/me/messages/${messageId}`, {
method: 'PATCH',
@@ -730,9 +517,7 @@ async function markOutlookEmailAsRead(accessToken: string, messageId: string) {
Authorization: `Bearer ${accessToken}`,
'Content-Type': 'application/json',
},
body: JSON.stringify({
isRead: true,
}),
body: JSON.stringify({ isRead: true }),
})
if (!response.ok) {
@@ -746,34 +531,3 @@ async function markOutlookEmailAsRead(accessToken: string, messageId: string) {
logger.error(`Error marking email ${messageId} as read:`, error)
}
}
/**
 * Store `timestamp` as `lastCheckedTimestamp` inside the webhook's
 * `providerConfig`, keeping every other config key.
 *
 * Reads the current config, merges, and writes it back in two separate
 * statements. A missing webhook is logged and skipped; database errors are
 * logged, never rethrown.
 */
async function updateWebhookLastChecked(webhookId: string, timestamp: string) {
try {
// Only the providerConfig column is needed for the merge.
const currentWebhook = await db
.select({ providerConfig: webhook.providerConfig })
.from(webhook)
.where(eq(webhook.id, webhookId))
.limit(1)
if (!currentWebhook.length) {
logger.error(`Webhook ${webhookId} not found`)
return
}
// A null/empty stored config is treated as an empty object.
const currentConfig = (currentWebhook[0].providerConfig as any) || {}
const updatedConfig = {
...currentConfig,
lastCheckedTimestamp: timestamp,
}
await db
.update(webhook)
.set({
providerConfig: updatedConfig,
updatedAt: new Date(),
})
.where(eq(webhook.id, webhookId))
} catch (error) {
logger.error(`Error updating webhook ${webhookId} last checked timestamp:`, error)
}
}

View File

@@ -0,0 +1,19 @@
import { gmailPollingHandler } from '@/lib/webhooks/polling/gmail'
import { imapPollingHandler } from '@/lib/webhooks/polling/imap'
import { outlookPollingHandler } from '@/lib/webhooks/polling/outlook'
import { rssPollingHandler } from '@/lib/webhooks/polling/rss'
import type { PollingProviderHandler } from '@/lib/webhooks/polling/types'
/**
 * Polling handlers keyed by provider name.
 * Adding a provider means importing its handler and adding one entry here.
 */
const POLLING_HANDLERS: Record<string, PollingProviderHandler> = {
gmail: gmailPollingHandler,
imap: imapPollingHandler,
outlook: outlookPollingHandler,
rss: rssPollingHandler,
}
/** Names of every registered polling provider (the keys of the handler map). */
export const VALID_POLLING_PROVIDERS = new Set(Object.keys(POLLING_HANDLERS))
/**
 * Look up the polling handler registered for a provider.
 *
 * Only the registry's own keys are consulted. A bare `POLLING_HANDLERS[provider]`
 * would also resolve inherited members such as `toString` or `constructor`,
 * handing callers a truthy non-handler for an unregistered name.
 *
 * @param provider - Provider name, e.g. `'gmail'` or `'rss'`.
 * @returns The registered handler, or `undefined` when the name is unknown.
 */
export function getPollingHandler(provider: string): PollingProviderHandler | undefined {
  return Object.prototype.hasOwnProperty.call(POLLING_HANDLERS, provider)
    ? POLLING_HANDLERS[provider]
    : undefined
}

View File

@@ -0,0 +1,307 @@
import Parser from 'rss-parser'
import { pollingIdempotency } from '@/lib/core/idempotency/service'
import {
secureFetchWithPinnedIP,
validateUrlWithDNS,
} from '@/lib/core/security/input-validation.server'
import type { PollingProviderHandler, PollWebhookContext } from '@/lib/webhooks/polling/types'
import {
markWebhookFailed,
markWebhookSuccess,
updateWebhookProviderConfig,
} from '@/lib/webhooks/polling/utils'
import { processPolledWebhookEvent } from '@/lib/webhooks/processor'
/** Upper bound on how many item GUIDs are kept in `lastSeenGuids` (newest first). */
const MAX_GUIDS_TO_TRACK = 100
/** Shape this file expects in an RSS webhook's `providerConfig`. */
interface RssWebhookConfig {
/** Feed URL to fetch; a poll without it is counted as a failure. */
feedUrl: string
/** ISO timestamp of the previous poll; dated items at or before it are skipped. */
lastCheckedTimestamp?: string
/** GUIDs (or links) of items already handled; matching items are skipped. */
lastSeenGuids?: string[]
// NOTE(review): etag / lastModified are not read or written by the code shown
// in this file — presumably reserved for conditional requests; confirm.
etag?: string
lastModified?: string
}
/**
 * A single feed entry as produced by the feed parser.
 * Every named field is optional; the index signature admits any extra keys.
 */
interface RssItem {
title?: string
link?: string
pubDate?: string
/** Preferred identity of the item; `link` is used when it is missing. */
guid?: string
description?: string
content?: string
contentSnippet?: string
author?: string
/** Fallback for `author` when building the webhook payload. */
creator?: string
categories?: string[]
enclosure?: {
url: string
type?: string
length?: string | number
}
/** Date string used for the "newer than last poll" check and for ordering. */
isoDate?: string
[key: string]: unknown
}
/** Feed-level metadata plus the parsed items. */
interface RssFeed {
title?: string
link?: string
description?: string
items: RssItem[]
}
/** Body delivered to the workflow for each new feed item. */
export interface RssWebhookPayload {
/** Copies of the item's title/link/pubDate at the top level. */
title?: string
link?: string
pubDate?: string
/** The item itself. */
item: RssItem
/** Metadata of the feed the item came from. */
feed: {
title?: string
link?: string
description?: string
}
/** ISO time at which the payload was built. */
timestamp: string
}
/**
 * Shared feed parser instance.
 * NOTE(review): within this file it is only used via `parseString()` on XML
 * that was already downloaded, so the timeout/headers options presumably only
 * matter if the parser fetches a URL itself — confirm against rss-parser docs.
 */
const parser = new Parser({
timeout: 30000,
headers: {
'User-Agent': 'Sim/1.0 RSS Poller',
},
})
/**
 * Polling handler for RSS feed webhooks.
 *
 * `pollWebhook()` validates the config, fetches unseen feed items, delivers
 * them to the workflow, stores the new poll state, and records the outcome on
 * the webhook. It never throws: any error is logged and reported as 'failure'.
 */
export const rssPollingHandler: PollingProviderHandler = {
  provider: 'rss',
  label: 'RSS',

  async pollWebhook(ctx: PollWebhookContext): Promise<'success' | 'failure'> {
    const { webhookData, workflowData, requestId, logger } = ctx
    const webhookId = webhookData.id

    try {
      const config = webhookData.providerConfig as unknown as RssWebhookConfig
      if (!config?.feedUrl) {
        logger.error(`[${requestId}] Missing feedUrl for webhook ${webhookId}`)
        await markWebhookFailed(webhookId, logger)
        return 'failure'
      }

      // Captured before the fetch so items published while polling are not skipped next time.
      const polledAt = new Date().toISOString()
      const fetched = await fetchNewRssItems(config, requestId, logger)
      const freshItems = fetched.items

      if (freshItems.length === 0) {
        await updateRssState(webhookId, polledAt, [], config, logger)
        await markWebhookSuccess(webhookId, logger)
        logger.info(`[${requestId}] No new items found for webhook ${webhookId}`)
        return 'success'
      }

      logger.info(`[${requestId}] Found ${freshItems.length} new items for webhook ${webhookId}`)

      const { processedCount, failedCount } = await processRssItems(
        freshItems,
        fetched.feed,
        webhookData,
        workflowData,
        requestId,
        logger
      )

      // Remember every fetched item that has an identity, whether or not it was delivered.
      const seenGuids: string[] = []
      for (const item of freshItems) {
        const guid = item.guid || item.link || ''
        if (guid.length > 0) seenGuids.push(guid)
      }
      await updateRssState(webhookId, polledAt, seenGuids, config, logger)

      const nothingDelivered = failedCount > 0 && processedCount === 0
      if (nothingDelivered) {
        await markWebhookFailed(webhookId, logger)
        logger.warn(
          `[${requestId}] All ${failedCount} items failed to process for webhook ${webhookId}`
        )
        return 'failure'
      }

      await markWebhookSuccess(webhookId, logger)
      logger.info(
        `[${requestId}] Successfully processed ${processedCount} items for webhook ${webhookId}${failedCount > 0 ? ` (${failedCount} failed)` : ''}`
      )
      return 'success'
    } catch (error) {
      logger.error(`[${requestId}] Error processing RSS webhook ${webhookId}:`, error)
      await markWebhookFailed(webhookId, logger)
      return 'failure'
    }
  },
}
/**
 * Persist the poll state of an RSS webhook.
 *
 * Writes the poll timestamp and the merged GUID list (new GUIDs first, then
 * the ones already in `config`, truncated to MAX_GUIDS_TO_TRACK) through
 * `updateWebhookProviderConfig()`.
 *
 * @param webhookId - Webhook whose provider config is updated.
 * @param timestamp - ISO string stored as `lastCheckedTimestamp`.
 * @param newGuids - GUIDs seen in this poll.
 * @param config - Config as it was at the start of the poll (source of the old GUIDs).
 * @param logger - Logger forwarded to the config update helper.
 */
async function updateRssState(
  webhookId: string,
  timestamp: string,
  newGuids: string[],
  config: RssWebhookConfig,
  logger: ReturnType<typeof import('@sim/logger').createLogger>
) {
  const previouslySeen = config.lastSeenGuids || []
  const trackedGuids = newGuids.concat(previouslySeen).slice(0, MAX_GUIDS_TO_TRACK)

  const stateUpdate = {
    lastCheckedTimestamp: timestamp,
    lastSeenGuids: trackedGuids,
  }
  await updateWebhookProviderConfig(webhookId, stateUpdate, logger)
}
/**
 * Download the feed and return the items not yet handled.
 *
 * The URL is validated first and fetched against the IP that validation
 * resolved. An item counts as new unless its GUID/link is in
 * `config.lastSeenGuids` or its `isoDate` is at or before
 * `config.lastCheckedTimestamp`. New items are ordered newest first and capped
 * at 25 per poll.
 *
 * @param config - Feed URL plus the state of the previous poll.
 * @param requestId - Correlation id prefixed to log lines.
 * @param logger - Logger for progress and error output.
 * @returns The parsed feed and the (possibly empty) list of new items.
 * @throws Error when the URL fails validation or the fetch is not OK; parser
 *   and network errors are logged and rethrown unchanged.
 */
async function fetchNewRssItems(
  config: RssWebhookConfig,
  requestId: string,
  logger: ReturnType<typeof import('@sim/logger').createLogger>
): Promise<{ feed: RssFeed; items: RssItem[] }> {
  try {
    const validation = await validateUrlWithDNS(config.feedUrl, 'feedUrl')
    if (!validation.isValid) {
      logger.error(`[${requestId}] Invalid RSS feed URL: ${validation.error}`)
      throw new Error(`Invalid RSS feed URL: ${validation.error}`)
    }

    const response = await secureFetchWithPinnedIP(config.feedUrl, validation.resolvedIP!, {
      headers: {
        'User-Agent': 'Sim/1.0 RSS Poller',
        Accept: 'application/rss+xml, application/xml, text/xml, */*',
      },
      timeout: 30000,
    })
    if (!response.ok) {
      // Drain the body before bailing out; a failure to read it is irrelevant here.
      await response.text().catch(() => {})
      throw new Error(`Failed to fetch RSS feed: ${response.status} ${response.statusText}`)
    }

    const feed = await parser.parseString(await response.text())
    const allItems = feed.items
    if (!allItems || allItems.length === 0) {
      return { feed: feed as RssFeed, items: [] }
    }

    const cutoff = config.lastCheckedTimestamp ? new Date(config.lastCheckedTimestamp) : null
    const seenGuids = new Set(config.lastSeenGuids || [])

    const fresh: typeof allItems = []
    for (const candidate of allItems) {
      const guid = candidate.guid || candidate.link || ''
      if (guid && seenGuids.has(guid)) continue
      // Undated items cannot be compared against the cutoff and are kept.
      if (cutoff && candidate.isoDate && new Date(candidate.isoDate) <= cutoff) continue
      fresh.push(candidate)
    }

    // Newest first; items without a date sort as epoch 0.
    fresh.sort((a, b) => {
      const dateA = a.isoDate ? new Date(a.isoDate).getTime() : 0
      const dateB = b.isoDate ? new Date(b.isoDate).getTime() : 0
      return dateB - dateA
    })

    const batch = fresh.slice(0, 25)
    logger.info(`[${requestId}] Found ${fresh.length} new items (processing ${batch.length})`)
    return { feed: feed as RssFeed, items: batch as RssItem[] }
  } catch (error) {
    const errorMessage = error instanceof Error ? error.message : 'Unknown error'
    logger.error(`[${requestId}] Error fetching RSS feed:`, errorMessage)
    throw error
  }
}
/**
 * Deliver feed items to the workflow one at a time.
 *
 * Each delivery runs inside `pollingIdempotency.executeWithIdempotency()`
 * keyed by webhook id and item identity (guid, else link, else title-pubDate).
 * A failing item is logged and counted; it does not stop the remaining items.
 *
 * @param items - Items to deliver, in the order given.
 * @param feed - Feed metadata copied into every payload.
 * @param webhookData - Webhook the items belong to.
 * @param workflowData - Workflow that receives the payloads.
 * @param requestId - Correlation id prefixed to log lines.
 * @param logger - Logger for progress and error output.
 * @returns How many items were delivered and how many failed.
 */
async function processRssItems(
  items: RssItem[],
  feed: RssFeed,
  webhookData: PollWebhookContext['webhookData'],
  workflowData: PollWebhookContext['workflowData'],
  requestId: string,
  logger: ReturnType<typeof import('@sim/logger').createLogger>
): Promise<{ processedCount: number; failedCount: number }> {
  // Builds the payload for one item and hands it to the workflow; throws when delivery fails.
  const deliver = async (item: RssItem, itemGuid: string) => {
    const payload: RssWebhookPayload = {
      title: item.title,
      link: item.link,
      pubDate: item.pubDate,
      item: {
        title: item.title,
        link: item.link,
        pubDate: item.pubDate,
        guid: item.guid,
        description: item.description,
        content: item.content,
        contentSnippet: item.contentSnippet,
        author: item.author || item.creator,
        categories: item.categories,
        enclosure: item.enclosure,
        isoDate: item.isoDate,
      },
      feed: {
        title: feed.title,
        link: feed.link,
        description: feed.description,
      },
      timestamp: new Date().toISOString(),
    }

    const result = await processPolledWebhookEvent(webhookData, workflowData, payload, requestId)
    if (result.success) {
      return { itemGuid, processed: true }
    }

    logger.error(
      `[${requestId}] Failed to process webhook for item ${itemGuid}:`,
      result.statusCode,
      result.error
    )
    throw new Error(`Webhook processing failed (${result.statusCode}): ${result.error}`)
  }

  const tally = { processedCount: 0, failedCount: 0 }

  for (const item of items) {
    try {
      const itemGuid = item.guid || item.link || `${item.title}-${item.pubDate}`
      const idempotencyKey = `${webhookData.id}:${itemGuid}`

      await pollingIdempotency.executeWithIdempotency('rss', idempotencyKey, () =>
        deliver(item, itemGuid)
      )

      logger.info(
        `[${requestId}] Successfully processed item ${item.title || itemGuid} for webhook ${webhookData.id}`
      )
      tally.processedCount += 1
    } catch (error) {
      const errorMessage = error instanceof Error ? error.message : 'Unknown error'
      logger.error(`[${requestId}] Error processing item:`, errorMessage)
      tally.failedCount += 1
    }
  }

  return { processedCount: tally.processedCount, failedCount: tally.failedCount }
}

View File

@@ -0,0 +1,58 @@
import type { Logger } from '@sim/logger'
/** Summary returned after polling all webhooks for a provider. */
export interface PollSummary {
/** Number of webhooks that were polled. */
total: number
/** Webhooks whose poll reported 'success'. */
successful: number
/** Webhooks whose poll reported 'failure' or threw. */
failed: number
}
/** Context passed to a provider handler when processing one webhook. */
export interface PollWebhookContext {
/** The webhook being polled. */
webhookData: WebhookRecord
/** The workflow the webhook belongs to. */
workflowData: WorkflowRecord
/** Correlation id for this webhook's log lines. */
requestId: string
/** Logger the handler should write to. */
logger: Logger
}
/**
 * DB row shape for the webhook table.
 * Only a subset of columns is declared by name; the index signature admits
 * any other column as `unknown`.
 */
export interface WebhookRecord {
id: string
path: string
provider: string | null
blockId: string | null
/** Provider-specific settings and poll state; untyped here, so consumers must cast or narrow. */
providerConfig: unknown
credentialSetId: string | null
workflowId: string
[key: string]: unknown
}
/**
 * DB row shape for the workflow table.
 * Only a subset of columns is declared by name; the index signature admits
 * any other column as `unknown`.
 */
export interface WorkflowRecord {
id: string
userId: string
workspaceId: string
[key: string]: unknown
}
/**
 * Strategy interface for provider-specific polling behavior.
 * Mirrors `WebhookProviderHandler` from `providers/types.ts`.
 *
 * Each provider implements `pollWebhook()` — the full inner loop for one webhook:
 * validate config, resolve credentials, fetch new items, process each via
 * `processPolledWebhookEvent()` (wrapped in `pollingIdempotency`), update state.
 */
export interface PollingProviderHandler {
/** Provider name used in DB queries (e.g. 'gmail', 'rss'). */
readonly provider: string
/** Display label for log messages (e.g. 'Gmail', 'RSS'). */
readonly label: string
/**
 * Process a single webhook entry.
 * Return 'success' (even if 0 new items) or 'failure'.
 *
 * @param ctx - The webhook, its workflow, a request id and a logger.
 * @returns The outcome of this webhook's poll.
 */
pollWebhook(ctx: PollWebhookContext): Promise<'success' | 'failure'>
}

View File

@@ -0,0 +1,242 @@
import { db } from '@sim/db'
import {
account,
credentialSet,
webhook,
workflow,
workflowDeploymentVersion,
} from '@sim/db/schema'
import type { Logger } from '@sim/logger'
import { and, eq, isNull, or, sql } from 'drizzle-orm'
import { isOrganizationOnTeamOrEnterprisePlan } from '@/lib/billing'
import type { WebhookRecord, WorkflowRecord } from '@/lib/webhooks/polling/types'
import {
getOAuthToken,
refreshAccessTokenIfNeeded,
resolveOAuthAccountId,
} from '@/app/api/auth/oauth/utils'
import { MAX_CONSECUTIVE_FAILURES } from '@/triggers/constants'
/**
 * Concurrency limit for parallel webhook processing. Standardized across all providers.
 * This is the maximum number of webhooks polled at the same time within one run.
 */
export const CONCURRENCY = 10
/**
 * Record one failed poll for a webhook.
 *
 * Bumps `failedCount` (NULL counts as 0) and stamps `lastFailedAt`; once the
 * new count reaches MAX_CONSECUTIVE_FAILURES the webhook is deactivated with a
 * second update. Database errors are logged and swallowed so bookkeeping can
 * never fail the poll itself.
 *
 * @param webhookId - Webhook to update.
 * @param logger - Logger for the auto-disable warning and for errors.
 */
export async function markWebhookFailed(webhookId: string, logger: Logger): Promise<void> {
  try {
    // Increment in SQL and read the resulting count back from the same statement.
    const [updated] = await db
      .update(webhook)
      .set({
        failedCount: sql`COALESCE(${webhook.failedCount}, 0) + 1`,
        lastFailedAt: new Date(),
        updatedAt: new Date(),
      })
      .where(eq(webhook.id, webhookId))
      .returning({ failedCount: webhook.failedCount })

    // No returned row (unknown id) is treated as zero failures.
    const failures = updated?.failedCount || 0
    const limitReached = failures >= MAX_CONSECUTIVE_FAILURES
    if (!limitReached) return

    await db
      .update(webhook)
      .set({ isActive: false, updatedAt: new Date() })
      .where(eq(webhook.id, webhookId))

    logger.warn(
      `Webhook ${webhookId} auto-disabled after ${MAX_CONSECUTIVE_FAILURES} consecutive failures`
    )
  } catch (err) {
    logger.error(`Failed to mark webhook ${webhookId} as failed:`, err)
  }
}
/**
 * Record a successful poll by zeroing the webhook's `failedCount`.
 * Database errors are logged and swallowed.
 *
 * @param webhookId - Webhook to update.
 * @param logger - Logger for errors.
 */
export async function markWebhookSuccess(webhookId: string, logger: Logger): Promise<void> {
  try {
    const resetFields = { failedCount: 0, updatedAt: new Date() }
    await db.update(webhook).set(resetFields).where(eq(webhook.id, webhookId))
  } catch (err) {
    logger.error(`Failed to mark webhook ${webhookId} as successful:`, err)
  }
}
/**
 * Load every pollable webhook of a provider together with its workflow.
 *
 * A webhook qualifies when it is active and not archived, its workflow is
 * deployed and not archived, and it is pinned to the workflow's active
 * deployment version (or neither side has a deployment version).
 *
 * @param provider - Provider name stored on the webhook rows.
 * @returns One `{ webhook, workflow }` pair per qualifying webhook.
 */
export async function fetchActiveWebhooks(
  provider: string
): Promise<{ webhook: WebhookRecord; workflow: WorkflowRecord }[]> {
  // Join only the deployment version currently marked active for the workflow.
  const activeVersionJoin = and(
    eq(workflowDeploymentVersion.workflowId, workflow.id),
    eq(workflowDeploymentVersion.isActive, true)
  )

  // Either the webhook targets that active version, or no version exists on either side.
  const versionMatches = or(
    eq(webhook.deploymentVersionId, workflowDeploymentVersion.id),
    and(isNull(workflowDeploymentVersion.id), isNull(webhook.deploymentVersionId))
  )

  const pollable = and(
    eq(webhook.provider, provider),
    eq(webhook.isActive, true),
    isNull(webhook.archivedAt),
    eq(workflow.isDeployed, true),
    isNull(workflow.archivedAt),
    versionMatches
  )

  const rows = await db
    .select({ webhook, workflow })
    .from(webhook)
    .innerJoin(workflow, eq(webhook.workflowId, workflow.id))
    .leftJoin(workflowDeploymentVersion, activeVersionJoin)
    .where(pollable)

  return rows as unknown as { webhook: WebhookRecord; workflow: WorkflowRecord }[]
}
/**
 * Run an async function over entries with bounded concurrency.
 * Returns aggregate success/failure counts.
 *
 * Each call to `processFn` is wrapped in its own async task, so a rejection
 * and a synchronous throw are handled the same way: logged and counted as a
 * failure. One misbehaving entry therefore never aborts the remaining entries.
 *
 * @param entries - Webhook/workflow pairs to process.
 * @param processFn - Work to perform per entry; resolves to 'success' or 'failure'.
 * @param logger - Logger for unexpected errors thrown by `processFn`.
 * @param concurrency - Maximum number of entries in flight at once. Defaults to
 *   `CONCURRENCY`; values below 1 (or NaN) are treated as 1.
 * @returns Number of entries that succeeded and number that failed.
 */
export async function runWithConcurrency(
  entries: { webhook: WebhookRecord; workflow: WorkflowRecord }[],
  processFn: (entry: {
    webhook: WebhookRecord
    workflow: WorkflowRecord
  }) => Promise<'success' | 'failure'>,
  logger: Logger,
  concurrency: number = CONCURRENCY
): Promise<{ successCount: number; failureCount: number }> {
  // A limit below 1 would make the loop wait on an empty race forever.
  const limit = Math.max(1, Math.floor(concurrency) || 1)
  const inFlight = new Set<Promise<void>>()
  let successCount = 0
  let failureCount = 0

  for (const entry of entries) {
    const task: Promise<void> = (async () => {
      try {
        const outcome = await processFn(entry)
        if (outcome === 'success') {
          successCount++
        } else {
          failureCount++
        }
      } catch (err) {
        logger.error('Unexpected error in webhook processing:', err)
        failureCount++
      }
    })().finally(() => {
      // Free the slot before anyone awaiting the race resumes.
      inFlight.delete(task)
    })

    inFlight.add(task)
    if (inFlight.size >= limit) {
      await Promise.race(inFlight)
    }
  }

  await Promise.allSettled(inFlight)
  return { successCount, failureCount }
}
/**
 * Read-merge-write pattern for updating provider-specific config fields.
 * Each provider passes its specific state updates (historyId, lastSeenGuids, etc.).
 *
 * Only the `providerConfig` column of the one matching row is read. When the
 * webhook no longer exists the problem is logged and nothing is written,
 * instead of silently issuing an update that matches no rows.
 *
 * The read and the write are two separate statements, so a concurrent writer
 * to the same webhook's config can be overwritten.
 *
 * @param webhookId - Webhook whose config is updated.
 * @param configUpdates - Keys to set; they override existing keys of the same name.
 * @param logger - Logger for the not-found case and for database errors.
 *   Errors are logged and swallowed, never rethrown.
 */
export async function updateWebhookProviderConfig(
  webhookId: string,
  configUpdates: Record<string, unknown>,
  logger: Logger
): Promise<void> {
  try {
    const [current] = await db
      .select({ providerConfig: webhook.providerConfig })
      .from(webhook)
      .where(eq(webhook.id, webhookId))
      .limit(1)

    if (!current) {
      logger.error(`Webhook ${webhookId} not found`)
      return
    }

    // A null/empty stored config is treated as an empty object.
    const existingConfig = (current.providerConfig as Record<string, unknown>) || {}

    await db
      .update(webhook)
      .set({
        providerConfig: {
          ...existingConfig,
          ...configUpdates,
        } as Record<string, unknown>,
        updatedAt: new Date(),
      })
      .where(eq(webhook.id, webhookId))
  } catch (err) {
    logger.error(`Failed to update webhook ${webhookId} config:`, err)
  }
}
/**
 * Resolve the OAuth access token a polling webhook should use.
 * Shared by the Gmail and Outlook handlers.
 *
 * Steps, in order:
 *  1. Require a `credentialId` or a `userId` in the webhook's provider config.
 *  2. If the webhook belongs to a credential set owned by an organization,
 *     require that organization to be on a Team or Enterprise plan.
 *  3. Obtain the token: via the credential's account when `credentialId` is
 *     set, otherwise via the user's token for `oauthProvider`.
 *
 * @param webhookData - Webhook whose credentials are resolved.
 * @param oauthProvider - OAuth provider name used for the user-id fallback.
 * @param requestId - Correlation id for logs and token refresh.
 * @param logger - Logger for the plan-restriction error.
 * @returns The access token.
 * @throws Error when credential info is missing, the plan check fails, the
 *   credential/account cannot be resolved, or no token is obtained.
 */
export async function resolveOAuthCredential(
  webhookData: WebhookRecord,
  oauthProvider: string,
  requestId: string,
  logger: Logger
): Promise<string> {
  const providerConfig = webhookData.providerConfig as Record<string, unknown> | null
  const credentialId = providerConfig?.credentialId as string | undefined
  const userId = providerConfig?.userId as string | undefined
  const credentialSetId = (webhookData.credentialSetId as string | undefined) ?? undefined

  if (!(credentialId || userId)) {
    throw new Error(`Missing credential info for webhook ${webhookData.id}`)
  }

  if (credentialSetId) {
    const setRows = await db
      .select({ organizationId: credentialSet.organizationId })
      .from(credentialSet)
      .where(eq(credentialSet.id, credentialSetId))
      .limit(1)
    const organizationId = setRows[0]?.organizationId

    // Sets without an organization are not subject to the plan check.
    if (organizationId && !(await isOrganizationOnTeamOrEnterprisePlan(organizationId))) {
      logger.error(
        `[${requestId}] Polling Group plan restriction: Your current plan does not support Polling Groups. Upgrade to Team or Enterprise to use this feature.`,
        {
          webhookId: webhookData.id,
          credentialSetId,
          organizationId,
        }
      )
      throw new Error('Polling Group plan restriction')
    }
  }

  // Fetches the token through the credential when present, else through the user id.
  const obtainToken = async (): Promise<string | null> => {
    if (credentialId) {
      const resolved = await resolveOAuthAccountId(credentialId)
      if (!resolved) {
        throw new Error(
          `Failed to resolve OAuth account for credential ${credentialId}, webhook ${webhookData.id}`
        )
      }

      const accountRows = await db
        .select()
        .from(account)
        .where(eq(account.id, resolved.accountId))
        .limit(1)
      if (!accountRows.length) {
        throw new Error(`Credential ${credentialId} not found for webhook ${webhookData.id}`)
      }

      // The token is refreshed on behalf of the account's owner.
      return refreshAccessTokenIfNeeded(resolved.accountId, accountRows[0].userId, requestId)
    }
    if (userId) {
      return getOAuthToken(userId, oauthProvider)
    }
    return null
  }

  const accessToken = await obtainToken()
  if (!accessToken) {
    throw new Error(`Failed to get ${oauthProvider} access token for webhook ${webhookData.id}`)
  }
  return accessToken
}

View File

@@ -4,12 +4,13 @@ import { createLogger } from '@sim/logger'
import { and, eq, isNull, or } from 'drizzle-orm'
import { type NextRequest, NextResponse } from 'next/server'
import { isOrganizationOnTeamOrEnterprisePlan } from '@/lib/billing/core/subscription'
import { tryAdmit } from '@/lib/core/admission/gate'
import { getInlineJobQueue, getJobQueue, shouldExecuteInline } from '@/lib/core/async-jobs'
import type { AsyncExecutionCorrelation } from '@/lib/core/async-jobs/types'
import { createBullMQJobData, isBullMQEnabled } from '@/lib/core/bullmq'
import { isProd } from '@/lib/core/config/feature-flags'
import { generateId } from '@/lib/core/utils/uuid'
import { enqueueWorkspaceDispatch } from '@/lib/core/workspace-dispatch'
import { DispatchQueueFullError, enqueueWorkspaceDispatch } from '@/lib/core/workspace-dispatch'
import { getEffectiveDecryptedEnv } from '@/lib/environment/utils'
import { preprocessExecution } from '@/lib/execution/preprocessing'
import {
@@ -18,6 +19,7 @@ import {
requiresPendingWebhookVerification,
} from '@/lib/webhooks/pending-verification'
import { getProviderHandler } from '@/lib/webhooks/providers'
import { blockExistsInDeployment } from '@/lib/workflows/persistence/utils'
import { executeWebhookJob } from '@/background/webhook-execution'
import { resolveEnvVarReferences } from '@/executor/utils/reference-validation'
import { isPollingWebhookProvider } from '@/triggers/constants'
@@ -672,3 +674,215 @@ export async function queueWebhookExecution(
return NextResponse.json({ error: 'Internal server error' }, { status: 500 })
}
}
/**
 * Outcome returned by processPolledWebhookEvent.
 */
export interface PolledWebhookEventResult {
/** True once the event has been handed to a job queue; false for every rejection or error path. */
success: boolean
/** Failure description; processPolledWebhookEvent omits it when `success` is true. */
error?: string
/** HTTP-style status code for the failure (400, 403, 404, 429, 500 or 503 in processPolledWebhookEvent); omitted on success. */
statusCode?: number
}
/**
 * Subset of a webhook row that processPolledWebhookEvent accepts.
 */
interface PolledWebhookRecord {
/** Webhook ID; copied into the execution payload, the correlation record and log lines. */
id: string
/** Webhook path; copied into the execution payload and the correlation record. */
path: string
/** Provider name; a null/empty value makes processPolledWebhookEvent return a 400 result. */
provider: string | null
/** Trigger block ID; when set, the block must exist in the workflow's deployment. */
blockId: string | null
/** Provider config blob; processPolledWebhookEvent reads `credentialId` from it. */
providerConfig: unknown
/** Credential set ID; when set, a credential-set billing check runs before queueing. */
credentialSetId: string | null
/** Owning workflow ID. Not read directly by processPolledWebhookEvent; the whole record is passed on to checkWebhookPreprocessing. */
workflowId: string
}
/**
 * Subset of a workflow row that processPolledWebhookEvent accepts.
 */
interface PolledWorkflowRecord {
/** Workflow ID; used for the deployment block check, the payload, the correlation record and job metadata. */
id: string
/** Workflow owner's user ID. Not read directly by processPolledWebhookEvent; the whole record is passed on to checkWebhookPreprocessing. */
userId: string
/** Workspace ID; used as the dispatch workspace and included in the payload and job metadata. */
workspaceId: string
}
/**
 * Feeds a polled webhook event into the execution pipeline in-process, without
 * going through the HTTP trigger route.
 *
 * Polling services (Gmail, Outlook, IMAP, RSS) call this instead of POSTing
 * back to /api/webhooks/trigger/{path}. Only the steps polling providers need
 * are performed: admission control, preprocessing, trigger-block existence
 * check, credential-set billing check, and queueing of the execution.
 *
 * The admission ticket is released when this function returns. In the inline
 * (non-BullMQ) path the job itself is started in the background and is not
 * awaited.
 *
 * @param foundWebhook - Webhook the event belongs to.
 * @param foundWorkflow - Workflow the webhook triggers.
 * @param body - Event payload forwarded to the execution as the request body.
 * @param requestId - Correlation ID used in log output and the job correlation record.
 * @returns `{ success: true }` once the execution is queued, otherwise a
 *   failure result with an error message and an HTTP-style status code.
 *   Errors thrown while processing are caught and reported as 503
 *   (dispatch queue full) or 500.
 */
export async function processPolledWebhookEvent(
  foundWebhook: PolledWebhookRecord,
  foundWorkflow: PolledWorkflowRecord,
  body: Record<string, unknown> | object,
  requestId: string
): Promise<PolledWebhookEventResult> {
  const { provider } = foundWebhook
  if (!provider) {
    return { success: false, error: 'Webhook has no provider', statusCode: 400 }
  }

  const ticket = tryAdmit()
  if (!ticket) {
    logger.warn(`[${requestId}] Admission gate rejected polled webhook event`)
    return { success: false, error: 'Server at capacity', statusCode: 429 }
  }

  const describeError = (value: unknown): string =>
    value instanceof Error ? value.message : String(value)

  try {
    const preprocessResult = await checkWebhookPreprocessing(foundWorkflow, foundWebhook, requestId)
    if (preprocessResult.error) {
      return { success: false, error: 'Preprocessing failed', statusCode: 500 }
    }

    const triggerBlockId = foundWebhook.blockId
    if (triggerBlockId) {
      const deployed = await blockExistsInDeployment(foundWorkflow.id, triggerBlockId)
      if (!deployed) {
        logger.info(
          `[${requestId}] Trigger block ${triggerBlockId} not found in deployment for workflow ${foundWorkflow.id}`
        )
        return { success: false, error: 'Trigger block not found in deployment', statusCode: 404 }
      }
    }

    const providerConfig = (foundWebhook.providerConfig as Record<string, unknown>) || {}
    const credentialId = providerConfig.credentialId as string | undefined
    const credentialSetId = foundWebhook.credentialSetId as string | undefined

    if (credentialSetId) {
      const billingCheck = await verifyCredentialSetBilling(credentialSetId)
      if (!billingCheck.valid) {
        logger.warn(`[${requestId}] Credential set billing check failed: ${billingCheck.error}`)
        return { success: false, error: billingCheck.error, statusCode: 403 }
      }
    }

    const actorUserId = preprocessResult.actorUserId
    if (!actorUserId) {
      logger.error(`[${requestId}] No actorUserId provided for webhook ${foundWebhook.id}`)
      return { success: false, error: 'Unable to resolve billing account', statusCode: 500 }
    }

    // Reuse the IDs produced by preprocessing when present; otherwise mint new ones.
    const executionId = preprocessResult.executionId ?? generateId()
    const correlation =
      preprocessResult.correlation ??
      ({
        executionId,
        requestId,
        source: 'webhook' as const,
        workflowId: foundWorkflow.id,
        webhookId: foundWebhook.id,
        path: foundWebhook.path,
        provider,
        triggerType: 'webhook',
      } satisfies AsyncExecutionCorrelation)

    const payload = {
      webhookId: foundWebhook.id,
      workflowId: foundWorkflow.id,
      userId: actorUserId,
      executionId,
      requestId,
      correlation,
      provider,
      body,
      headers: { 'content-type': 'application/json' } as Record<string, string>,
      path: foundWebhook.path,
      blockId: foundWebhook.blockId ?? undefined,
      workspaceId: foundWorkflow.workspaceId,
      ...(credentialId ? { credentialId } : {}),
    }

    // BullMQ path, shared by both branches below: dispatch through the workspace queue.
    const dispatchToWorkspaceQueue = () =>
      enqueueWorkspaceDispatch({
        id: executionId,
        workspaceId: foundWorkflow.workspaceId,
        lane: 'runtime',
        queueName: 'webhook-execution',
        bullmqJobName: 'webhook-execution',
        bullmqPayload: createBullMQJobData(payload, {
          workflowId: foundWorkflow.id,
          userId: actorUserId,
          correlation,
        }),
        metadata: {
          workflowId: foundWorkflow.id,
          userId: actorUserId,
          correlation,
        },
      })

    // Options for enqueueing directly on a job queue when BullMQ is disabled.
    const directEnqueueOptions = {
      metadata: {
        workflowId: foundWorkflow.id,
        workspaceId: foundWorkflow.workspaceId,
        userId: actorUserId,
        correlation,
      },
    }

    const useBackgroundQueue = isPollingWebhookProvider(payload.provider) && !shouldExecuteInline()

    if (useBackgroundQueue) {
      const queuedJobId = isBullMQEnabled()
        ? await dispatchToWorkspaceQueue()
        : await (await getJobQueue()).enqueue('webhook-execution', payload, directEnqueueOptions)
      logger.info(
        `[${requestId}] Queued polling webhook execution task ${queuedJobId} for ${provider} webhook via job queue`
      )
      return { success: true }
    }

    const inlineQueue = await getInlineJobQueue()
    const inlineJobId = isBullMQEnabled()
      ? await dispatchToWorkspaceQueue()
      : await inlineQueue.enqueue('webhook-execution', payload, directEnqueueOptions)
    logger.info(
      `[${requestId}] Queued ${provider} webhook execution ${inlineJobId} via inline backend`
    )

    if (!isBullMQEnabled()) {
      // Fire-and-forget: run the job in this process without blocking the caller.
      const runInline = async (): Promise<void> => {
        try {
          await inlineQueue.startJob(inlineJobId)
          const output = await executeWebhookJob(payload)
          await inlineQueue.completeJob(inlineJobId, output)
        } catch (error) {
          const errorMessage = describeError(error)
          logger.error(`[${requestId}] Webhook execution failed`, {
            jobId: inlineJobId,
            error: errorMessage,
          })
          try {
            await inlineQueue.markJobFailed(inlineJobId, errorMessage)
          } catch (markFailedError) {
            logger.error(`[${requestId}] Failed to mark job as failed`, {
              jobId: inlineJobId,
              error: describeError(markFailedError),
            })
          }
        }
      }
      void runInline()
    }

    return { success: true }
  } catch (error: unknown) {
    if (error instanceof DispatchQueueFullError) {
      logger.warn(`[${requestId}] Dispatch queue full for polled webhook: ${error.message}`)
      return { success: false, error: 'Service temporarily at capacity', statusCode: 503 }
    }
    logger.error(`[${requestId}] Failed to process polled webhook event:`, error)
    return { success: false, error: 'Internal server error', statusCode: 500 }
  } finally {
    ticket.release()
  }
}

View File

@@ -1,442 +0,0 @@
import { db } from '@sim/db'
import { webhook, workflow, workflowDeploymentVersion } from '@sim/db/schema'
import { createLogger } from '@sim/logger'
import { and, eq, isNull, or, sql } from 'drizzle-orm'
import Parser from 'rss-parser'
import { pollingIdempotency } from '@/lib/core/idempotency/service'
import {
secureFetchWithPinnedIP,
validateUrlWithDNS,
} from '@/lib/core/security/input-validation.server'
import { getInternalApiBaseUrl } from '@/lib/core/utils/urls'
import { generateShortId } from '@/lib/core/utils/uuid'
import { MAX_CONSECUTIVE_FAILURES } from '@/triggers/constants'
// Module-level logger, created with the name 'RssPollingService'.
const logger = createLogger('RssPollingService')
// Cap on how many item identifiers (guid or link) updateWebhookConfig keeps in
// providerConfig.lastSeenGuids; newest identifiers are kept first.
const MAX_GUIDS_TO_TRACK = 100 // Track recent guids to prevent duplicates
/**
 * Shape that pollRssWebhooks expects in an RSS webhook's `providerConfig`.
 */
interface RssWebhookConfig {
/** Feed URL to poll; a webhook without it is marked as failed. */
feedUrl: string
/** ISO timestamp of the previous poll; items whose `isoDate` is at or before it are skipped. */
lastCheckedTimestamp?: string
/** Identifiers (guid or link) of recently delivered items, used to drop duplicates. */
lastSeenGuids?: string[]
/** NOTE(review): not read or written by the code in this file section - presumably reserved for conditional requests; confirm. */
etag?: string
/** NOTE(review): not read or written by the code in this file section - presumably reserved for conditional requests; confirm. */
lastModified?: string
}
/**
 * A single feed entry as produced by the RSS parser.
 * The index signature at the end lets any additional parser-provided field through untyped.
 */
interface RssItem {
title?: string
/** Used as the item's identifier when `guid` is missing. */
link?: string
pubDate?: string
/** Preferred identifier for duplicate detection and idempotency keys. */
guid?: string
description?: string
content?: string
contentSnippet?: string
/** Preferred author field; processRssItems falls back to `creator` when this is empty. */
author?: string
creator?: string
categories?: string[]
enclosure?: {
url: string
type?: string
length?: string | number
}
/** Date string used for the "newer than last poll" filter and for newest-first sorting. */
isoDate?: string
[key: string]: any
}
/**
 * Parsed feed: channel-level fields plus its items.
 * `title`, `link` and `description` are copied into each webhook payload's `feed` object.
 */
interface RssFeed {
title?: string
link?: string
description?: string
items: RssItem[]
}
/**
 * JSON body that processRssItems POSTs to the webhook trigger route for each new feed item.
 */
export interface RssWebhookPayload {
/** Copy of `item.title`. */
title?: string
/** Copy of `item.link`. */
link?: string
/** Copy of `item.pubDate`. */
pubDate?: string
/** The feed item that triggered this payload. */
item: RssItem
/** Channel-level information about the feed the item came from. */
feed: {
title?: string
link?: string
description?: string
}
/** ISO timestamp taken when the payload was built. */
timestamp: string
}
// Shared rss-parser instance. In the code shown here it is only used through
// parser.parseString() on XML that fetchNewRssItems has already downloaded with
// secureFetchWithPinnedIP.
// NOTE(review): `timeout` and `headers` presumably only affect the parser's own
// URL fetching, which this code path does not use - confirm against rss-parser docs.
const parser = new Parser({
timeout: 30000,
headers: {
'User-Agent': 'Sim/1.0 RSS Poller',
},
})
/**
 * Records a failed poll for a webhook and deactivates the webhook once its
 * failure counter reaches MAX_CONSECUTIVE_FAILURES.
 *
 * The counter is incremented in the database (NULL is treated as 0). Database
 * errors are logged and swallowed, so this helper never rejects.
 *
 * @param webhookId - ID of the webhook row to update.
 */
async function markWebhookFailed(webhookId: string) {
  try {
    const [updated] = await db
      .update(webhook)
      .set({
        failedCount: sql`COALESCE(${webhook.failedCount}, 0) + 1`,
        lastFailedAt: new Date(),
        updatedAt: new Date(),
      })
      .where(eq(webhook.id, webhookId))
      .returning({ failedCount: webhook.failedCount })

    // No returned row (unknown webhook) counts as zero failures.
    const consecutiveFailures = updated?.failedCount || 0
    if (consecutiveFailures < MAX_CONSECUTIVE_FAILURES) {
      return
    }

    await db
      .update(webhook)
      .set({ isActive: false, updatedAt: new Date() })
      .where(eq(webhook.id, webhookId))
    logger.warn(
      `Webhook ${webhookId} auto-disabled after ${MAX_CONSECUTIVE_FAILURES} consecutive failures`
    )
  } catch (err) {
    logger.error(`Failed to mark webhook ${webhookId} as failed:`, err)
  }
}
/**
 * Resets a webhook's failure counter to zero after a successful poll.
 * Database errors are logged and swallowed, so this helper never rejects.
 *
 * @param webhookId - ID of the webhook row to update.
 */
async function markWebhookSuccess(webhookId: string) {
  try {
    const resetFailures = { failedCount: 0, updatedAt: new Date() }
    await db.update(webhook).set(resetFailures).where(eq(webhook.id, webhookId))
  } catch (err) {
    logger.error(`Failed to mark webhook ${webhookId} as successful:`, err)
  }
}
/**
 * Polls every active RSS webhook once and triggers its workflow for new items.
 *
 * A webhook is polled when it is active, its workflow is deployed, and it is
 * tied to the workflow's active deployment version (or neither has one).
 * Up to 10 webhooks are polled at the same time. Each webhook counts as
 * successful unless its feed could not be processed or every new item failed.
 *
 * @returns Totals for the run; `details` is always empty.
 * @throws Re-throws any error raised outside the per-webhook handling
 *   (for example, the initial query failing), after logging it.
 */
export async function pollRssWebhooks() {
  logger.info('Starting RSS webhook polling')

  try {
    const rows = await db
      .select({ webhook })
      .from(webhook)
      .innerJoin(workflow, eq(webhook.workflowId, workflow.id))
      .leftJoin(
        workflowDeploymentVersion,
        and(
          eq(workflowDeploymentVersion.workflowId, workflow.id),
          eq(workflowDeploymentVersion.isActive, true)
        )
      )
      .where(
        and(
          eq(webhook.provider, 'rss'),
          eq(webhook.isActive, true),
          eq(workflow.isDeployed, true),
          or(
            eq(webhook.deploymentVersionId, workflowDeploymentVersion.id),
            and(isNull(workflowDeploymentVersion.id), isNull(webhook.deploymentVersionId))
          )
        )
      )
    const activeWebhooks = rows.map((row) => row.webhook)

    if (activeWebhooks.length === 0) {
      logger.info('No active RSS webhooks found')
      return { total: 0, successful: 0, failed: 0, details: [] }
    }
    logger.info(`Found ${activeWebhooks.length} active RSS webhooks`)

    const maxInFlight = 10
    const inFlight: Promise<void>[] = []
    let successCount = 0
    let failureCount = 0

    // Polls one webhook and records the outcome in the counters above.
    const pollOne = async (webhookData: (typeof activeWebhooks)[number]) => {
      const webhookId = webhookData.id
      const requestId = generateShortId()

      try {
        const config = webhookData.providerConfig as unknown as RssWebhookConfig
        if (!config?.feedUrl) {
          logger.error(`[${requestId}] Missing feedUrl for webhook ${webhookId}`)
          await markWebhookFailed(webhookId)
          failureCount++
          return
        }

        // Captured before fetching so items published during the fetch are not skipped next time.
        const polledAt = new Date()
        const { feed, items: newItems } = await fetchNewRssItems(config, requestId)

        if (newItems.length === 0) {
          await updateWebhookConfig(webhookId, polledAt.toISOString(), [])
          await markWebhookSuccess(webhookId)
          logger.info(`[${requestId}] No new items found for webhook ${webhookId}`)
          successCount++
          return
        }

        logger.info(`[${requestId}] Found ${newItems.length} new items for webhook ${webhookId}`)
        const { processedCount, failedCount: itemFailedCount } = await processRssItems(
          newItems,
          feed,
          webhookData,
          requestId
        )

        const newGuids: string[] = []
        for (const item of newItems) {
          const identifier = item.guid || item.link || ''
          if (identifier.length > 0) {
            newGuids.push(identifier)
          }
        }
        await updateWebhookConfig(webhookId, polledAt.toISOString(), newGuids)

        const everyItemFailed = itemFailedCount > 0 && processedCount === 0
        if (everyItemFailed) {
          await markWebhookFailed(webhookId)
          failureCount++
          logger.warn(
            `[${requestId}] All ${itemFailedCount} items failed to process for webhook ${webhookId}`
          )
          return
        }

        await markWebhookSuccess(webhookId)
        successCount++
        const failedSuffix = itemFailedCount > 0 ? ` (${itemFailedCount} failed)` : ''
        logger.info(
          `[${requestId}] Successfully processed ${processedCount} items for webhook ${webhookId}${failedSuffix}`
        )
      } catch (error) {
        logger.error(`[${requestId}] Error processing RSS webhook ${webhookId}:`, error)
        await markWebhookFailed(webhookId)
        failureCount++
      }
    }

    for (const webhookData of activeWebhooks) {
      const task = pollOne(webhookData)
        .then(() => {})
        .catch((err) => {
          logger.error('Unexpected error in webhook processing:', err)
          failureCount++
        })
      inFlight.push(task)

      if (inFlight.length >= maxInFlight) {
        // Wait for any one task to finish and free its slot before starting the next.
        const finishedIndex = await Promise.race(
          inFlight.map((pending, index) => pending.then(() => index))
        )
        inFlight.splice(finishedIndex, 1)
      }
    }
    await Promise.allSettled(inFlight)

    const summary = {
      total: activeWebhooks.length,
      successful: successCount,
      failed: failureCount,
      details: [],
    }
    logger.info('RSS polling completed', {
      total: summary.total,
      successful: summary.successful,
      failed: summary.failed,
    })
    return summary
  } catch (error) {
    const errorMessage = error instanceof Error ? error.message : 'Unknown error'
    logger.error('Error in RSS polling service:', errorMessage)
    throw error
  }
}
/**
 * Downloads and parses the configured feed and returns the items that have not
 * been delivered yet.
 *
 * The URL is validated with DNS resolution and fetched against the resolved IP.
 * An item is considered new when its identifier (guid, else link) is not in
 * `config.lastSeenGuids` and its `isoDate` is not at or before
 * `config.lastCheckedTimestamp`. New items are sorted newest first and at most
 * 25 are returned.
 *
 * @param config - RSS provider config holding the feed URL and the previous poll state.
 * @param requestId - Correlation ID used in log output.
 * @returns The parsed feed and the (limited) list of new items.
 * @throws Error when the URL is invalid, the HTTP response is not OK, or
 *   fetching/parsing fails; the error is logged before being re-thrown.
 */
async function fetchNewRssItems(
  config: RssWebhookConfig,
  requestId: string
): Promise<{ feed: RssFeed; items: RssItem[] }> {
  // Items without a date sort as the oldest.
  const toMillis = (isoDate?: string): number => (isoDate ? new Date(isoDate).getTime() : 0)

  try {
    const urlValidation = await validateUrlWithDNS(config.feedUrl, 'feedUrl')
    if (!urlValidation.isValid) {
      logger.error(`[${requestId}] Invalid RSS feed URL: ${urlValidation.error}`)
      throw new Error(`Invalid RSS feed URL: ${urlValidation.error}`)
    }

    const response = await secureFetchWithPinnedIP(config.feedUrl, urlValidation.resolvedIP!, {
      headers: {
        'User-Agent': 'Sim/1.0 RSS Poller',
        Accept: 'application/rss+xml, application/xml, text/xml, */*',
      },
      timeout: 30000,
    })
    if (!response.ok) {
      // Read and discard the error body before failing.
      await response.text().catch(() => {})
      throw new Error(`Failed to fetch RSS feed: ${response.status} ${response.statusText}`)
    }

    const feed = await parser.parseString(await response.text())
    if (!feed.items || feed.items.length === 0) {
      return { feed: feed as RssFeed, items: [] }
    }

    const cutoff = config.lastCheckedTimestamp ? new Date(config.lastCheckedTimestamp) : null
    const seenGuids = new Set(config.lastSeenGuids || [])

    const freshItems = feed.items.filter((item) => {
      const identifier = item.guid || item.link || ''
      if (identifier && seenGuids.has(identifier)) {
        return false
      }
      if (!cutoff || !item.isoDate) {
        return true
      }
      // Written as a negated <= so an unparseable date still counts as new.
      return !(new Date(item.isoDate) <= cutoff)
    })

    freshItems.sort((a, b) => toMillis(b.isoDate) - toMillis(a.isoDate))

    const limitedItems = freshItems.slice(0, 25)
    logger.info(
      `[${requestId}] Found ${freshItems.length} new items (processing ${limitedItems.length})`
    )
    return { feed: feed as RssFeed, items: limitedItems as RssItem[] }
  } catch (error) {
    const errorMessage = error instanceof Error ? error.message : 'Unknown error'
    logger.error(`[${requestId}] Error fetching RSS feed:`, errorMessage)
    throw error
  }
}
/**
 * Delivers each new feed item to the webhook's trigger route, one at a time.
 *
 * Every item is wrapped in an idempotency guard keyed by webhook ID and item
 * identifier (guid, else link, else `title-pubDate`). A failed delivery is
 * logged and counted; it does not stop the remaining items.
 *
 * @param items - New feed items to deliver.
 * @param feed - Feed the items came from; its title, link and description are added to each payload.
 * @param webhookData - Webhook being triggered. Only `id` and `path` are read.
 * @param requestId - Correlation ID used in log output.
 * @returns How many items were delivered and how many failed.
 */
async function processRssItems(
  items: RssItem[],
  feed: RssFeed,
  webhookData: { id: string; path: string },
  requestId: string
): Promise<{ processedCount: number; failedCount: number }> {
  let processedCount = 0
  let failedCount = 0

  for (const item of items) {
    try {
      const itemGuid = item.guid || item.link || `${item.title}-${item.pubDate}`

      await pollingIdempotency.executeWithIdempotency(
        'rss',
        `${webhookData.id}:${itemGuid}`,
        async () => {
          const payload: RssWebhookPayload = {
            title: item.title,
            link: item.link,
            pubDate: item.pubDate,
            item: {
              title: item.title,
              link: item.link,
              pubDate: item.pubDate,
              guid: item.guid,
              description: item.description,
              content: item.content,
              contentSnippet: item.contentSnippet,
              author: item.author || item.creator,
              categories: item.categories,
              enclosure: item.enclosure,
              isoDate: item.isoDate,
            },
            feed: {
              title: feed.title,
              link: feed.link,
              description: feed.description,
            },
            timestamp: new Date().toISOString(),
          }

          const webhookUrl = `${getInternalApiBaseUrl()}/api/webhooks/trigger/${webhookData.path}`
          const response = await fetch(webhookUrl, {
            method: 'POST',
            headers: {
              'Content-Type': 'application/json',
              'User-Agent': 'Sim/1.0',
            },
            body: JSON.stringify(payload),
          })

          if (!response.ok) {
            const errorText = await response.text()
            logger.error(
              `[${requestId}] Failed to trigger webhook for item ${itemGuid}:`,
              response.status,
              errorText
            )
            // Throwing makes the outer catch count this item as failed.
            throw new Error(`Webhook request failed: ${response.status} - ${errorText}`)
          }

          return {
            itemGuid,
            webhookStatus: response.status,
            processed: true,
          }
        }
      )

      logger.info(
        `[${requestId}] Successfully processed item ${item.title || itemGuid} for webhook ${webhookData.id}`
      )
      processedCount++
    } catch (error) {
      const errorMessage = error instanceof Error ? error.message : 'Unknown error'
      logger.error(`[${requestId}] Error processing item:`, errorMessage)
      failedCount++
    }
  }

  return { processedCount, failedCount }
}
/**
 * Persists the RSS poll state on a webhook's `providerConfig`.
 *
 * The current config is read, then written back with `lastCheckedTimestamp`
 * replaced and `lastSeenGuids` set to the new identifiers followed by the
 * previously stored ones, capped at MAX_GUIDS_TO_TRACK. All other config keys
 * are kept. Database errors are logged and swallowed, so this helper never rejects.
 *
 * @param webhookId - ID of the webhook row to update.
 * @param timestamp - ISO timestamp to store as `lastCheckedTimestamp`.
 * @param newGuids - Identifiers of the items seen in this poll, newest first.
 */
async function updateWebhookConfig(webhookId: string, timestamp: string, newGuids: string[]) {
  try {
    const result = await db.select().from(webhook).where(eq(webhook.id, webhookId))
    const existingConfig = (result[0]?.providerConfig as Record<string, unknown>) || {}

    // The stored value is untyped JSON: accept it only if it really is a list of strings,
    // so a corrupted value cannot be spread into the new list.
    const storedGuids = existingConfig.lastSeenGuids
    const existingGuids = Array.isArray(storedGuids)
      ? storedGuids.filter((guid): guid is string => typeof guid === 'string')
      : []
    const allGuids = [...newGuids, ...existingGuids].slice(0, MAX_GUIDS_TO_TRACK)

    await db
      .update(webhook)
      .set({
        providerConfig: {
          ...existingConfig,
          lastCheckedTimestamp: timestamp,
          lastSeenGuids: allGuids,
        } as Record<string, unknown>,
        updatedAt: new Date(),
      })
      .where(eq(webhook.id, webhookId))
  } catch (err) {
    logger.error(`Failed to update webhook ${webhookId} config:`, err)
  }
}