fix(memory-util): fixed unbounded array of gmail/outlook pollers causing high memory util, added missing db indexes/removed unused ones, auto-disable schedules/webhooks after 10 consecutive failures (#2115)

* fix(memory-util): fixed unbounded array of gmail/outlook pollers causing high memory util, added missing db indexes/removed unused ones, auto-disable schedules/webhooks after 10 consecutive failures

* ack PR comments

* ack
Author: Waleed
Date: 2025-11-25 14:15:46 -08:00
Committed by: GitHub
Commit: 7b7586d093
Parent: d413bcdfb0
12 changed files with 8145 additions and 180 deletions
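The headline memory fix below replaces the per-webhook result arrays in the Gmail/Outlook pollers with plain counters and fixes the Promise.race bookkeeping so the in-flight window stays bounded. A minimal standalone sketch of that pattern; the names (runWithConcurrencyLimit, processOne) are illustrative and not from this repo:

```typescript
// Sketch of the bounded-concurrency pattern the pollers adopt in this PR.
// Outcomes are tallied as counters instead of being pushed onto a growing
// results array, and Promise.race is given index-mapped promises so the
// finished slot can actually be removed from the running window.
async function runWithConcurrencyLimit<T>(
  items: T[],
  processOne: (item: T) => Promise<void>,
  concurrency = 10
): Promise<{ successful: number; failed: number }> {
  const running: Promise<void>[] = []
  let successful = 0
  let failed = 0

  for (const item of items) {
    const task = processOne(item)
      .then(() => {
        successful++
      })
      .catch(() => {
        failed++
      })
    running.push(task)

    if (running.length >= concurrency) {
      // Racing index-mapped promises identifies which entry completed;
      // racing the raw promises yields a value that indexOf cannot match.
      const completedIdx = await Promise.race(running.map((p, i) => p.then(() => i)))
      running.splice(completedIdx, 1)
    }
  }

  await Promise.allSettled(running)
  return { successful, failed }
}
```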

View File

@@ -5,6 +5,7 @@ import { type NextRequest, NextResponse } from 'next/server'
 import { getSession } from '@/lib/auth'
 import { createLogger } from '@/lib/logs/console/logger'
 import { getUserEntityPermissions } from '@/lib/permissions/utils'
+import { validateInteger } from '@/lib/security/input-validation'
 import { generateRequestId } from '@/lib/utils'
 
 const logger = createLogger('WebhookAPI')
@@ -95,7 +96,15 @@ export async function PATCH(request: NextRequest, { params }: { params: Promise<
     }
 
     const body = await request.json()
-    const { path, provider, providerConfig, isActive } = body
+    const { path, provider, providerConfig, isActive, failedCount } = body
+
+    if (failedCount !== undefined) {
+      const validation = validateInteger(failedCount, 'failedCount', { min: 0 })
+      if (!validation.isValid) {
+        logger.warn(`[${requestId}] ${validation.error}`)
+        return NextResponse.json({ error: validation.error }, { status: 400 })
+      }
+    }
 
     let resolvedProviderConfig = providerConfig
     if (providerConfig) {
@@ -172,6 +181,7 @@ export async function PATCH(request: NextRequest, { params }: { params: Promise<
       hasProviderUpdate: provider !== undefined,
      hasConfigUpdate: providerConfig !== undefined,
      hasActiveUpdate: isActive !== undefined,
+     hasFailedCountUpdate: failedCount !== undefined,
    })
 
    // Update the webhook
@@ -185,6 +195,7 @@ export async function PATCH(request: NextRequest, { params }: { params: Promise<
            ? resolvedProviderConfig
            : webhooks[0].webhook.providerConfig,
        isActive: isActive !== undefined ? isActive : webhooks[0].webhook.isActive,
+       failedCount: failedCount !== undefined ? failedCount : webhooks[0].webhook.failedCount,
        updatedAt: new Date(),
      })
      .where(eq(webhook.id, id))

View File

@@ -1,7 +1,10 @@
-import { useCallback } from 'react'
+import { useCallback, useEffect, useState } from 'react'
+import { createLogger } from '@/lib/logs/console/logger'
 import { useWorkflowRegistry } from '@/stores/workflows/registry/store'
 import { useSubBlockStore } from '@/stores/workflows/subblock/store'
 
+const logger = createLogger('useWebhookInfo')
+
 /**
  * Return type for the useWebhookInfo hook
  */
@@ -12,16 +15,30 @@ export interface UseWebhookInfoReturn {
   webhookProvider: string | undefined
   /** The webhook path */
   webhookPath: string | undefined
+  /** Whether the webhook is disabled */
+  isDisabled: boolean
+  /** The webhook ID if it exists in the database */
+  webhookId: string | undefined
+  /** Function to reactivate a disabled webhook */
+  reactivateWebhook: (webhookId: string) => Promise<void>
 }
 
 /**
  * Custom hook for managing webhook information for a block
  *
  * @param blockId - The ID of the block
+ * @param workflowId - The current workflow ID
  * @returns Webhook configuration status and details
  */
-export function useWebhookInfo(blockId: string): UseWebhookInfoReturn {
+export function useWebhookInfo(blockId: string, workflowId: string): UseWebhookInfoReturn {
   const activeWorkflowId = useWorkflowRegistry((state) => state.activeWorkflowId)
+  const [webhookStatus, setWebhookStatus] = useState<{
+    isDisabled: boolean
+    webhookId: string | undefined
+  }>({
+    isDisabled: false,
+    webhookId: undefined,
+  })
+
   const isWebhookConfigured = useSubBlockStore(
     useCallback(
@@ -55,9 +72,82 @@ export function useWebhookInfo(blockId: string): UseWebhookInfoReturn {
     )
   )
 
+  const fetchWebhookStatus = useCallback(async () => {
+    if (!workflowId || !blockId || !isWebhookConfigured) {
+      setWebhookStatus({ isDisabled: false, webhookId: undefined })
+      return
+    }
+
+    try {
+      const params = new URLSearchParams({
+        workflowId,
+        blockId,
+      })
+      const response = await fetch(`/api/webhooks?${params}`, {
+        cache: 'no-store',
+        headers: { 'Cache-Control': 'no-cache' },
+      })
+
+      if (!response.ok) {
+        setWebhookStatus({ isDisabled: false, webhookId: undefined })
+        return
+      }
+
+      const data = await response.json()
+      const webhooks = data.webhooks || []
+
+      if (webhooks.length > 0) {
+        const webhook = webhooks[0].webhook
+        setWebhookStatus({
+          isDisabled: !webhook.isActive,
+          webhookId: webhook.id,
+        })
+      } else {
+        setWebhookStatus({ isDisabled: false, webhookId: undefined })
+      }
+    } catch (error) {
+      logger.error('Error fetching webhook status:', error)
+      setWebhookStatus({ isDisabled: false, webhookId: undefined })
+    }
+  }, [workflowId, blockId, isWebhookConfigured])
+
+  useEffect(() => {
+    fetchWebhookStatus()
+  }, [fetchWebhookStatus])
+
+  const reactivateWebhook = useCallback(
+    async (webhookId: string) => {
+      try {
+        const response = await fetch(`/api/webhooks/${webhookId}`, {
+          method: 'PATCH',
+          headers: {
+            'Content-Type': 'application/json',
+          },
+          body: JSON.stringify({
+            isActive: true,
+            failedCount: 0,
+          }),
+        })
+
+        if (response.ok) {
+          await fetchWebhookStatus()
+        } else {
+          logger.error('Failed to reactivate webhook')
+        }
+      } catch (error) {
+        logger.error('Error reactivating webhook:', error)
+      }
+    },
+    [fetchWebhookStatus]
+  )
+
   return {
     isWebhookConfigured,
     webhookProvider,
     webhookPath,
+    isDisabled: webhookStatus.isDisabled,
+    webhookId: webhookStatus.webhookId,
+    reactivateWebhook,
   }
 }
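A hedged sketch of how a component could consume the extended hook; the component name, import path, and markup are illustrative only (the real consumer is the workflow-block component further down):

```tsx
import { useWebhookInfo } from '@/hooks/use-webhook-info' // assumed path, for illustration

// Illustrative consumer: render a clickable "disabled" indicator only when the
// webhook exists in the database and has been auto-disabled.
function WebhookStatusBadge({ blockId, workflowId }: { blockId: string; workflowId: string }) {
  const { isWebhookConfigured, isDisabled, webhookId, reactivateWebhook } = useWebhookInfo(
    blockId,
    workflowId
  )

  if (!isWebhookConfigured || !isDisabled || !webhookId) return null

  return (
    <button type='button' onClick={() => reactivateWebhook(webhookId)}>
      Webhook disabled - click to reactivate
    </button>
  )
}
```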

View File

@@ -139,7 +139,6 @@ const getDisplayValue = (value: unknown): string => {
     const firstMessage = value[0]
     if (!firstMessage?.content || firstMessage.content.trim() === '') return '-'
     const content = firstMessage.content.trim()
-    // Show first 50 characters of the first message content
     return content.length > 50 ? `${content.slice(0, 50)}...` : content
   }
@@ -326,7 +325,6 @@ const SubBlockRow = ({
       ? (workflowMap[rawValue]?.name ?? null)
       : null
 
-  // Hydrate MCP server ID to name using TanStack Query
   const { data: mcpServers = [] } = useMcpServers(workspaceId || '')
   const mcpServerDisplayName = useMemo(() => {
     if (subBlock?.type !== 'mcp-server-selector' || typeof rawValue !== 'string') {
@@ -362,7 +360,6 @@
     const names = rawValue
       .map((a) => {
-        // Prioritize ID lookup (source of truth) over stored name
         if (a.variableId) {
           const variable = workflowVariables.find((v: any) => v.id === a.variableId)
           return variable?.name
@@ -450,7 +447,14 @@ export const WorkflowBlock = memo(function WorkflowBlock({
     currentWorkflow.blocks
   )
 
-  const { isWebhookConfigured, webhookProvider, webhookPath } = useWebhookInfo(id)
+  const {
+    isWebhookConfigured,
+    webhookProvider,
+    webhookPath,
+    isDisabled: isWebhookDisabled,
+    webhookId,
+    reactivateWebhook,
+  } = useWebhookInfo(id, currentWorkflowId)
 
   const {
     scheduleInfo,
@@ -746,7 +750,6 @@ export const WorkflowBlock = memo(function WorkflowBlock({
     config.category !== 'triggers' && type !== 'starter' && !displayTriggerMode
   const hasContentBelowHeader = subBlockRows.length > 0 || shouldShowDefaultHandles
 
-  // Count rows based on block type and whether default handles section is shown
   const defaultHandlesRow = shouldShowDefaultHandles ? 1 : 0
   let rowsCount = 0
@@ -857,101 +860,62 @@ export const WorkflowBlock = memo(function WorkflowBlock({
             </span>
           </div>
           <div className='relative z-10 flex flex-shrink-0 items-center gap-2'>
-            {isWorkflowSelector && childWorkflowId && (
-              <>
-                {typeof childIsDeployed === 'boolean' ? (
-                  <Tooltip.Root>
-                    <Tooltip.Trigger asChild>
-                      <Badge
-                        variant='outline'
-                        className={!childIsDeployed || childNeedsRedeploy ? 'cursor-pointer' : ''}
-                        style={{
-                          borderColor: !childIsDeployed
-                            ? '#EF4444'
-                            : childNeedsRedeploy
-                              ? '#FF6600'
-                              : '#22C55E',
-                          color: !childIsDeployed
-                            ? '#EF4444'
-                            : childNeedsRedeploy
-                              ? '#FF6600'
-                              : '#22C55E',
-                        }}
-                        onClick={(e) => {
-                          e.stopPropagation()
-                          if (
-                            (!childIsDeployed || childNeedsRedeploy) &&
-                            childWorkflowId &&
-                            !isDeploying
-                          ) {
-                            deployWorkflow(childWorkflowId)
-                          }
-                        }}
-                      >
-                        {isDeploying
-                          ? 'Deploying...'
-                          : !childIsDeployed
-                            ? 'undeployed'
-                            : childNeedsRedeploy
-                              ? 'redeploy'
-                              : 'deployed'}
-                      </Badge>
-                    </Tooltip.Trigger>
-                    {(!childIsDeployed || childNeedsRedeploy) && (
-                      <Tooltip.Content>
-                        <span className='text-sm'>
-                          {!childIsDeployed ? 'Click to deploy' : 'Click to redeploy'}
-                        </span>
-                      </Tooltip.Content>
-                    )}
-                  </Tooltip.Root>
-                ) : (
-                  <Badge variant='outline' style={{ visibility: 'hidden' }}>
-                    deployed
-                  </Badge>
-                )}
-              </>
-            )}
+            {isWorkflowSelector &&
+              childWorkflowId &&
+              typeof childIsDeployed === 'boolean' &&
+              (!childIsDeployed || childNeedsRedeploy) && (
+                <Tooltip.Root>
+                  <Tooltip.Trigger asChild>
+                    <Badge
+                      variant='outline'
+                      className='cursor-pointer'
+                      style={{
+                        borderColor: !childIsDeployed ? '#EF4444' : '#FF6600',
+                        color: !childIsDeployed ? '#EF4444' : '#FF6600',
+                      }}
+                      onClick={(e) => {
+                        e.stopPropagation()
+                        if (childWorkflowId && !isDeploying) {
+                          deployWorkflow(childWorkflowId)
+                        }
+                      }}
+                    >
+                      {isDeploying ? 'Deploying...' : !childIsDeployed ? 'undeployed' : 'redeploy'}
+                    </Badge>
+                  </Tooltip.Trigger>
+                  <Tooltip.Content>
+                    <span className='text-sm'>
+                      {!childIsDeployed ? 'Click to deploy' : 'Click to redeploy'}
+                    </span>
+                  </Tooltip.Content>
+                </Tooltip.Root>
+              )}
             {!isEnabled && <Badge>disabled</Badge>}
-            {type === 'schedule' && (
-              <>
-                {shouldShowScheduleBadge ? (
-                  <Tooltip.Root>
-                    <Tooltip.Trigger asChild>
-                      <Badge
-                        variant='outline'
-                        className={scheduleInfo?.isDisabled ? 'cursor-pointer' : ''}
-                        style={{
-                          borderColor: scheduleInfo?.isDisabled ? '#FF6600' : '#22C55E',
-                          color: scheduleInfo?.isDisabled ? '#FF6600' : '#22C55E',
-                        }}
-                        onClick={(e) => {
-                          e.stopPropagation()
-                          if (scheduleInfo?.id) {
-                            if (scheduleInfo.isDisabled) {
-                              reactivateSchedule(scheduleInfo.id)
-                            } else {
-                              disableSchedule(scheduleInfo.id)
-                            }
-                          }
-                        }}
-                      >
-                        {scheduleInfo?.isDisabled ? 'disabled' : 'scheduled'}
-                      </Badge>
-                    </Tooltip.Trigger>
-                    {scheduleInfo?.isDisabled && (
-                      <Tooltip.Content>
-                        <span className='text-sm'>Click to reactivate</span>
-                      </Tooltip.Content>
-                    )}
-                  </Tooltip.Root>
-                ) : (
-                  <Badge variant='outline' style={{ visibility: 'hidden' }}>
-                    scheduled
-                  </Badge>
-                )}
-              </>
+            {type === 'schedule' && shouldShowScheduleBadge && scheduleInfo?.isDisabled && (
+              <Tooltip.Root>
+                <Tooltip.Trigger asChild>
+                  <Badge
+                    variant='outline'
+                    className='cursor-pointer'
+                    style={{
+                      borderColor: '#FF6600',
+                      color: '#FF6600',
+                    }}
+                    onClick={(e) => {
+                      e.stopPropagation()
+                      if (scheduleInfo?.id) {
+                        reactivateSchedule(scheduleInfo.id)
+                      }
+                    }}
+                  >
+                    disabled
+                  </Badge>
+                </Tooltip.Trigger>
+                <Tooltip.Content>
+                  <span className='text-sm'>Click to reactivate</span>
+                </Tooltip.Content>
+              </Tooltip.Root>
             )}
 
             {showWebhookIndicator && (
@@ -982,6 +946,27 @@ export const WorkflowBlock = memo(function WorkflowBlock({
                 </Tooltip.Content>
               </Tooltip.Root>
             )}
+            {isWebhookConfigured && isWebhookDisabled && webhookId && (
+              <Tooltip.Root>
+                <Tooltip.Trigger asChild>
+                  <Badge
+                    variant='outline'
+                    className='cursor-pointer'
+                    style={{ borderColor: '#FF6600', color: '#FF6600' }}
+                    onClick={(e) => {
+                      e.stopPropagation()
+                      reactivateWebhook(webhookId)
+                    }}
+                  >
+                    disabled
+                  </Badge>
+                </Tooltip.Trigger>
+                <Tooltip.Content>
+                  <span className='text-sm'>Click to reactivate</span>
+                </Tooltip.Content>
+              </Tooltip.Root>
+            )}
             {/* {isActive && (
               <div className='mr-[2px] ml-2 flex h-[16px] w-[16px] items-center justify-center'>
                 <div

View File

@@ -23,7 +23,7 @@ import { mergeSubblockState } from '@/stores/workflows/server-utils'
 const logger = createLogger('TriggerScheduleExecution')
 
-const MAX_CONSECUTIVE_FAILURES = 3
+const MAX_CONSECUTIVE_FAILURES = 10
 
 type WorkflowRecord = typeof workflow.$inferSelect
 type WorkflowScheduleUpdate = Partial<typeof workflowSchedule.$inferInsert>

View File

@@ -59,7 +59,8 @@ async function initializeOpenTelemetry() {
     const exporter = new OTLPTraceExporter({
       url: telemetryConfig.endpoint,
       headers: {},
-      timeoutMillis: telemetryConfig.batchSettings.exportTimeoutMillis,
+      timeoutMillis: Math.min(telemetryConfig.batchSettings.exportTimeoutMillis, 10000), // Max 10s
+      keepAlive: false,
     })
 
     const spanProcessor = new BatchSpanProcessor(exporter, {

View File

@@ -295,6 +295,80 @@ export function validateNumericId(
   return { isValid: true, sanitized: num.toString() }
 }
 
+/**
+ * Validates an integer value (from a JSON body or other sources)
+ *
+ * This is stricter than validateNumericId - it requires:
+ * - Value must already be a number type (not a string)
+ * - Must be an integer (no decimals)
+ * - Must be finite (not NaN or Infinity)
+ *
+ * @param value - The value to validate
+ * @param paramName - Name of the parameter for error messages
+ * @param options - Additional options (min, max)
+ * @returns ValidationResult
+ *
+ * @example
+ * ```typescript
+ * const result = validateInteger(failedCount, 'failedCount', { min: 0 })
+ * if (!result.isValid) {
+ *   return NextResponse.json({ error: result.error }, { status: 400 })
+ * }
+ * ```
+ */
+export function validateInteger(
+  value: unknown,
+  paramName = 'value',
+  options: { min?: number; max?: number } = {}
+): ValidationResult {
+  if (value === null || value === undefined) {
+    return {
+      isValid: false,
+      error: `${paramName} is required`,
+    }
+  }
+
+  if (typeof value !== 'number') {
+    logger.warn('Value is not a number', { paramName, valueType: typeof value })
+    return {
+      isValid: false,
+      error: `${paramName} must be a number`,
+    }
+  }
+
+  if (Number.isNaN(value) || !Number.isFinite(value)) {
+    logger.warn('Invalid number value', { paramName, value })
+    return {
+      isValid: false,
+      error: `${paramName} must be a valid number`,
+    }
+  }
+
+  if (!Number.isInteger(value)) {
+    logger.warn('Value is not an integer', { paramName, value })
+    return {
+      isValid: false,
+      error: `${paramName} must be an integer`,
+    }
+  }
+
+  if (options.min !== undefined && value < options.min) {
+    return {
+      isValid: false,
+      error: `${paramName} must be at least ${options.min}`,
+    }
+  }
+
+  if (options.max !== undefined && value > options.max) {
+    return {
+      isValid: false,
+      error: `${paramName} must be at most ${options.max}`,
+    }
+  }
+
+  return { isValid: true }
+}
+
 /**
  * Validates that a value is in an allowed list (enum validation)
  *

View File

@@ -1,6 +1,6 @@
 import { db } from '@sim/db'
 import { account, webhook } from '@sim/db/schema'
-import { and, eq } from 'drizzle-orm'
+import { and, eq, sql } from 'drizzle-orm'
 import { nanoid } from 'nanoid'
 import { pollingIdempotency } from '@/lib/idempotency/service'
 import { createLogger } from '@/lib/logs/console/logger'
@@ -11,6 +11,8 @@ import { downloadAttachments, extractAttachmentInfo } from '@/tools/gmail/utils'
 const logger = createLogger('GmailPollingService')
 
+const MAX_CONSECUTIVE_FAILURES = 10
+
 interface GmailWebhookConfig {
   labelIds: string[]
   labelFilterBehavior: 'INCLUDE' | 'EXCLUDE'
@@ -55,6 +57,53 @@ export interface GmailWebhookPayload {
   rawEmail?: GmailEmail // Only included when includeRawEmail is true
 }
 
+async function markWebhookFailed(webhookId: string) {
+  try {
+    const result = await db
+      .update(webhook)
+      .set({
+        failedCount: sql`COALESCE(${webhook.failedCount}, 0) + 1`,
+        lastFailedAt: new Date(),
+        updatedAt: new Date(),
+      })
+      .where(eq(webhook.id, webhookId))
+      .returning({ failedCount: webhook.failedCount })
+
+    const newFailedCount = result[0]?.failedCount || 0
+    const shouldDisable = newFailedCount >= MAX_CONSECUTIVE_FAILURES
+
+    if (shouldDisable) {
+      await db
+        .update(webhook)
+        .set({
+          isActive: false,
+          updatedAt: new Date(),
+        })
+        .where(eq(webhook.id, webhookId))
+
+      logger.warn(
+        `Webhook ${webhookId} auto-disabled after ${MAX_CONSECUTIVE_FAILURES} consecutive failures`
+      )
+    }
+  } catch (err) {
+    logger.error(`Failed to mark webhook ${webhookId} as failed:`, err)
+  }
+}
+
+async function markWebhookSuccess(webhookId: string) {
+  try {
+    await db
+      .update(webhook)
+      .set({
+        failedCount: 0, // Reset on success
+        updatedAt: new Date(),
+      })
+      .where(eq(webhook.id, webhookId))
+  } catch (err) {
+    logger.error(`Failed to mark webhook ${webhookId} as successful:`, err)
+  }
+}
+
 export async function pollGmailWebhooks() {
   logger.info('Starting Gmail webhook polling')
@@ -76,8 +125,9 @@ export async function pollGmailWebhooks()
     // exhausting Postgres or Gmail API connections when many users exist.
     const CONCURRENCY = 10
-    const running: Promise<any>[] = []
-    const settledResults: PromiseSettledResult<any>[] = []
+    const running: Promise<void>[] = []
+    let successCount = 0
+    let failureCount = 0
 
     const enqueue = async (webhookData: (typeof activeWebhooks)[number]) => {
       const webhookId = webhookData.id
@@ -91,7 +141,9 @@ export async function pollGmailWebhooks()
         if (!credentialId && !userId) {
          logger.error(`[${requestId}] Missing credentialId and userId for webhook ${webhookId}`)
-          return { success: false, webhookId, error: 'Missing credentialId and userId' }
+          await markWebhookFailed(webhookId)
+          failureCount++
+          return
        }
 
        // Resolve owner and token
@@ -102,7 +154,9 @@ export async function pollGmailWebhooks()
          logger.error(
            `[${requestId}] Credential ${credentialId} not found for webhook ${webhookId}`
          )
-          return { success: false, webhookId, error: 'Credential not found' }
+          await markWebhookFailed(webhookId)
+          failureCount++
+          return
        }
        const ownerUserId = rows[0].userId
        accessToken = await refreshAccessTokenIfNeeded(credentialId, ownerUserId, requestId)
@@ -115,7 +169,9 @@ export async function pollGmailWebhooks()
          logger.error(
            `[${requestId}] Failed to get Gmail access token for webhook ${webhookId} (cred or fallback)`
          )
-          return { success: false, webhookId, error: 'No access token' }
+          await markWebhookFailed(webhookId)
+          failureCount++
+          return
        }
 
        // Get webhook configuration
@@ -135,8 +191,10 @@ export async function pollGmailWebhooks()
            now.toISOString(),
            latestHistoryId || config.historyId
          )
+          await markWebhookSuccess(webhookId)
          logger.info(`[${requestId}] No new emails found for webhook ${webhookId}`)
-          return { success: true, webhookId, status: 'no_emails' }
+          successCount++
+          return
        }
 
        logger.info(`[${requestId}] Found ${emails.length} new emails for webhook ${webhookId}`)
@@ -157,47 +215,46 @@
        // Update webhook with latest history ID and timestamp
        await updateWebhookData(webhookId, now.toISOString(), latestHistoryId || config.historyId)
+        await markWebhookSuccess(webhookId)
+        successCount++
 
-        return {
-          success: true,
-          webhookId,
-          emailsFound: emails.length,
-          emailsProcessed: processed,
-        }
+        logger.info(
+          `[${requestId}] Successfully processed ${processed} emails for webhook ${webhookId}`
+        )
      } catch (error) {
        const errorMessage = error instanceof Error ? error.message : 'Unknown error'
        logger.error(`[${requestId}] Error processing Gmail webhook ${webhookId}:`, error)
-        return { success: false, webhookId, error: errorMessage }
+        await markWebhookFailed(webhookId)
+        failureCount++
      }
    }
 
    for (const webhookData of activeWebhooks) {
-      running.push(enqueue(webhookData))
+      const promise = enqueue(webhookData)
+        .then(() => {
+          // Result processed, memory released
+        })
+        .catch((err) => {
+          logger.error('Unexpected error in webhook processing:', err)
+          failureCount++
+        })
+      running.push(promise)
 
      if (running.length >= CONCURRENCY) {
-        const result = await Promise.race(running)
-        running.splice(running.indexOf(result), 1)
-        settledResults.push(result)
+        const completedIdx = await Promise.race(running.map((p, i) => p.then(() => i)))
+        running.splice(completedIdx, 1)
      }
    }
 
-    while (running.length) {
-      const result = await Promise.race(running)
-      running.splice(running.indexOf(result), 1)
-      settledResults.push(result)
-    }
-
-    const results = settledResults
+    // Wait for remaining webhooks to complete
+    await Promise.allSettled(running)
 
    const summary = {
-      total: results.length,
-      successful: results.filter((r) => r.status === 'fulfilled' && r.value.success).length,
-      failed: results.filter(
-        (r) => r.status === 'rejected' || (r.status === 'fulfilled' && !r.value.success)
-      ).length,
-      details: results.map((r) =>
-        r.status === 'fulfilled' ? r.value : { success: false, error: r.reason }
-      ),
+      total: activeWebhooks.length,
+      successful: successCount,
+      failed: failureCount,
+      details: [], // Don't store details to save memory
    }
 
    logger.info('Gmail polling completed', {
@@ -694,8 +751,8 @@ async function markEmailAsRead(accessToken: string, messageId: string) {
 }
 
 async function updateWebhookLastChecked(webhookId: string, timestamp: string, historyId?: string) {
-  const existingConfig =
-    (await db.select().from(webhook).where(eq(webhook.id, webhookId)))[0]?.providerConfig || {}
+  const result = await db.select().from(webhook).where(eq(webhook.id, webhookId))
+  const existingConfig = (result[0]?.providerConfig as Record<string, any>) || {}
 
   await db
     .update(webhook)
     .set({
@@ -703,15 +760,15 @@ async function updateWebhookLastChecked(webhookId: string, timestamp: string, hi
       ...existingConfig,
       lastCheckedTimestamp: timestamp,
      ...(historyId ? { historyId } : {}),
-      },
+      } as any,
      updatedAt: new Date(),
    })
    .where(eq(webhook.id, webhookId))
 }
 
 async function updateWebhookData(webhookId: string, timestamp: string, historyId?: string) {
-  const existingConfig =
-    (await db.select().from(webhook).where(eq(webhook.id, webhookId)))[0]?.providerConfig || {}
+  const result = await db.select().from(webhook).where(eq(webhook.id, webhookId))
+  const existingConfig = (result[0]?.providerConfig as Record<string, any>) || {}
 
   await db
     .update(webhook)
@@ -720,7 +777,7 @@ async function updateWebhookData(webhookId: string, timestamp: string, historyId
      ...existingConfig,
      lastCheckedTimestamp: timestamp,
      ...(historyId ? { historyId } : {}),
-      },
+      } as any,
      updatedAt: new Date(),
    })
    .where(eq(webhook.id, webhookId))
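The markWebhookFailed/markWebhookSuccess helpers above encode the auto-disable policy: each failure increments failed_count and any success resets it, so only an unbroken streak of failures (10 here) deactivates the webhook. A pure-function restatement of that policy, for illustration only (names are not from the repo):

```typescript
const MAX_CONSECUTIVE_FAILURES = 10

interface WebhookHealth {
  failedCount: number
  isActive: boolean
}

// Sketch of the failure policy implemented by markWebhookFailed/markWebhookSuccess.
function afterPollAttempt(health: WebhookHealth, succeeded: boolean): WebhookHealth {
  if (succeeded) {
    // A single success resets the streak, so only consecutive failures count.
    return { ...health, failedCount: 0 }
  }
  const failedCount = health.failedCount + 1
  return {
    failedCount,
    // Crossing the threshold disables the webhook until it is manually reactivated.
    isActive: failedCount >= MAX_CONSECUTIVE_FAILURES ? false : health.isActive,
  }
}
```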

View File

@@ -1,6 +1,6 @@
 import { db } from '@sim/db'
 import { account, webhook } from '@sim/db/schema'
-import { and, eq } from 'drizzle-orm'
+import { and, eq, sql } from 'drizzle-orm'
 import { htmlToText } from 'html-to-text'
 import { nanoid } from 'nanoid'
 import { pollingIdempotency } from '@/lib/idempotency'
@@ -10,6 +10,55 @@ import { getOAuthToken, refreshAccessTokenIfNeeded } from '@/app/api/auth/oauth/
 const logger = createLogger('OutlookPollingService')
 
+const MAX_CONSECUTIVE_FAILURES = 10
+
+async function markWebhookFailed(webhookId: string) {
+  try {
+    const result = await db
+      .update(webhook)
+      .set({
+        failedCount: sql`COALESCE(${webhook.failedCount}, 0) + 1`,
+        lastFailedAt: new Date(),
+        updatedAt: new Date(),
+      })
+      .where(eq(webhook.id, webhookId))
+      .returning({ failedCount: webhook.failedCount })
+
+    const newFailedCount = result[0]?.failedCount || 0
+    const shouldDisable = newFailedCount >= MAX_CONSECUTIVE_FAILURES
+
+    if (shouldDisable) {
+      await db
+        .update(webhook)
+        .set({
+          isActive: false,
+          updatedAt: new Date(),
+        })
+        .where(eq(webhook.id, webhookId))
+
+      logger.warn(
+        `Webhook ${webhookId} auto-disabled after ${MAX_CONSECUTIVE_FAILURES} consecutive failures`
+      )
+    }
+  } catch (err) {
+    logger.error(`Failed to mark webhook ${webhookId} as failed:`, err)
+  }
+}
+
+async function markWebhookSuccess(webhookId: string) {
+  try {
+    await db
+      .update(webhook)
+      .set({
+        failedCount: 0, // Reset on success
+        updatedAt: new Date(),
+      })
+      .where(eq(webhook.id, webhookId))
+  } catch (err) {
+    logger.error(`Failed to mark webhook ${webhookId} as successful:`, err)
+  }
+}
+
 interface OutlookWebhookConfig {
   credentialId: string
   folderIds?: string[] // e.g., ['inbox', 'sent']
@@ -125,8 +174,9 @@ export async function pollOutlookWebhooks() {
     // Limit concurrency to avoid exhausting connections
     const CONCURRENCY = 10
-    const running: Promise<any>[] = []
-    const results: any[] = []
+    const running: Promise<void>[] = []
+    let successCount = 0
+    let failureCount = 0
 
     const enqueue = async (webhookData: (typeof activeWebhooks)[number]) => {
       const webhookId = webhookData.id
@@ -135,14 +185,16 @@ export async function pollOutlookWebhooks() {
       try {
         logger.info(`[${requestId}] Processing Outlook webhook: ${webhookId}`)
 
-        // Extract credentialId and/or userId
+        // Extract metadata
         const metadata = webhookData.providerConfig as any
         const credentialId: string | undefined = metadata?.credentialId
         const userId: string | undefined = metadata?.userId
 
         if (!credentialId && !userId) {
          logger.error(`[${requestId}] Missing credentialId and userId for webhook ${webhookId}`)
-          return { success: false, webhookId, error: 'Missing credentialId and userId' }
+          await markWebhookFailed(webhookId)
+          failureCount++
+          return
        }
 
        // Resolve access token
@@ -153,7 +205,9 @@
          logger.error(
            `[${requestId}] Credential ${credentialId} not found for webhook ${webhookId}`
          )
-          return { success: false, webhookId, error: 'Credential not found' }
+          await markWebhookFailed(webhookId)
+          failureCount++
+          return
        }
        const ownerUserId = rows[0].userId
        accessToken = await refreshAccessTokenIfNeeded(credentialId, ownerUserId, requestId)
@@ -166,7 +220,9 @@
          logger.error(
            `[${requestId}] Failed to get Outlook access token for webhook ${webhookId} (cred or fallback)`
          )
-          return { success: false, webhookId, error: 'No access token' }
+          await markWebhookFailed(webhookId)
+          failureCount++
+          return
        }
 
        // Get webhook configuration
@@ -181,8 +237,10 @@
        if (!emails || !emails.length) {
          // Update last checked timestamp
          await updateWebhookLastChecked(webhookId, now.toISOString())
+          await markWebhookSuccess(webhookId)
          logger.info(`[${requestId}] No new emails found for webhook ${webhookId}`)
-          return { success: true, webhookId, status: 'no_emails' }
+          successCount++
+          return
        }
 
        logger.info(`[${requestId}] Found ${emails.length} emails for webhook ${webhookId}`)
@@ -200,47 +258,48 @@
        // Update webhook with latest timestamp
        await updateWebhookLastChecked(webhookId, now.toISOString())
+        await markWebhookSuccess(webhookId)
+        successCount++
 
-        return {
-          success: true,
-          webhookId,
-          emailsFound: emails.length,
-          emailsProcessed: processed,
-        }
+        logger.info(
+          `[${requestId}] Successfully processed ${processed} emails for webhook ${webhookId}`
+        )
      } catch (error) {
        const errorMessage = error instanceof Error ? error.message : 'Unknown error'
        logger.error(`[${requestId}] Error processing Outlook webhook ${webhookId}:`, error)
-        return { success: false, webhookId, error: errorMessage }
+        await markWebhookFailed(webhookId)
+        failureCount++
      }
    }
 
    for (const webhookData of activeWebhooks) {
-      running.push(enqueue(webhookData))
+      const promise = enqueue(webhookData)
+        .then(() => {
+          // Result processed, memory released
+        })
+        .catch((err) => {
+          logger.error('Unexpected error in webhook processing:', err)
+          failureCount++
+        })
+      running.push(promise)
 
      if (running.length >= CONCURRENCY) {
-        const result = await Promise.race(running)
-        running.splice(running.indexOf(result), 1)
-        results.push(result)
+        const completedIdx = await Promise.race(running.map((p, i) => p.then(() => i)))
+        running.splice(completedIdx, 1)
      }
    }
 
-    while (running.length) {
-      const result = await Promise.race(running)
-      running.splice(running.indexOf(result), 1)
-      results.push(result)
-    }
-
-    // Calculate summary
-    const successful = results.filter((r) => r.success).length
-    const failed = results.filter((r) => !r.success).length
-
-    logger.info(`Outlook polling completed: ${successful} successful, ${failed} failed`)
+    // Wait for remaining webhooks to complete
+    await Promise.allSettled(running)
+
+    logger.info(`Outlook polling completed: ${successCount} successful, ${failureCount} failed`)
 
    return {
      total: activeWebhooks.length,
-      successful,
-      failed,
-      details: results,
+      successful: successCount,
+      failed: failureCount,
+      details: [], // Don't store details to save memory
    }
  } catch (error) {
    logger.error('Error during Outlook webhook polling:', error)

View File

@@ -0,0 +1,8 @@
DROP INDEX "doc_kb_uploaded_at_idx";--> statement-breakpoint
DROP INDEX "workflow_blocks_workflow_type_idx";--> statement-breakpoint
DROP INDEX "workflow_deployment_version_workflow_id_idx";--> statement-breakpoint
DROP INDEX "workflow_execution_logs_execution_id_idx";--> statement-breakpoint
ALTER TABLE "webhook" ADD COLUMN "failed_count" integer DEFAULT 0;--> statement-breakpoint
ALTER TABLE "webhook" ADD COLUMN "last_failed_at" timestamp;--> statement-breakpoint
CREATE INDEX "idx_account_on_account_id_provider_id" ON "account" USING btree ("account_id","provider_id");--> statement-breakpoint
CREATE INDEX "idx_webhook_on_workflow_id_block_id" ON "webhook" USING btree ("workflow_id","block_id");

File diff suppressed because it is too large.

View File

@@ -778,6 +778,13 @@
       "when": 1763667488537,
       "tag": "0111_solid_dreadnoughts",
       "breakpoints": true
+    },
+    {
+      "idx": 112,
+      "version": "7",
+      "when": 1764095386986,
+      "tag": "0112_tired_blink",
+      "breakpoints": true
     }
   ]
 }

View File

@@ -84,6 +84,10 @@ export const account = pgTable(
   },
   (table) => ({
     userIdIdx: index('account_user_id_idx').on(table.userId),
+    accountProviderIdx: index('idx_account_on_account_id_provider_id').on(
+      table.accountId,
+      table.providerId
+    ),
   })
 )
@@ -188,7 +192,6 @@ export const workflowBlocks = pgTable(
   },
   (table) => ({
     workflowIdIdx: index('workflow_blocks_workflow_id_idx').on(table.workflowId),
-    workflowTypeIdx: index('workflow_blocks_workflow_type_idx').on(table.workflowId, table.type),
   })
 )
@@ -300,7 +303,6 @@ export const workflowExecutionLogs = pgTable(
   },
   (table) => ({
     workflowIdIdx: index('workflow_execution_logs_workflow_id_idx').on(table.workflowId),
-    executionIdIdx: index('workflow_execution_logs_execution_id_idx').on(table.executionId),
     stateSnapshotIdIdx: index('workflow_execution_logs_state_snapshot_id_idx').on(
       table.stateSnapshotId
     ),
@@ -476,6 +478,8 @@ export const webhook = pgTable(
     provider: text('provider'), // e.g., "whatsapp", "github", etc.
     providerConfig: json('provider_config'), // Store provider-specific configuration
     isActive: boolean('is_active').notNull().default(true),
+    failedCount: integer('failed_count').default(0), // Track consecutive failures
+    lastFailedAt: timestamp('last_failed_at'), // When the webhook last failed
     createdAt: timestamp('created_at').notNull().defaultNow(),
     updatedAt: timestamp('updated_at').notNull().defaultNow(),
   },
@@ -483,6 +487,11 @@ export const webhook = pgTable(
     return {
       // Ensure webhook paths are unique
       pathIdx: uniqueIndex('path_idx').on(table.path),
+      // Optimize queries for webhooks by workflow and block
+      workflowBlockIdx: index('idx_webhook_on_workflow_id_block_id').on(
+        table.workflowId,
+        table.blockId
+      ),
     }
   }
 )
@@ -1024,12 +1033,10 @@ export const document = pgTable(
     uploadedAt: timestamp('uploaded_at').notNull().defaultNow(),
   },
   (table) => ({
-    // Primary access pattern - documents by knowledge base
+    // Primary access pattern - filter by knowledge base
     knowledgeBaseIdIdx: index('doc_kb_id_idx').on(table.knowledgeBaseId),
-    // Search by filename (for search functionality)
+    // Search by filename
     filenameIdx: index('doc_filename_idx').on(table.filename),
-    // Order by upload date (for listing documents)
-    kbUploadedAtIdx: index('doc_kb_uploaded_at_idx').on(table.knowledgeBaseId, table.uploadedAt),
     // Processing status filtering
     processingStatusIdx: index('doc_processing_status_idx').on(
       table.knowledgeBaseId,
@@ -1458,7 +1465,6 @@ export const workflowDeploymentVersion = pgTable(
     createdBy: text('created_by'),
   },
   (table) => ({
-    workflowIdIdx: index('workflow_deployment_version_workflow_id_idx').on(table.workflowId),
     workflowVersionUnique: uniqueIndex('workflow_deployment_version_workflow_version_unique').on(
       table.workflowId,
       table.version