diff --git a/apps/sim/lib/webhooks/gmail-polling-service.ts b/apps/sim/lib/webhooks/gmail-polling-service.ts index efbe26115..ff1872ce7 100644 --- a/apps/sim/lib/webhooks/gmail-polling-service.ts +++ b/apps/sim/lib/webhooks/gmail-polling-service.ts @@ -1,5 +1,5 @@ import { db } from '@sim/db' -import { account, webhook } from '@sim/db/schema' +import { account, webhook, workflow } from '@sim/db/schema' import { and, eq, sql } from 'drizzle-orm' import { nanoid } from 'nanoid' import { pollingIdempotency } from '@/lib/core/idempotency/service' @@ -21,7 +21,6 @@ interface GmailWebhookConfig { maxEmailsPerPoll?: number lastCheckedTimestamp?: string historyId?: string - pollingInterval?: number includeAttachments?: boolean includeRawEmail?: boolean } @@ -108,11 +107,19 @@ export async function pollGmailWebhooks() { logger.info('Starting Gmail webhook polling') try { - // Get all active Gmail webhooks - const activeWebhooks = await db - .select() + const activeWebhooksResult = await db + .select({ webhook }) .from(webhook) - .where(and(eq(webhook.provider, 'gmail'), eq(webhook.isActive, true))) + .innerJoin(workflow, eq(webhook.workflowId, workflow.id)) + .where( + and( + eq(webhook.provider, 'gmail'), + eq(webhook.isActive, true), + eq(workflow.isDeployed, true) + ) + ) + + const activeWebhooks = activeWebhooksResult.map((r) => r.webhook) if (!activeWebhooks.length) { logger.info('No active Gmail webhooks found') @@ -134,7 +141,6 @@ export async function pollGmailWebhooks() { const requestId = nanoid() try { - // Extract metadata const metadata = webhookData.providerConfig as any const credentialId: string | undefined = metadata?.credentialId const userId: string | undefined = metadata?.userId @@ -146,7 +152,6 @@ export async function pollGmailWebhooks() { return } - // Resolve owner and token let accessToken: string | null = null if (credentialId) { const rows = await db.select().from(account).where(eq(account.id, credentialId)).limit(1) @@ -161,7 +166,6 @@ export async function pollGmailWebhooks() { const ownerUserId = rows[0].userId accessToken = await refreshAccessTokenIfNeeded(credentialId, ownerUserId, requestId) } else if (userId) { - // Backward-compat fallback to workflow owner token accessToken = await getOAuthToken(userId, 'google-email') } @@ -174,18 +178,15 @@ export async function pollGmailWebhooks() { return } - // Get webhook configuration const config = webhookData.providerConfig as unknown as GmailWebhookConfig const now = new Date() - // Fetch new emails const fetchResult = await fetchNewEmails(accessToken, config, requestId) const { emails, latestHistoryId } = fetchResult if (!emails || !emails.length) { - // Update last checked timestamp await updateWebhookLastChecked( webhookId, now.toISOString(), @@ -201,10 +202,8 @@ export async function pollGmailWebhooks() { logger.info(`[${requestId}] Processing ${emails.length} emails for webhook ${webhookId}`) - // Process all emails (process each email as a separate workflow trigger) const emailsToProcess = emails - // Process emails const { processedCount, failedCount } = await processEmails( emailsToProcess, webhookData, @@ -213,19 +212,19 @@ export async function pollGmailWebhooks() { requestId ) - // Update webhook with latest history ID and timestamp - await updateWebhookData(webhookId, now.toISOString(), latestHistoryId || config.historyId) + await updateWebhookLastChecked( + webhookId, + now.toISOString(), + latestHistoryId || config.historyId + ) - // If all emails failed, mark webhook as failed. Otherwise mark as success. if (failedCount > 0 && processedCount === 0) { - // All emails failed to process - mark webhook as failed await markWebhookFailed(webhookId) failureCount++ logger.warn( `[${requestId}] All ${failedCount} emails failed to process for webhook ${webhookId}` ) } else { - // At least some emails processed successfully await markWebhookSuccess(webhookId) successCount++ logger.info( @@ -233,7 +232,6 @@ export async function pollGmailWebhooks() { ) } } catch (error) { - const errorMessage = error instanceof Error ? error.message : 'Unknown error' logger.error(`[${requestId}] Error processing Gmail webhook ${webhookId}:`, error) await markWebhookFailed(webhookId) failureCount++ @@ -242,9 +240,7 @@ export async function pollGmailWebhooks() { for (const webhookData of activeWebhooks) { const promise = enqueue(webhookData) - .then(() => { - // Result processed, memory released - }) + .then(() => {}) .catch((err) => { logger.error('Unexpected error in webhook processing:', err) failureCount++ @@ -258,14 +254,13 @@ export async function pollGmailWebhooks() { } } - // Wait for remaining webhooks to complete await Promise.allSettled(running) const summary = { total: activeWebhooks.length, successful: successCount, failed: failureCount, - details: [], // Don't store details to save memory + details: [], } logger.info('Gmail polling completed', { @@ -284,13 +279,11 @@ export async function pollGmailWebhooks() { async function fetchNewEmails(accessToken: string, config: GmailWebhookConfig, requestId: string) { try { - // Determine whether to use history API or search const useHistoryApi = !!config.historyId let emails = [] let latestHistoryId = config.historyId if (useHistoryApi) { - // Use history API to get changes since last check const historyUrl = `https://gmail.googleapis.com/gmail/v1/users/me/history?startHistoryId=${config.historyId}` const historyResponse = await fetch(historyUrl, { @@ -307,7 +300,6 @@ async function fetchNewEmails(accessToken: string, config: GmailWebhookConfig, r error: errorData, }) - // Fall back to search if history API fails logger.info(`[${requestId}] Falling back to search API after history API failure`) const searchResult = await searchEmails(accessToken, config, requestId) return { @@ -322,12 +314,10 @@ async function fetchNewEmails(accessToken: string, config: GmailWebhookConfig, r return { emails: [], latestHistoryId } } - // Update the latest history ID if (historyData.historyId) { latestHistoryId = historyData.historyId } - // Extract message IDs from history const messageIds = new Set() for (const history of historyData.history) { @@ -342,29 +332,28 @@ async function fetchNewEmails(accessToken: string, config: GmailWebhookConfig, r return { emails: [], latestHistoryId } } - // Sort IDs by recency (reverse order) const sortedIds = [...messageIds].sort().reverse() - // Process all emails but respect the configured limit const idsToFetch = sortedIds.slice(0, config.maxEmailsPerPoll || 25) logger.info(`[${requestId}] Processing ${idsToFetch.length} emails from history API`) - // Fetch full email details for each message const emailPromises = idsToFetch.map(async (messageId) => { return getEmailDetails(accessToken, messageId) }) const emailResults = await Promise.allSettled(emailPromises) + const rejected = emailResults.filter((r) => r.status === 'rejected') + if (rejected.length > 0) { + logger.warn(`[${requestId}] Failed to fetch ${rejected.length} email details`) + } emails = emailResults .filter( (result): result is PromiseFulfilledResult => result.status === 'fulfilled' ) .map((result) => result.value) - // Filter emails by labels if needed emails = filterEmailsByLabels(emails, config) } else { - // Use search if no history ID is available const searchResult = await searchEmails(accessToken, config, requestId) return searchResult } @@ -373,7 +362,7 @@ async function fetchNewEmails(accessToken: string, config: GmailWebhookConfig, r } catch (error) { const errorMessage = error instanceof Error ? error.message : 'Unknown error' logger.error(`[${requestId}] Error fetching new emails:`, errorMessage) - return { emails: [], latestHistoryId: config.historyId } + throw error } } @@ -425,59 +414,40 @@ async function searchEmails(accessToken: string, config: GmailWebhookConfig, req const baseQuery = buildGmailSearchQuery(config) logger.debug(`[${requestId}] Gmail search query: ${baseQuery}`) - // Improved time-based filtering with dynamic buffer let timeConstraint = '' if (config.lastCheckedTimestamp) { - // Parse the last check time const lastCheckedTime = new Date(config.lastCheckedTimestamp) const now = new Date() - - // Calculate minutes since last check const minutesSinceLastCheck = (now.getTime() - lastCheckedTime.getTime()) / (60 * 1000) - // If last check was recent, use precise time-based query if (minutesSinceLastCheck < 60) { - // Less than an hour ago - // Calculate buffer in seconds - the greater of: - // 1. Twice the configured polling interval (or 2 minutes if not set) - // 2. At least 3 minutes (180 seconds) - const bufferSeconds = Math.max((config.pollingInterval || 2) * 60 * 2, 180) + const bufferSeconds = Math.max(1 * 60 * 2, 180) - // Calculate the cutoff time with buffer const cutoffTime = new Date(lastCheckedTime.getTime() - bufferSeconds * 1000) - // Format for Gmail's search syntax (seconds since epoch) const timestamp = Math.floor(cutoffTime.getTime() / 1000) timeConstraint = ` after:${timestamp}` logger.debug(`[${requestId}] Using timestamp-based query with ${bufferSeconds}s buffer`) - } - // If last check was a while ago, use Gmail's relative time queries - else if (minutesSinceLastCheck < 24 * 60) { - // Less than a day - // Use newer_than:Xh syntax for better reliability with longer intervals + } else if (minutesSinceLastCheck < 24 * 60) { const hours = Math.ceil(minutesSinceLastCheck / 60) + 1 // Round up and add 1 hour buffer timeConstraint = ` newer_than:${hours}h` logger.debug(`[${requestId}] Using hour-based query: newer_than:${hours}h`) } else { - // For very old last checks, limit to a reasonable time period (7 days max) const days = Math.min(Math.ceil(minutesSinceLastCheck / (24 * 60)), 7) + 1 timeConstraint = ` newer_than:${days}d` logger.debug(`[${requestId}] Using day-based query: newer_than:${days}d`) } } else { - // If there's no last checked timestamp, default to recent emails (last 24h) timeConstraint = ' newer_than:1d' logger.debug(`[${requestId}] No last check time, using default: newer_than:1d`) } - // Combine base query and time constraints const query = `${baseQuery}${timeConstraint}` logger.info(`[${requestId}] Searching for emails with query: ${query}`) - // Search for emails with lower default const searchUrl = `https://gmail.googleapis.com/gmail/v1/users/me/messages?q=${encodeURIComponent(query)}&maxResults=${config.maxEmailsPerPoll || 25}` const searchResponse = await fetch(searchUrl, { @@ -494,7 +464,9 @@ async function searchEmails(accessToken: string, config: GmailWebhookConfig, req query: query, error: errorData, }) - return { emails: [], latestHistoryId: config.historyId } + throw new Error( + `Gmail API error: ${searchResponse.status} ${searchResponse.statusText} - ${JSON.stringify(errorData)}` + ) } const searchData = await searchResponse.json() @@ -504,7 +476,6 @@ async function searchEmails(accessToken: string, config: GmailWebhookConfig, req return { emails: [], latestHistoryId: config.historyId } } - // Process emails within the limit const idsToFetch = searchData.messages.slice(0, config.maxEmailsPerPoll || 25) let latestHistoryId = config.historyId @@ -512,19 +483,21 @@ async function searchEmails(accessToken: string, config: GmailWebhookConfig, req `[${requestId}] Processing ${idsToFetch.length} emails from search API (total matches: ${searchData.messages.length})` ) - // Fetch full email details for each message const emailPromises = idsToFetch.map(async (message: { id: string }) => { return getEmailDetails(accessToken, message.id) }) const emailResults = await Promise.allSettled(emailPromises) + const rejected = emailResults.filter((r) => r.status === 'rejected') + if (rejected.length > 0) { + logger.warn(`[${requestId}] Failed to fetch ${rejected.length} email details`) + } const emails = emailResults .filter( (result): result is PromiseFulfilledResult => result.status === 'fulfilled' ) .map((result) => result.value) - // Get the latest history ID from the first email (most recent) if (emails.length > 0 && emails[0].historyId) { latestHistoryId = emails[0].historyId logger.debug(`[${requestId}] Updated historyId to ${latestHistoryId}`) @@ -534,7 +507,7 @@ async function searchEmails(accessToken: string, config: GmailWebhookConfig, req } catch (error) { const errorMessage = error instanceof Error ? error.message : 'Unknown error' logger.error(`[${requestId}] Error searching emails:`, errorMessage) - return { emails: [], latestHistoryId: config.historyId } + throw error } } @@ -586,12 +559,10 @@ async function processEmails( for (const email of emails) { try { - const result = await pollingIdempotency.executeWithIdempotency( + await pollingIdempotency.executeWithIdempotency( 'gmail', `${webhookData.id}:${email.id}`, async () => { - // Extract useful information from email to create a simplified payload - // First, extract headers into a map for easy access const headers: Record = {} if (email.payload?.headers) { for (const header of email.payload.headers) { @@ -599,22 +570,18 @@ async function processEmails( } } - // Extract and decode email body content let textContent = '' let htmlContent = '' - // Function to extract content from parts recursively const extractContent = (part: any) => { if (!part) return - // Extract current part content if it exists if (part.mimeType === 'text/plain' && part.body?.data) { textContent = Buffer.from(part.body.data, 'base64').toString('utf-8') } else if (part.mimeType === 'text/html' && part.body?.data) { htmlContent = Buffer.from(part.body.data, 'base64').toString('utf-8') } - // Process nested parts if (part.parts && Array.isArray(part.parts)) { for (const subPart of part.parts) { extractContent(subPart) @@ -622,12 +589,10 @@ async function processEmails( } } - // Extract content from the email payload if (email.payload) { extractContent(email.payload) } - // Parse date into standard format let date: string | null = null if (headers.date) { try { @@ -636,11 +601,9 @@ async function processEmails( // Keep date as null if parsing fails } } else if (email.internalDate) { - // Use internalDate as fallback (convert from timestamp to ISO string) date = new Date(Number.parseInt(email.internalDate)).toISOString() } - // Download attachments if requested (raw Buffers - will be uploaded during execution) let attachments: GmailAttachment[] = [] const hasAttachments = email.payload ? extractAttachmentInfo(email.payload).length > 0 @@ -655,11 +618,9 @@ async function processEmails( `[${requestId}] Error downloading attachments for email ${email.id}:`, error ) - // Continue without attachments rather than failing the entire request } } - // Create simplified email object const simplifiedEmail: SimplifiedEmail = { id: email.id, threadId: email.threadId, @@ -675,7 +636,6 @@ async function processEmails( attachments, } - // Prepare webhook payload with simplified email and optionally raw email const payload: GmailWebhookPayload = { email: simplifiedEmail, timestamp: new Date().toISOString(), @@ -686,7 +646,6 @@ async function processEmails( `[${requestId}] Sending ${config.includeRawEmail ? 'simplified + raw' : 'simplified'} email payload for ${email.id}` ) - // Trigger the webhook const webhookUrl = `${getBaseUrl()}/api/webhooks/trigger/${webhookData.path}` const response = await fetch(webhookUrl, { @@ -709,7 +668,6 @@ async function processEmails( throw new Error(`Webhook request failed: ${response.status} - ${errorText}`) } - // Mark email as read if configured if (config.markAsRead) { await markEmailAsRead(accessToken, email.id) } @@ -730,7 +688,6 @@ async function processEmails( const errorMessage = error instanceof Error ? error.message : 'Unknown error' logger.error(`[${requestId}] Error processing email ${email.id}:`, errorMessage) failedCount++ - // Continue processing other emails even if one fails } } @@ -778,20 +735,3 @@ async function updateWebhookLastChecked(webhookId: string, timestamp: string, hi }) .where(eq(webhook.id, webhookId)) } - -async function updateWebhookData(webhookId: string, timestamp: string, historyId?: string) { - const result = await db.select().from(webhook).where(eq(webhook.id, webhookId)) - const existingConfig = (result[0]?.providerConfig as Record) || {} - - await db - .update(webhook) - .set({ - providerConfig: { - ...existingConfig, - lastCheckedTimestamp: timestamp, - ...(historyId ? { historyId } : {}), - } as any, - updatedAt: new Date(), - }) - .where(eq(webhook.id, webhookId)) -} diff --git a/apps/sim/lib/webhooks/outlook-polling-service.ts b/apps/sim/lib/webhooks/outlook-polling-service.ts index c679562f9..0ae291a69 100644 --- a/apps/sim/lib/webhooks/outlook-polling-service.ts +++ b/apps/sim/lib/webhooks/outlook-polling-service.ts @@ -1,5 +1,5 @@ import { db } from '@sim/db' -import { account, webhook } from '@sim/db/schema' +import { account, webhook, workflow } from '@sim/db/schema' import { and, eq, sql } from 'drizzle-orm' import { htmlToText } from 'html-to-text' import { nanoid } from 'nanoid' @@ -66,7 +66,6 @@ interface OutlookWebhookConfig { markAsRead?: boolean maxEmailsPerPoll?: number lastCheckedTimestamp?: string - pollingInterval?: number includeAttachments?: boolean includeRawEmail?: boolean } @@ -126,15 +125,14 @@ export interface SimplifiedOutlookEmail { attachments: OutlookAttachment[] isRead: boolean folderId: string - // Thread support fields - messageId: string // Same as id, but explicit for threading - threadId: string // Same as conversationId, but explicit for threading + messageId: string + threadId: string } export interface OutlookWebhookPayload { email: SimplifiedOutlookEmail timestamp: string - rawEmail?: OutlookEmail // Only included when includeRawEmail is true + rawEmail?: OutlookEmail } /** @@ -159,11 +157,19 @@ export async function pollOutlookWebhooks() { logger.info('Starting Outlook webhook polling') try { - // Get all active Outlook webhooks - const activeWebhooks = await db - .select() + const activeWebhooksResult = await db + .select({ webhook }) .from(webhook) - .where(and(eq(webhook.provider, 'outlook'), eq(webhook.isActive, true))) + .innerJoin(workflow, eq(webhook.workflowId, workflow.id)) + .where( + and( + eq(webhook.provider, 'outlook'), + eq(webhook.isActive, true), + eq(workflow.isDeployed, true) + ) + ) + + const activeWebhooks = activeWebhooksResult.map((r) => r.webhook) if (!activeWebhooks.length) { logger.info('No active Outlook webhooks found') @@ -172,7 +178,6 @@ export async function pollOutlookWebhooks() { logger.info(`Found ${activeWebhooks.length} active Outlook webhooks`) - // Limit concurrency to avoid exhausting connections const CONCURRENCY = 10 const running: Promise[] = [] let successCount = 0 @@ -185,7 +190,6 @@ export async function pollOutlookWebhooks() { try { logger.info(`[${requestId}] Processing Outlook webhook: ${webhookId}`) - // Extract metadata const metadata = webhookData.providerConfig as any const credentialId: string | undefined = metadata?.credentialId const userId: string | undefined = metadata?.userId @@ -197,7 +201,6 @@ export async function pollOutlookWebhooks() { return } - // Resolve access token let accessToken: string | null = null if (credentialId) { const rows = await db.select().from(account).where(eq(account.id, credentialId)).limit(1) @@ -212,7 +215,6 @@ export async function pollOutlookWebhooks() { const ownerUserId = rows[0].userId accessToken = await refreshAccessTokenIfNeeded(credentialId, ownerUserId, requestId) } else if (userId) { - // Backward-compat fallback to workflow owner token accessToken = await getOAuthToken(userId, 'outlook') } @@ -225,17 +227,14 @@ export async function pollOutlookWebhooks() { return } - // Get webhook configuration const config = webhookData.providerConfig as unknown as OutlookWebhookConfig const now = new Date() - // Fetch new emails const fetchResult = await fetchNewOutlookEmails(accessToken, config, requestId) const { emails } = fetchResult if (!emails || !emails.length) { - // Update last checked timestamp await updateWebhookLastChecked(webhookId, now.toISOString()) await markWebhookSuccess(webhookId) logger.info(`[${requestId}] No new emails found for webhook ${webhookId}`) @@ -247,7 +246,6 @@ export async function pollOutlookWebhooks() { logger.info(`[${requestId}] Processing ${emails.length} emails for webhook ${webhookId}`) - // Process emails const { processedCount, failedCount } = await processOutlookEmails( emails, webhookData, @@ -256,19 +254,15 @@ export async function pollOutlookWebhooks() { requestId ) - // Update webhook with latest timestamp await updateWebhookLastChecked(webhookId, now.toISOString()) - // If all emails failed, mark webhook as failed. Otherwise mark as success. if (failedCount > 0 && processedCount === 0) { - // All emails failed to process - mark webhook as failed await markWebhookFailed(webhookId) failureCount++ logger.warn( `[${requestId}] All ${failedCount} emails failed to process for webhook ${webhookId}` ) } else { - // At least some emails processed successfully await markWebhookSuccess(webhookId) successCount++ logger.info( @@ -276,7 +270,6 @@ export async function pollOutlookWebhooks() { ) } } catch (error) { - const errorMessage = error instanceof Error ? error.message : 'Unknown error' logger.error(`[${requestId}] Error processing Outlook webhook ${webhookId}:`, error) await markWebhookFailed(webhookId) failureCount++ @@ -285,9 +278,7 @@ export async function pollOutlookWebhooks() { for (const webhookData of activeWebhooks) { const promise = enqueue(webhookData) - .then(() => { - // Result processed, memory released - }) + .then(() => {}) .catch((err) => { logger.error('Unexpected error in webhook processing:', err) failureCount++ @@ -301,7 +292,6 @@ export async function pollOutlookWebhooks() { } } - // Wait for remaining webhooks to complete await Promise.allSettled(running) logger.info(`Outlook polling completed: ${successCount} successful, ${failureCount} failed`) @@ -310,7 +300,7 @@ export async function pollOutlookWebhooks() { total: activeWebhooks.length, successful: successCount, failed: failureCount, - details: [], // Don't store details to save memory + details: [], } } catch (error) { logger.error('Error during Outlook webhook polling:', error) @@ -324,27 +314,21 @@ async function fetchNewOutlookEmails( requestId: string ) { try { - // Build the Microsoft Graph API URL const apiUrl = 'https://graph.microsoft.com/v1.0/me/messages' const params = new URLSearchParams() - // Add select parameters to get the fields we need params.append( '$select', 'id,conversationId,subject,bodyPreview,body,from,toRecipients,ccRecipients,receivedDateTime,sentDateTime,hasAttachments,isRead,parentFolderId' ) - // Add ordering (newest first) params.append('$orderby', 'receivedDateTime desc') - // Limit results params.append('$top', (config.maxEmailsPerPoll || 25).toString()) - // Add time filter if we have a last checked timestamp if (config.lastCheckedTimestamp) { const lastChecked = new Date(config.lastCheckedTimestamp) - // Add a small buffer to avoid missing emails due to clock differences - const bufferTime = new Date(lastChecked.getTime() - 60000) // 1 minute buffer + const bufferTime = new Date(lastChecked.getTime() - 60000) params.append('$filter', `receivedDateTime gt ${bufferTime.toISOString()}`) } @@ -366,13 +350,14 @@ async function fetchNewOutlookEmails( statusText: response.statusText, error: errorData, }) - return { emails: [] } + throw new Error( + `Microsoft Graph API error: ${response.status} ${response.statusText} - ${JSON.stringify(errorData)}` + ) } const data = await response.json() const emails = data.value || [] - // Filter by folder if configured const filteredEmails = filterEmailsByFolder(emails, config) logger.info( @@ -383,7 +368,7 @@ async function fetchNewOutlookEmails( } catch (error) { const errorMessage = error instanceof Error ? error.message : 'Unknown error' logger.error(`[${requestId}] Error fetching new Outlook emails:`, errorMessage) - return { emails: [] } + throw error } } @@ -401,9 +386,7 @@ function filterEmailsByFolder( emailFolderId.toLowerCase().includes(configFolder.toLowerCase()) ) - return config.folderFilterBehavior === 'INCLUDE' - ? hasMatchingFolder // Include emails from matching folders - : !hasMatchingFolder // Exclude emails from matching folders + return config.folderFilterBehavior === 'INCLUDE' ? hasMatchingFolder : !hasMatchingFolder }) } @@ -419,7 +402,7 @@ async function processOutlookEmails( for (const email of emails) { try { - const result = await pollingIdempotency.executeWithIdempotency( + await pollingIdempotency.executeWithIdempotency( 'outlook', `${webhookData.id}:${email.id}`, async () => { @@ -435,7 +418,6 @@ async function processOutlookEmails( } } - // Convert to simplified format const simplifiedEmail: SimplifiedOutlookEmail = { id: email.id, conversationId: email.conversationId, @@ -460,18 +442,15 @@ async function processOutlookEmails( attachments, isRead: email.isRead, folderId: email.parentFolderId, - // Thread support fields messageId: email.id, threadId: email.conversationId, } - // Create webhook payload const payload: OutlookWebhookPayload = { email: simplifiedEmail, timestamp: new Date().toISOString(), } - // Include raw email if configured if (config.includeRawEmail) { payload.rawEmail = email } @@ -480,7 +459,6 @@ async function processOutlookEmails( `[${requestId}] Processing email: ${email.subject} from ${email.from?.emailAddress?.address}` ) - // Trigger the webhook const webhookUrl = `${getBaseUrl()}/api/webhooks/trigger/${webhookData.path}` const response = await fetch(webhookUrl, { @@ -503,7 +481,6 @@ async function processOutlookEmails( throw new Error(`Webhook request failed: ${response.status} - ${errorText}`) } - // Mark email as read if configured if (config.markAsRead) { await markOutlookEmailAsRead(accessToken, email.id) } @@ -537,7 +514,6 @@ async function downloadOutlookAttachments( const attachments: OutlookAttachment[] = [] try { - // Fetch attachments list from Microsoft Graph API const response = await fetch( `https://graph.microsoft.com/v1.0/me/messages/${messageId}/attachments`, { @@ -558,11 +534,9 @@ async function downloadOutlookAttachments( for (const attachment of attachmentsList) { try { - // Microsoft Graph returns attachment data directly in the list response for file attachments if (attachment['@odata.type'] === '#microsoft.graph.fileAttachment') { const contentBytes = attachment.contentBytes if (contentBytes) { - // contentBytes is base64 encoded const buffer = Buffer.from(contentBytes, 'base64') attachments.push({ name: attachment.name, @@ -577,7 +551,6 @@ async function downloadOutlookAttachments( `[${requestId}] Error processing attachment ${attachment.id} for message ${messageId}:`, error ) - // Continue with other attachments } } @@ -618,7 +591,6 @@ async function markOutlookEmailAsRead(accessToken: string, messageId: string) { async function updateWebhookLastChecked(webhookId: string, timestamp: string) { try { - // Get current config first const currentWebhook = await db .select({ providerConfig: webhook.providerConfig }) .from(webhook) @@ -632,7 +604,7 @@ async function updateWebhookLastChecked(webhookId: string, timestamp: string) { const currentConfig = (currentWebhook[0].providerConfig as any) || {} const updatedConfig = { - ...currentConfig, // Preserve ALL existing config including userId + ...currentConfig, lastCheckedTimestamp: timestamp, }