mirror of
https://github.com/simstudioai/sim.git
synced 2026-04-06 03:00:16 -04:00
fix(polling): fixed gmail and outlook polling to respect disabled status (#2185)
This commit is contained in:
@@ -1,5 +1,5 @@
|
||||
import { db } from '@sim/db'
|
||||
import { account, webhook } from '@sim/db/schema'
|
||||
import { account, webhook, workflow } from '@sim/db/schema'
|
||||
import { and, eq, sql } from 'drizzle-orm'
|
||||
import { nanoid } from 'nanoid'
|
||||
import { pollingIdempotency } from '@/lib/core/idempotency/service'
|
||||
@@ -21,7 +21,6 @@ interface GmailWebhookConfig {
|
||||
maxEmailsPerPoll?: number
|
||||
lastCheckedTimestamp?: string
|
||||
historyId?: string
|
||||
pollingInterval?: number
|
||||
includeAttachments?: boolean
|
||||
includeRawEmail?: boolean
|
||||
}
|
||||
@@ -108,11 +107,19 @@ export async function pollGmailWebhooks() {
|
||||
logger.info('Starting Gmail webhook polling')
|
||||
|
||||
try {
|
||||
// Get all active Gmail webhooks
|
||||
const activeWebhooks = await db
|
||||
.select()
|
||||
const activeWebhooksResult = await db
|
||||
.select({ webhook })
|
||||
.from(webhook)
|
||||
.where(and(eq(webhook.provider, 'gmail'), eq(webhook.isActive, true)))
|
||||
.innerJoin(workflow, eq(webhook.workflowId, workflow.id))
|
||||
.where(
|
||||
and(
|
||||
eq(webhook.provider, 'gmail'),
|
||||
eq(webhook.isActive, true),
|
||||
eq(workflow.isDeployed, true)
|
||||
)
|
||||
)
|
||||
|
||||
const activeWebhooks = activeWebhooksResult.map((r) => r.webhook)
|
||||
|
||||
if (!activeWebhooks.length) {
|
||||
logger.info('No active Gmail webhooks found')
|
||||
@@ -134,7 +141,6 @@ export async function pollGmailWebhooks() {
|
||||
const requestId = nanoid()
|
||||
|
||||
try {
|
||||
// Extract metadata
|
||||
const metadata = webhookData.providerConfig as any
|
||||
const credentialId: string | undefined = metadata?.credentialId
|
||||
const userId: string | undefined = metadata?.userId
|
||||
@@ -146,7 +152,6 @@ export async function pollGmailWebhooks() {
|
||||
return
|
||||
}
|
||||
|
||||
// Resolve owner and token
|
||||
let accessToken: string | null = null
|
||||
if (credentialId) {
|
||||
const rows = await db.select().from(account).where(eq(account.id, credentialId)).limit(1)
|
||||
@@ -161,7 +166,6 @@ export async function pollGmailWebhooks() {
|
||||
const ownerUserId = rows[0].userId
|
||||
accessToken = await refreshAccessTokenIfNeeded(credentialId, ownerUserId, requestId)
|
||||
} else if (userId) {
|
||||
// Backward-compat fallback to workflow owner token
|
||||
accessToken = await getOAuthToken(userId, 'google-email')
|
||||
}
|
||||
|
||||
@@ -174,18 +178,15 @@ export async function pollGmailWebhooks() {
|
||||
return
|
||||
}
|
||||
|
||||
// Get webhook configuration
|
||||
const config = webhookData.providerConfig as unknown as GmailWebhookConfig
|
||||
|
||||
const now = new Date()
|
||||
|
||||
// Fetch new emails
|
||||
const fetchResult = await fetchNewEmails(accessToken, config, requestId)
|
||||
|
||||
const { emails, latestHistoryId } = fetchResult
|
||||
|
||||
if (!emails || !emails.length) {
|
||||
// Update last checked timestamp
|
||||
await updateWebhookLastChecked(
|
||||
webhookId,
|
||||
now.toISOString(),
|
||||
@@ -201,10 +202,8 @@ export async function pollGmailWebhooks() {
|
||||
|
||||
logger.info(`[${requestId}] Processing ${emails.length} emails for webhook ${webhookId}`)
|
||||
|
||||
// Process all emails (process each email as a separate workflow trigger)
|
||||
const emailsToProcess = emails
|
||||
|
||||
// Process emails
|
||||
const { processedCount, failedCount } = await processEmails(
|
||||
emailsToProcess,
|
||||
webhookData,
|
||||
@@ -213,19 +212,19 @@ export async function pollGmailWebhooks() {
|
||||
requestId
|
||||
)
|
||||
|
||||
// Update webhook with latest history ID and timestamp
|
||||
await updateWebhookData(webhookId, now.toISOString(), latestHistoryId || config.historyId)
|
||||
await updateWebhookLastChecked(
|
||||
webhookId,
|
||||
now.toISOString(),
|
||||
latestHistoryId || config.historyId
|
||||
)
|
||||
|
||||
// If all emails failed, mark webhook as failed. Otherwise mark as success.
|
||||
if (failedCount > 0 && processedCount === 0) {
|
||||
// All emails failed to process - mark webhook as failed
|
||||
await markWebhookFailed(webhookId)
|
||||
failureCount++
|
||||
logger.warn(
|
||||
`[${requestId}] All ${failedCount} emails failed to process for webhook ${webhookId}`
|
||||
)
|
||||
} else {
|
||||
// At least some emails processed successfully
|
||||
await markWebhookSuccess(webhookId)
|
||||
successCount++
|
||||
logger.info(
|
||||
@@ -233,7 +232,6 @@ export async function pollGmailWebhooks() {
|
||||
)
|
||||
}
|
||||
} catch (error) {
|
||||
const errorMessage = error instanceof Error ? error.message : 'Unknown error'
|
||||
logger.error(`[${requestId}] Error processing Gmail webhook ${webhookId}:`, error)
|
||||
await markWebhookFailed(webhookId)
|
||||
failureCount++
|
||||
@@ -242,9 +240,7 @@ export async function pollGmailWebhooks() {
|
||||
|
||||
for (const webhookData of activeWebhooks) {
|
||||
const promise = enqueue(webhookData)
|
||||
.then(() => {
|
||||
// Result processed, memory released
|
||||
})
|
||||
.then(() => {})
|
||||
.catch((err) => {
|
||||
logger.error('Unexpected error in webhook processing:', err)
|
||||
failureCount++
|
||||
@@ -258,14 +254,13 @@ export async function pollGmailWebhooks() {
|
||||
}
|
||||
}
|
||||
|
||||
// Wait for remaining webhooks to complete
|
||||
await Promise.allSettled(running)
|
||||
|
||||
const summary = {
|
||||
total: activeWebhooks.length,
|
||||
successful: successCount,
|
||||
failed: failureCount,
|
||||
details: [], // Don't store details to save memory
|
||||
details: [],
|
||||
}
|
||||
|
||||
logger.info('Gmail polling completed', {
|
||||
@@ -284,13 +279,11 @@ export async function pollGmailWebhooks() {
|
||||
|
||||
async function fetchNewEmails(accessToken: string, config: GmailWebhookConfig, requestId: string) {
|
||||
try {
|
||||
// Determine whether to use history API or search
|
||||
const useHistoryApi = !!config.historyId
|
||||
let emails = []
|
||||
let latestHistoryId = config.historyId
|
||||
|
||||
if (useHistoryApi) {
|
||||
// Use history API to get changes since last check
|
||||
const historyUrl = `https://gmail.googleapis.com/gmail/v1/users/me/history?startHistoryId=${config.historyId}`
|
||||
|
||||
const historyResponse = await fetch(historyUrl, {
|
||||
@@ -307,7 +300,6 @@ async function fetchNewEmails(accessToken: string, config: GmailWebhookConfig, r
|
||||
error: errorData,
|
||||
})
|
||||
|
||||
// Fall back to search if history API fails
|
||||
logger.info(`[${requestId}] Falling back to search API after history API failure`)
|
||||
const searchResult = await searchEmails(accessToken, config, requestId)
|
||||
return {
|
||||
@@ -322,12 +314,10 @@ async function fetchNewEmails(accessToken: string, config: GmailWebhookConfig, r
|
||||
return { emails: [], latestHistoryId }
|
||||
}
|
||||
|
||||
// Update the latest history ID
|
||||
if (historyData.historyId) {
|
||||
latestHistoryId = historyData.historyId
|
||||
}
|
||||
|
||||
// Extract message IDs from history
|
||||
const messageIds = new Set<string>()
|
||||
|
||||
for (const history of historyData.history) {
|
||||
@@ -342,29 +332,28 @@ async function fetchNewEmails(accessToken: string, config: GmailWebhookConfig, r
|
||||
return { emails: [], latestHistoryId }
|
||||
}
|
||||
|
||||
// Sort IDs by recency (reverse order)
|
||||
const sortedIds = [...messageIds].sort().reverse()
|
||||
|
||||
// Process all emails but respect the configured limit
|
||||
const idsToFetch = sortedIds.slice(0, config.maxEmailsPerPoll || 25)
|
||||
logger.info(`[${requestId}] Processing ${idsToFetch.length} emails from history API`)
|
||||
|
||||
// Fetch full email details for each message
|
||||
const emailPromises = idsToFetch.map(async (messageId) => {
|
||||
return getEmailDetails(accessToken, messageId)
|
||||
})
|
||||
|
||||
const emailResults = await Promise.allSettled(emailPromises)
|
||||
const rejected = emailResults.filter((r) => r.status === 'rejected')
|
||||
if (rejected.length > 0) {
|
||||
logger.warn(`[${requestId}] Failed to fetch ${rejected.length} email details`)
|
||||
}
|
||||
emails = emailResults
|
||||
.filter(
|
||||
(result): result is PromiseFulfilledResult<GmailEmail> => result.status === 'fulfilled'
|
||||
)
|
||||
.map((result) => result.value)
|
||||
|
||||
// Filter emails by labels if needed
|
||||
emails = filterEmailsByLabels(emails, config)
|
||||
} else {
|
||||
// Use search if no history ID is available
|
||||
const searchResult = await searchEmails(accessToken, config, requestId)
|
||||
return searchResult
|
||||
}
|
||||
@@ -373,7 +362,7 @@ async function fetchNewEmails(accessToken: string, config: GmailWebhookConfig, r
|
||||
} catch (error) {
|
||||
const errorMessage = error instanceof Error ? error.message : 'Unknown error'
|
||||
logger.error(`[${requestId}] Error fetching new emails:`, errorMessage)
|
||||
return { emails: [], latestHistoryId: config.historyId }
|
||||
throw error
|
||||
}
|
||||
}
|
||||
|
||||
@@ -425,59 +414,40 @@ async function searchEmails(accessToken: string, config: GmailWebhookConfig, req
|
||||
const baseQuery = buildGmailSearchQuery(config)
|
||||
logger.debug(`[${requestId}] Gmail search query: ${baseQuery}`)
|
||||
|
||||
// Improved time-based filtering with dynamic buffer
|
||||
let timeConstraint = ''
|
||||
|
||||
if (config.lastCheckedTimestamp) {
|
||||
// Parse the last check time
|
||||
const lastCheckedTime = new Date(config.lastCheckedTimestamp)
|
||||
const now = new Date()
|
||||
|
||||
// Calculate minutes since last check
|
||||
const minutesSinceLastCheck = (now.getTime() - lastCheckedTime.getTime()) / (60 * 1000)
|
||||
|
||||
// If last check was recent, use precise time-based query
|
||||
if (minutesSinceLastCheck < 60) {
|
||||
// Less than an hour ago
|
||||
// Calculate buffer in seconds - the greater of:
|
||||
// 1. Twice the configured polling interval (or 2 minutes if not set)
|
||||
// 2. At least 3 minutes (180 seconds)
|
||||
const bufferSeconds = Math.max((config.pollingInterval || 2) * 60 * 2, 180)
|
||||
const bufferSeconds = Math.max(1 * 60 * 2, 180)
|
||||
|
||||
// Calculate the cutoff time with buffer
|
||||
const cutoffTime = new Date(lastCheckedTime.getTime() - bufferSeconds * 1000)
|
||||
|
||||
// Format for Gmail's search syntax (seconds since epoch)
|
||||
const timestamp = Math.floor(cutoffTime.getTime() / 1000)
|
||||
|
||||
timeConstraint = ` after:${timestamp}`
|
||||
logger.debug(`[${requestId}] Using timestamp-based query with ${bufferSeconds}s buffer`)
|
||||
}
|
||||
// If last check was a while ago, use Gmail's relative time queries
|
||||
else if (minutesSinceLastCheck < 24 * 60) {
|
||||
// Less than a day
|
||||
// Use newer_than:Xh syntax for better reliability with longer intervals
|
||||
} else if (minutesSinceLastCheck < 24 * 60) {
|
||||
const hours = Math.ceil(minutesSinceLastCheck / 60) + 1 // Round up and add 1 hour buffer
|
||||
timeConstraint = ` newer_than:${hours}h`
|
||||
logger.debug(`[${requestId}] Using hour-based query: newer_than:${hours}h`)
|
||||
} else {
|
||||
// For very old last checks, limit to a reasonable time period (7 days max)
|
||||
const days = Math.min(Math.ceil(minutesSinceLastCheck / (24 * 60)), 7) + 1
|
||||
timeConstraint = ` newer_than:${days}d`
|
||||
logger.debug(`[${requestId}] Using day-based query: newer_than:${days}d`)
|
||||
}
|
||||
} else {
|
||||
// If there's no last checked timestamp, default to recent emails (last 24h)
|
||||
timeConstraint = ' newer_than:1d'
|
||||
logger.debug(`[${requestId}] No last check time, using default: newer_than:1d`)
|
||||
}
|
||||
|
||||
// Combine base query and time constraints
|
||||
const query = `${baseQuery}${timeConstraint}`
|
||||
|
||||
logger.info(`[${requestId}] Searching for emails with query: ${query}`)
|
||||
|
||||
// Search for emails with lower default
|
||||
const searchUrl = `https://gmail.googleapis.com/gmail/v1/users/me/messages?q=${encodeURIComponent(query)}&maxResults=${config.maxEmailsPerPoll || 25}`
|
||||
|
||||
const searchResponse = await fetch(searchUrl, {
|
||||
@@ -494,7 +464,9 @@ async function searchEmails(accessToken: string, config: GmailWebhookConfig, req
|
||||
query: query,
|
||||
error: errorData,
|
||||
})
|
||||
return { emails: [], latestHistoryId: config.historyId }
|
||||
throw new Error(
|
||||
`Gmail API error: ${searchResponse.status} ${searchResponse.statusText} - ${JSON.stringify(errorData)}`
|
||||
)
|
||||
}
|
||||
|
||||
const searchData = await searchResponse.json()
|
||||
@@ -504,7 +476,6 @@ async function searchEmails(accessToken: string, config: GmailWebhookConfig, req
|
||||
return { emails: [], latestHistoryId: config.historyId }
|
||||
}
|
||||
|
||||
// Process emails within the limit
|
||||
const idsToFetch = searchData.messages.slice(0, config.maxEmailsPerPoll || 25)
|
||||
let latestHistoryId = config.historyId
|
||||
|
||||
@@ -512,19 +483,21 @@ async function searchEmails(accessToken: string, config: GmailWebhookConfig, req
|
||||
`[${requestId}] Processing ${idsToFetch.length} emails from search API (total matches: ${searchData.messages.length})`
|
||||
)
|
||||
|
||||
// Fetch full email details for each message
|
||||
const emailPromises = idsToFetch.map(async (message: { id: string }) => {
|
||||
return getEmailDetails(accessToken, message.id)
|
||||
})
|
||||
|
||||
const emailResults = await Promise.allSettled(emailPromises)
|
||||
const rejected = emailResults.filter((r) => r.status === 'rejected')
|
||||
if (rejected.length > 0) {
|
||||
logger.warn(`[${requestId}] Failed to fetch ${rejected.length} email details`)
|
||||
}
|
||||
const emails = emailResults
|
||||
.filter(
|
||||
(result): result is PromiseFulfilledResult<GmailEmail> => result.status === 'fulfilled'
|
||||
)
|
||||
.map((result) => result.value)
|
||||
|
||||
// Get the latest history ID from the first email (most recent)
|
||||
if (emails.length > 0 && emails[0].historyId) {
|
||||
latestHistoryId = emails[0].historyId
|
||||
logger.debug(`[${requestId}] Updated historyId to ${latestHistoryId}`)
|
||||
@@ -534,7 +507,7 @@ async function searchEmails(accessToken: string, config: GmailWebhookConfig, req
|
||||
} catch (error) {
|
||||
const errorMessage = error instanceof Error ? error.message : 'Unknown error'
|
||||
logger.error(`[${requestId}] Error searching emails:`, errorMessage)
|
||||
return { emails: [], latestHistoryId: config.historyId }
|
||||
throw error
|
||||
}
|
||||
}
|
||||
|
||||
@@ -586,12 +559,10 @@ async function processEmails(
|
||||
|
||||
for (const email of emails) {
|
||||
try {
|
||||
const result = await pollingIdempotency.executeWithIdempotency(
|
||||
await pollingIdempotency.executeWithIdempotency(
|
||||
'gmail',
|
||||
`${webhookData.id}:${email.id}`,
|
||||
async () => {
|
||||
// Extract useful information from email to create a simplified payload
|
||||
// First, extract headers into a map for easy access
|
||||
const headers: Record<string, string> = {}
|
||||
if (email.payload?.headers) {
|
||||
for (const header of email.payload.headers) {
|
||||
@@ -599,22 +570,18 @@ async function processEmails(
|
||||
}
|
||||
}
|
||||
|
||||
// Extract and decode email body content
|
||||
let textContent = ''
|
||||
let htmlContent = ''
|
||||
|
||||
// Function to extract content from parts recursively
|
||||
const extractContent = (part: any) => {
|
||||
if (!part) return
|
||||
|
||||
// Extract current part content if it exists
|
||||
if (part.mimeType === 'text/plain' && part.body?.data) {
|
||||
textContent = Buffer.from(part.body.data, 'base64').toString('utf-8')
|
||||
} else if (part.mimeType === 'text/html' && part.body?.data) {
|
||||
htmlContent = Buffer.from(part.body.data, 'base64').toString('utf-8')
|
||||
}
|
||||
|
||||
// Process nested parts
|
||||
if (part.parts && Array.isArray(part.parts)) {
|
||||
for (const subPart of part.parts) {
|
||||
extractContent(subPart)
|
||||
@@ -622,12 +589,10 @@ async function processEmails(
|
||||
}
|
||||
}
|
||||
|
||||
// Extract content from the email payload
|
||||
if (email.payload) {
|
||||
extractContent(email.payload)
|
||||
}
|
||||
|
||||
// Parse date into standard format
|
||||
let date: string | null = null
|
||||
if (headers.date) {
|
||||
try {
|
||||
@@ -636,11 +601,9 @@ async function processEmails(
|
||||
// Keep date as null if parsing fails
|
||||
}
|
||||
} else if (email.internalDate) {
|
||||
// Use internalDate as fallback (convert from timestamp to ISO string)
|
||||
date = new Date(Number.parseInt(email.internalDate)).toISOString()
|
||||
}
|
||||
|
||||
// Download attachments if requested (raw Buffers - will be uploaded during execution)
|
||||
let attachments: GmailAttachment[] = []
|
||||
const hasAttachments = email.payload
|
||||
? extractAttachmentInfo(email.payload).length > 0
|
||||
@@ -655,11 +618,9 @@ async function processEmails(
|
||||
`[${requestId}] Error downloading attachments for email ${email.id}:`,
|
||||
error
|
||||
)
|
||||
// Continue without attachments rather than failing the entire request
|
||||
}
|
||||
}
|
||||
|
||||
// Create simplified email object
|
||||
const simplifiedEmail: SimplifiedEmail = {
|
||||
id: email.id,
|
||||
threadId: email.threadId,
|
||||
@@ -675,7 +636,6 @@ async function processEmails(
|
||||
attachments,
|
||||
}
|
||||
|
||||
// Prepare webhook payload with simplified email and optionally raw email
|
||||
const payload: GmailWebhookPayload = {
|
||||
email: simplifiedEmail,
|
||||
timestamp: new Date().toISOString(),
|
||||
@@ -686,7 +646,6 @@ async function processEmails(
|
||||
`[${requestId}] Sending ${config.includeRawEmail ? 'simplified + raw' : 'simplified'} email payload for ${email.id}`
|
||||
)
|
||||
|
||||
// Trigger the webhook
|
||||
const webhookUrl = `${getBaseUrl()}/api/webhooks/trigger/${webhookData.path}`
|
||||
|
||||
const response = await fetch(webhookUrl, {
|
||||
@@ -709,7 +668,6 @@ async function processEmails(
|
||||
throw new Error(`Webhook request failed: ${response.status} - ${errorText}`)
|
||||
}
|
||||
|
||||
// Mark email as read if configured
|
||||
if (config.markAsRead) {
|
||||
await markEmailAsRead(accessToken, email.id)
|
||||
}
|
||||
@@ -730,7 +688,6 @@ async function processEmails(
|
||||
const errorMessage = error instanceof Error ? error.message : 'Unknown error'
|
||||
logger.error(`[${requestId}] Error processing email ${email.id}:`, errorMessage)
|
||||
failedCount++
|
||||
// Continue processing other emails even if one fails
|
||||
}
|
||||
}
|
||||
|
||||
@@ -778,20 +735,3 @@ async function updateWebhookLastChecked(webhookId: string, timestamp: string, hi
|
||||
})
|
||||
.where(eq(webhook.id, webhookId))
|
||||
}
|
||||
|
||||
async function updateWebhookData(webhookId: string, timestamp: string, historyId?: string) {
|
||||
const result = await db.select().from(webhook).where(eq(webhook.id, webhookId))
|
||||
const existingConfig = (result[0]?.providerConfig as Record<string, any>) || {}
|
||||
|
||||
await db
|
||||
.update(webhook)
|
||||
.set({
|
||||
providerConfig: {
|
||||
...existingConfig,
|
||||
lastCheckedTimestamp: timestamp,
|
||||
...(historyId ? { historyId } : {}),
|
||||
} as any,
|
||||
updatedAt: new Date(),
|
||||
})
|
||||
.where(eq(webhook.id, webhookId))
|
||||
}
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
import { db } from '@sim/db'
|
||||
import { account, webhook } from '@sim/db/schema'
|
||||
import { account, webhook, workflow } from '@sim/db/schema'
|
||||
import { and, eq, sql } from 'drizzle-orm'
|
||||
import { htmlToText } from 'html-to-text'
|
||||
import { nanoid } from 'nanoid'
|
||||
@@ -66,7 +66,6 @@ interface OutlookWebhookConfig {
|
||||
markAsRead?: boolean
|
||||
maxEmailsPerPoll?: number
|
||||
lastCheckedTimestamp?: string
|
||||
pollingInterval?: number
|
||||
includeAttachments?: boolean
|
||||
includeRawEmail?: boolean
|
||||
}
|
||||
@@ -126,15 +125,14 @@ export interface SimplifiedOutlookEmail {
|
||||
attachments: OutlookAttachment[]
|
||||
isRead: boolean
|
||||
folderId: string
|
||||
// Thread support fields
|
||||
messageId: string // Same as id, but explicit for threading
|
||||
threadId: string // Same as conversationId, but explicit for threading
|
||||
messageId: string
|
||||
threadId: string
|
||||
}
|
||||
|
||||
export interface OutlookWebhookPayload {
|
||||
email: SimplifiedOutlookEmail
|
||||
timestamp: string
|
||||
rawEmail?: OutlookEmail // Only included when includeRawEmail is true
|
||||
rawEmail?: OutlookEmail
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -159,11 +157,19 @@ export async function pollOutlookWebhooks() {
|
||||
logger.info('Starting Outlook webhook polling')
|
||||
|
||||
try {
|
||||
// Get all active Outlook webhooks
|
||||
const activeWebhooks = await db
|
||||
.select()
|
||||
const activeWebhooksResult = await db
|
||||
.select({ webhook })
|
||||
.from(webhook)
|
||||
.where(and(eq(webhook.provider, 'outlook'), eq(webhook.isActive, true)))
|
||||
.innerJoin(workflow, eq(webhook.workflowId, workflow.id))
|
||||
.where(
|
||||
and(
|
||||
eq(webhook.provider, 'outlook'),
|
||||
eq(webhook.isActive, true),
|
||||
eq(workflow.isDeployed, true)
|
||||
)
|
||||
)
|
||||
|
||||
const activeWebhooks = activeWebhooksResult.map((r) => r.webhook)
|
||||
|
||||
if (!activeWebhooks.length) {
|
||||
logger.info('No active Outlook webhooks found')
|
||||
@@ -172,7 +178,6 @@ export async function pollOutlookWebhooks() {
|
||||
|
||||
logger.info(`Found ${activeWebhooks.length} active Outlook webhooks`)
|
||||
|
||||
// Limit concurrency to avoid exhausting connections
|
||||
const CONCURRENCY = 10
|
||||
const running: Promise<void>[] = []
|
||||
let successCount = 0
|
||||
@@ -185,7 +190,6 @@ export async function pollOutlookWebhooks() {
|
||||
try {
|
||||
logger.info(`[${requestId}] Processing Outlook webhook: ${webhookId}`)
|
||||
|
||||
// Extract metadata
|
||||
const metadata = webhookData.providerConfig as any
|
||||
const credentialId: string | undefined = metadata?.credentialId
|
||||
const userId: string | undefined = metadata?.userId
|
||||
@@ -197,7 +201,6 @@ export async function pollOutlookWebhooks() {
|
||||
return
|
||||
}
|
||||
|
||||
// Resolve access token
|
||||
let accessToken: string | null = null
|
||||
if (credentialId) {
|
||||
const rows = await db.select().from(account).where(eq(account.id, credentialId)).limit(1)
|
||||
@@ -212,7 +215,6 @@ export async function pollOutlookWebhooks() {
|
||||
const ownerUserId = rows[0].userId
|
||||
accessToken = await refreshAccessTokenIfNeeded(credentialId, ownerUserId, requestId)
|
||||
} else if (userId) {
|
||||
// Backward-compat fallback to workflow owner token
|
||||
accessToken = await getOAuthToken(userId, 'outlook')
|
||||
}
|
||||
|
||||
@@ -225,17 +227,14 @@ export async function pollOutlookWebhooks() {
|
||||
return
|
||||
}
|
||||
|
||||
// Get webhook configuration
|
||||
const config = webhookData.providerConfig as unknown as OutlookWebhookConfig
|
||||
|
||||
const now = new Date()
|
||||
|
||||
// Fetch new emails
|
||||
const fetchResult = await fetchNewOutlookEmails(accessToken, config, requestId)
|
||||
const { emails } = fetchResult
|
||||
|
||||
if (!emails || !emails.length) {
|
||||
// Update last checked timestamp
|
||||
await updateWebhookLastChecked(webhookId, now.toISOString())
|
||||
await markWebhookSuccess(webhookId)
|
||||
logger.info(`[${requestId}] No new emails found for webhook ${webhookId}`)
|
||||
@@ -247,7 +246,6 @@ export async function pollOutlookWebhooks() {
|
||||
|
||||
logger.info(`[${requestId}] Processing ${emails.length} emails for webhook ${webhookId}`)
|
||||
|
||||
// Process emails
|
||||
const { processedCount, failedCount } = await processOutlookEmails(
|
||||
emails,
|
||||
webhookData,
|
||||
@@ -256,19 +254,15 @@ export async function pollOutlookWebhooks() {
|
||||
requestId
|
||||
)
|
||||
|
||||
// Update webhook with latest timestamp
|
||||
await updateWebhookLastChecked(webhookId, now.toISOString())
|
||||
|
||||
// If all emails failed, mark webhook as failed. Otherwise mark as success.
|
||||
if (failedCount > 0 && processedCount === 0) {
|
||||
// All emails failed to process - mark webhook as failed
|
||||
await markWebhookFailed(webhookId)
|
||||
failureCount++
|
||||
logger.warn(
|
||||
`[${requestId}] All ${failedCount} emails failed to process for webhook ${webhookId}`
|
||||
)
|
||||
} else {
|
||||
// At least some emails processed successfully
|
||||
await markWebhookSuccess(webhookId)
|
||||
successCount++
|
||||
logger.info(
|
||||
@@ -276,7 +270,6 @@ export async function pollOutlookWebhooks() {
|
||||
)
|
||||
}
|
||||
} catch (error) {
|
||||
const errorMessage = error instanceof Error ? error.message : 'Unknown error'
|
||||
logger.error(`[${requestId}] Error processing Outlook webhook ${webhookId}:`, error)
|
||||
await markWebhookFailed(webhookId)
|
||||
failureCount++
|
||||
@@ -285,9 +278,7 @@ export async function pollOutlookWebhooks() {
|
||||
|
||||
for (const webhookData of activeWebhooks) {
|
||||
const promise = enqueue(webhookData)
|
||||
.then(() => {
|
||||
// Result processed, memory released
|
||||
})
|
||||
.then(() => {})
|
||||
.catch((err) => {
|
||||
logger.error('Unexpected error in webhook processing:', err)
|
||||
failureCount++
|
||||
@@ -301,7 +292,6 @@ export async function pollOutlookWebhooks() {
|
||||
}
|
||||
}
|
||||
|
||||
// Wait for remaining webhooks to complete
|
||||
await Promise.allSettled(running)
|
||||
|
||||
logger.info(`Outlook polling completed: ${successCount} successful, ${failureCount} failed`)
|
||||
@@ -310,7 +300,7 @@ export async function pollOutlookWebhooks() {
|
||||
total: activeWebhooks.length,
|
||||
successful: successCount,
|
||||
failed: failureCount,
|
||||
details: [], // Don't store details to save memory
|
||||
details: [],
|
||||
}
|
||||
} catch (error) {
|
||||
logger.error('Error during Outlook webhook polling:', error)
|
||||
@@ -324,27 +314,21 @@ async function fetchNewOutlookEmails(
|
||||
requestId: string
|
||||
) {
|
||||
try {
|
||||
// Build the Microsoft Graph API URL
|
||||
const apiUrl = 'https://graph.microsoft.com/v1.0/me/messages'
|
||||
const params = new URLSearchParams()
|
||||
|
||||
// Add select parameters to get the fields we need
|
||||
params.append(
|
||||
'$select',
|
||||
'id,conversationId,subject,bodyPreview,body,from,toRecipients,ccRecipients,receivedDateTime,sentDateTime,hasAttachments,isRead,parentFolderId'
|
||||
)
|
||||
|
||||
// Add ordering (newest first)
|
||||
params.append('$orderby', 'receivedDateTime desc')
|
||||
|
||||
// Limit results
|
||||
params.append('$top', (config.maxEmailsPerPoll || 25).toString())
|
||||
|
||||
// Add time filter if we have a last checked timestamp
|
||||
if (config.lastCheckedTimestamp) {
|
||||
const lastChecked = new Date(config.lastCheckedTimestamp)
|
||||
// Add a small buffer to avoid missing emails due to clock differences
|
||||
const bufferTime = new Date(lastChecked.getTime() - 60000) // 1 minute buffer
|
||||
const bufferTime = new Date(lastChecked.getTime() - 60000)
|
||||
params.append('$filter', `receivedDateTime gt ${bufferTime.toISOString()}`)
|
||||
}
|
||||
|
||||
@@ -366,13 +350,14 @@ async function fetchNewOutlookEmails(
|
||||
statusText: response.statusText,
|
||||
error: errorData,
|
||||
})
|
||||
return { emails: [] }
|
||||
throw new Error(
|
||||
`Microsoft Graph API error: ${response.status} ${response.statusText} - ${JSON.stringify(errorData)}`
|
||||
)
|
||||
}
|
||||
|
||||
const data = await response.json()
|
||||
const emails = data.value || []
|
||||
|
||||
// Filter by folder if configured
|
||||
const filteredEmails = filterEmailsByFolder(emails, config)
|
||||
|
||||
logger.info(
|
||||
@@ -383,7 +368,7 @@ async function fetchNewOutlookEmails(
|
||||
} catch (error) {
|
||||
const errorMessage = error instanceof Error ? error.message : 'Unknown error'
|
||||
logger.error(`[${requestId}] Error fetching new Outlook emails:`, errorMessage)
|
||||
return { emails: [] }
|
||||
throw error
|
||||
}
|
||||
}
|
||||
|
||||
@@ -401,9 +386,7 @@ function filterEmailsByFolder(
|
||||
emailFolderId.toLowerCase().includes(configFolder.toLowerCase())
|
||||
)
|
||||
|
||||
return config.folderFilterBehavior === 'INCLUDE'
|
||||
? hasMatchingFolder // Include emails from matching folders
|
||||
: !hasMatchingFolder // Exclude emails from matching folders
|
||||
return config.folderFilterBehavior === 'INCLUDE' ? hasMatchingFolder : !hasMatchingFolder
|
||||
})
|
||||
}
|
||||
|
||||
@@ -419,7 +402,7 @@ async function processOutlookEmails(
|
||||
|
||||
for (const email of emails) {
|
||||
try {
|
||||
const result = await pollingIdempotency.executeWithIdempotency(
|
||||
await pollingIdempotency.executeWithIdempotency(
|
||||
'outlook',
|
||||
`${webhookData.id}:${email.id}`,
|
||||
async () => {
|
||||
@@ -435,7 +418,6 @@ async function processOutlookEmails(
|
||||
}
|
||||
}
|
||||
|
||||
// Convert to simplified format
|
||||
const simplifiedEmail: SimplifiedOutlookEmail = {
|
||||
id: email.id,
|
||||
conversationId: email.conversationId,
|
||||
@@ -460,18 +442,15 @@ async function processOutlookEmails(
|
||||
attachments,
|
||||
isRead: email.isRead,
|
||||
folderId: email.parentFolderId,
|
||||
// Thread support fields
|
||||
messageId: email.id,
|
||||
threadId: email.conversationId,
|
||||
}
|
||||
|
||||
// Create webhook payload
|
||||
const payload: OutlookWebhookPayload = {
|
||||
email: simplifiedEmail,
|
||||
timestamp: new Date().toISOString(),
|
||||
}
|
||||
|
||||
// Include raw email if configured
|
||||
if (config.includeRawEmail) {
|
||||
payload.rawEmail = email
|
||||
}
|
||||
@@ -480,7 +459,6 @@ async function processOutlookEmails(
|
||||
`[${requestId}] Processing email: ${email.subject} from ${email.from?.emailAddress?.address}`
|
||||
)
|
||||
|
||||
// Trigger the webhook
|
||||
const webhookUrl = `${getBaseUrl()}/api/webhooks/trigger/${webhookData.path}`
|
||||
|
||||
const response = await fetch(webhookUrl, {
|
||||
@@ -503,7 +481,6 @@ async function processOutlookEmails(
|
||||
throw new Error(`Webhook request failed: ${response.status} - ${errorText}`)
|
||||
}
|
||||
|
||||
// Mark email as read if configured
|
||||
if (config.markAsRead) {
|
||||
await markOutlookEmailAsRead(accessToken, email.id)
|
||||
}
|
||||
@@ -537,7 +514,6 @@ async function downloadOutlookAttachments(
|
||||
const attachments: OutlookAttachment[] = []
|
||||
|
||||
try {
|
||||
// Fetch attachments list from Microsoft Graph API
|
||||
const response = await fetch(
|
||||
`https://graph.microsoft.com/v1.0/me/messages/${messageId}/attachments`,
|
||||
{
|
||||
@@ -558,11 +534,9 @@ async function downloadOutlookAttachments(
|
||||
|
||||
for (const attachment of attachmentsList) {
|
||||
try {
|
||||
// Microsoft Graph returns attachment data directly in the list response for file attachments
|
||||
if (attachment['@odata.type'] === '#microsoft.graph.fileAttachment') {
|
||||
const contentBytes = attachment.contentBytes
|
||||
if (contentBytes) {
|
||||
// contentBytes is base64 encoded
|
||||
const buffer = Buffer.from(contentBytes, 'base64')
|
||||
attachments.push({
|
||||
name: attachment.name,
|
||||
@@ -577,7 +551,6 @@ async function downloadOutlookAttachments(
|
||||
`[${requestId}] Error processing attachment ${attachment.id} for message ${messageId}:`,
|
||||
error
|
||||
)
|
||||
// Continue with other attachments
|
||||
}
|
||||
}
|
||||
|
||||
@@ -618,7 +591,6 @@ async function markOutlookEmailAsRead(accessToken: string, messageId: string) {
|
||||
|
||||
async function updateWebhookLastChecked(webhookId: string, timestamp: string) {
|
||||
try {
|
||||
// Get current config first
|
||||
const currentWebhook = await db
|
||||
.select({ providerConfig: webhook.providerConfig })
|
||||
.from(webhook)
|
||||
@@ -632,7 +604,7 @@ async function updateWebhookLastChecked(webhookId: string, timestamp: string) {
|
||||
|
||||
const currentConfig = (currentWebhook[0].providerConfig as any) || {}
|
||||
const updatedConfig = {
|
||||
...currentConfig, // Preserve ALL existing config including userId
|
||||
...currentConfig,
|
||||
lastCheckedTimestamp: timestamp,
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user