From 04cd837e9ce23c25f3d7ff302d59c5b5636be3ca Mon Sep 17 00:00:00 2001 From: Vikhyath Mondreti Date: Thu, 18 Dec 2025 12:49:58 -0800 Subject: [PATCH] fix(notifs): inactivity polling filters, consolidate trigger types, minor consistency issue with filter parsing (#2452) * fix(notifs-slac): display name for account * fix inactivity polling check * consolidate trigger types * remove redundant defaults * fix --- apps/sim/app/api/auth/accounts/route.ts | 12 ++- .../app/api/workflows/[id]/execute/route.ts | 3 +- .../notifications/[notificationId]/route.ts | 3 +- .../workspaces/[id]/notifications/route.ts | 5 +- .../slack-channel-selector.tsx | 2 + .../notifications/notifications.tsx | 11 +-- apps/sim/hooks/use-slack-accounts.ts | 1 + apps/sim/lib/logs/events.ts | 5 +- apps/sim/lib/logs/types.ts | 5 +- apps/sim/lib/notifications/alert-rules.ts | 74 +++++++------- .../lib/notifications/inactivity-polling.ts | 98 ++++++++++++++++--- 11 files changed, 159 insertions(+), 60 deletions(-) diff --git a/apps/sim/app/api/auth/accounts/route.ts b/apps/sim/app/api/auth/accounts/route.ts index 418a04c02..5739586c3 100644 --- a/apps/sim/app/api/auth/accounts/route.ts +++ b/apps/sim/app/api/auth/accounts/route.ts @@ -32,7 +32,17 @@ export async function GET(request: NextRequest) { .from(account) .where(and(...whereConditions)) - return NextResponse.json({ accounts }) + // Use the user's email as the display name (consistent with credential selector) + const userEmail = session.user.email + + const accountsWithDisplayName = accounts.map((acc) => ({ + id: acc.id, + accountId: acc.accountId, + providerId: acc.providerId, + displayName: userEmail || acc.providerId, + })) + + return NextResponse.json({ accounts: accountsWithDisplayName }) } catch (error) { logger.error('Failed to fetch accounts', { error }) return NextResponse.json({ error: 'Internal server error' }, { status: 500 }) diff --git a/apps/sim/app/api/workflows/[id]/execute/route.ts b/apps/sim/app/api/workflows/[id]/execute/route.ts index 2b9cd8bea..050088787 100644 --- a/apps/sim/app/api/workflows/[id]/execute/route.ts +++ b/apps/sim/app/api/workflows/[id]/execute/route.ts @@ -11,6 +11,7 @@ import { processInputFileFields } from '@/lib/execution/files' import { preprocessExecution } from '@/lib/execution/preprocessing' import { createLogger } from '@/lib/logs/console/logger' import { LoggingSession } from '@/lib/logs/execution/logging-session' +import { ALL_TRIGGER_TYPES } from '@/lib/logs/types' import { executeWorkflowCore } from '@/lib/workflows/executor/execution-core' import { type ExecutionEvent, encodeSSEEvent } from '@/lib/workflows/executor/execution-events' import { PauseResumeManager } from '@/lib/workflows/executor/human-in-the-loop-manager' @@ -30,7 +31,7 @@ const logger = createLogger('WorkflowExecuteAPI') const ExecuteWorkflowSchema = z.object({ selectedOutputs: z.array(z.string()).optional().default([]), - triggerType: z.enum(['api', 'webhook', 'schedule', 'manual', 'chat']).optional(), + triggerType: z.enum(ALL_TRIGGER_TYPES).optional(), stream: z.boolean().optional(), useDraftState: z.boolean().optional(), input: z.any().optional(), diff --git a/apps/sim/app/api/workspaces/[id]/notifications/[notificationId]/route.ts b/apps/sim/app/api/workspaces/[id]/notifications/[notificationId]/route.ts index a7bca617d..799d148a6 100644 --- a/apps/sim/app/api/workspaces/[id]/notifications/[notificationId]/route.ts +++ b/apps/sim/app/api/workspaces/[id]/notifications/[notificationId]/route.ts @@ -6,13 +6,14 @@ import { z } from 'zod' import { getSession } from '@/lib/auth' import { encryptSecret } from '@/lib/core/security/encryption' import { createLogger } from '@/lib/logs/console/logger' +import { ALL_TRIGGER_TYPES } from '@/lib/logs/types' import { getUserEntityPermissions } from '@/lib/workspaces/permissions/utils' import { MAX_EMAIL_RECIPIENTS, MAX_WORKFLOW_IDS } from '../constants' const logger = createLogger('WorkspaceNotificationAPI') const levelFilterSchema = z.array(z.enum(['info', 'error'])) -const triggerFilterSchema = z.array(z.enum(['api', 'webhook', 'schedule', 'manual', 'chat'])) +const triggerFilterSchema = z.array(z.enum(ALL_TRIGGER_TYPES)) const alertRuleSchema = z.enum([ 'consecutive_failures', diff --git a/apps/sim/app/api/workspaces/[id]/notifications/route.ts b/apps/sim/app/api/workspaces/[id]/notifications/route.ts index 9eb99ed5f..b1aa69ae0 100644 --- a/apps/sim/app/api/workspaces/[id]/notifications/route.ts +++ b/apps/sim/app/api/workspaces/[id]/notifications/route.ts @@ -7,6 +7,7 @@ import { z } from 'zod' import { getSession } from '@/lib/auth' import { encryptSecret } from '@/lib/core/security/encryption' import { createLogger } from '@/lib/logs/console/logger' +import { ALL_TRIGGER_TYPES } from '@/lib/logs/types' import { getUserEntityPermissions } from '@/lib/workspaces/permissions/utils' import { MAX_EMAIL_RECIPIENTS, MAX_NOTIFICATIONS_PER_TYPE, MAX_WORKFLOW_IDS } from './constants' @@ -14,7 +15,7 @@ const logger = createLogger('WorkspaceNotificationsAPI') const notificationTypeSchema = z.enum(['webhook', 'email', 'slack']) const levelFilterSchema = z.array(z.enum(['info', 'error'])) -const triggerFilterSchema = z.array(z.enum(['api', 'webhook', 'schedule', 'manual', 'chat'])) +const triggerFilterSchema = z.array(z.enum(ALL_TRIGGER_TYPES)) const alertRuleSchema = z.enum([ 'consecutive_failures', @@ -80,7 +81,7 @@ const createNotificationSchema = z workflowIds: z.array(z.string()).max(MAX_WORKFLOW_IDS).default([]), allWorkflows: z.boolean().default(false), levelFilter: levelFilterSchema.default(['info', 'error']), - triggerFilter: triggerFilterSchema.default(['api', 'webhook', 'schedule', 'manual', 'chat']), + triggerFilter: triggerFilterSchema.default([...ALL_TRIGGER_TYPES]), includeFinalOutput: z.boolean().default(false), includeTraceSpans: z.boolean().default(false), includeRateLimits: z.boolean().default(false), diff --git a/apps/sim/app/workspace/[workspaceId]/logs/components/logs-toolbar/components/notifications/components/slack-channel-selector/slack-channel-selector.tsx b/apps/sim/app/workspace/[workspaceId]/logs/components/logs-toolbar/components/notifications/components/slack-channel-selector/slack-channel-selector.tsx index 67583aa34..037064199 100644 --- a/apps/sim/app/workspace/[workspaceId]/logs/components/logs-toolbar/components/notifications/components/slack-channel-selector/slack-channel-selector.tsx +++ b/apps/sim/app/workspace/[workspaceId]/logs/components/logs-toolbar/components/notifications/components/slack-channel-selector/slack-channel-selector.tsx @@ -104,6 +104,8 @@ export function SlackChannelSelector({ disabled={disabled || channels.length === 0} isLoading={isLoading} error={fetchError} + searchable + searchPlaceholder='Search channels...' /> {selectedChannel && !fetchError && (

diff --git a/apps/sim/app/workspace/[workspaceId]/logs/components/logs-toolbar/components/notifications/notifications.tsx b/apps/sim/app/workspace/[workspaceId]/logs/components/logs-toolbar/components/notifications/notifications.tsx index a112df9e6..81bedb039 100644 --- a/apps/sim/app/workspace/[workspaceId]/logs/components/logs-toolbar/components/notifications/notifications.tsx +++ b/apps/sim/app/workspace/[workspaceId]/logs/components/logs-toolbar/components/notifications/notifications.tsx @@ -22,6 +22,7 @@ import { SlackIcon } from '@/components/icons' import { Skeleton } from '@/components/ui' import { cn } from '@/lib/core/utils/cn' import { createLogger } from '@/lib/logs/console/logger' +import { ALL_TRIGGER_TYPES, type TriggerType } from '@/lib/logs/types' import { quickValidateEmail } from '@/lib/messaging/email/validation' import { type NotificationSubscription, @@ -43,7 +44,6 @@ const PRIMARY_BUTTON_STYLES = type NotificationType = 'webhook' | 'email' | 'slack' type LogLevel = 'info' | 'error' -type TriggerType = 'api' | 'webhook' | 'schedule' | 'manual' | 'chat' type AlertRule = | 'none' | 'consecutive_failures' @@ -84,7 +84,6 @@ interface NotificationSettingsProps { } const LOG_LEVELS: LogLevel[] = ['info', 'error'] -const TRIGGER_TYPES: TriggerType[] = ['api', 'webhook', 'schedule', 'manual', 'chat'] function formatAlertConfigLabel(config: { rule: AlertRule @@ -137,7 +136,7 @@ export function NotificationSettings({ workflowIds: [] as string[], allWorkflows: true, levelFilter: ['info', 'error'] as LogLevel[], - triggerFilter: ['api', 'webhook', 'schedule', 'manual', 'chat'] as TriggerType[], + triggerFilter: [...ALL_TRIGGER_TYPES] as TriggerType[], includeFinalOutput: false, includeTraceSpans: false, includeRateLimits: false, @@ -207,7 +206,7 @@ export function NotificationSettings({ workflowIds: [], allWorkflows: true, levelFilter: ['info', 'error'], - triggerFilter: ['api', 'webhook', 'schedule', 'manual', 'chat'], + triggerFilter: [...ALL_TRIGGER_TYPES], includeFinalOutput: false, includeTraceSpans: false, includeRateLimits: false, @@ -768,7 +767,7 @@ export function NotificationSettings({ ({ value: acc.id, - label: acc.accountId, + label: acc.displayName || 'Slack Workspace', }))} value={formData.slackAccountId} onChange={(value) => { @@ -859,7 +858,7 @@ export function NotificationSettings({

({ + options={ALL_TRIGGER_TYPES.map((trigger) => ({ label: trigger.charAt(0).toUpperCase() + trigger.slice(1), value: trigger, }))} diff --git a/apps/sim/hooks/use-slack-accounts.ts b/apps/sim/hooks/use-slack-accounts.ts index 4bb82543c..2f690b844 100644 --- a/apps/sim/hooks/use-slack-accounts.ts +++ b/apps/sim/hooks/use-slack-accounts.ts @@ -4,6 +4,7 @@ interface SlackAccount { id: string accountId: string providerId: string + displayName?: string } interface UseSlackAccountsResult { diff --git a/apps/sim/lib/logs/events.ts b/apps/sim/lib/logs/events.ts index 14e2ceee8..4d2b923c1 100644 --- a/apps/sim/lib/logs/events.ts +++ b/apps/sim/lib/logs/events.ts @@ -81,8 +81,8 @@ export async function emitWorkflowExecutionCompleted(log: WorkflowExecutionLog): ) for (const subscription of subscriptions) { - const levelMatches = subscription.levelFilter?.includes(log.level) ?? true - const triggerMatches = subscription.triggerFilter?.includes(log.trigger) ?? true + const levelMatches = subscription.levelFilter.includes(log.level) + const triggerMatches = subscription.triggerFilter.includes(log.trigger) if (!levelMatches || !triggerMatches) { logger.debug(`Skipping subscription ${subscription.id} due to filter mismatch`) @@ -98,6 +98,7 @@ export async function emitWorkflowExecutionCompleted(log: WorkflowExecutionLog): status: log.level === 'error' ? 'error' : 'success', durationMs: log.totalDurationMs || 0, cost: (log.cost as { total?: number })?.total || 0, + triggerFilter: subscription.triggerFilter, } const shouldAlert = await shouldTriggerAlert(alertConfig, context, subscription.lastAlertAt) diff --git a/apps/sim/lib/logs/types.ts b/apps/sim/lib/logs/types.ts index 6eebee216..6ea4f1b8e 100644 --- a/apps/sim/lib/logs/types.ts +++ b/apps/sim/lib/logs/types.ts @@ -51,8 +51,11 @@ export interface ExecutionEnvironment { workspaceId: string } +export const ALL_TRIGGER_TYPES = ['api', 'webhook', 'schedule', 'manual', 'chat'] as const +export type TriggerType = (typeof ALL_TRIGGER_TYPES)[number] + export interface ExecutionTrigger { - type: 'api' | 'webhook' | 'schedule' | 'manual' | 'chat' | string + type: TriggerType | string source: string data?: Record timestamp: string diff --git a/apps/sim/lib/notifications/alert-rules.ts b/apps/sim/lib/notifications/alert-rules.ts index c07dc7e5e..d2263f381 100644 --- a/apps/sim/lib/notifications/alert-rules.ts +++ b/apps/sim/lib/notifications/alert-rules.ts @@ -1,6 +1,6 @@ import { db } from '@sim/db' import { workflowExecutionLogs } from '@sim/db/schema' -import { and, avg, count, desc, eq, gte } from 'drizzle-orm' +import { and, avg, count, desc, eq, gte, inArray } from 'drizzle-orm' import { createLogger } from '@/lib/logs/console/logger' const logger = createLogger('AlertRules') @@ -135,25 +135,29 @@ export function isInCooldown(lastAlertAt: Date | null): boolean { return new Date() < cooldownEnd } -/** - * Context passed to alert check functions - */ export interface AlertCheckContext { workflowId: string executionId: string status: 'success' | 'error' durationMs: number cost: number + triggerFilter: string[] } -/** - * Check if consecutive failures threshold is met - */ -async function checkConsecutiveFailures(workflowId: string, threshold: number): Promise { +async function checkConsecutiveFailures( + workflowId: string, + threshold: number, + triggerFilter: string[] +): Promise { const recentLogs = await db .select({ level: workflowExecutionLogs.level }) .from(workflowExecutionLogs) - .where(eq(workflowExecutionLogs.workflowId, workflowId)) + .where( + and( + eq(workflowExecutionLogs.workflowId, workflowId), + inArray(workflowExecutionLogs.trigger, triggerFilter) + ) + ) .orderBy(desc(workflowExecutionLogs.createdAt)) .limit(threshold) @@ -162,13 +166,11 @@ async function checkConsecutiveFailures(workflowId: string, threshold: number): return recentLogs.every((log) => log.level === 'error') } -/** - * Check if failure rate exceeds threshold - */ async function checkFailureRate( workflowId: string, ratePercent: number, - windowHours: number + windowHours: number, + triggerFilter: string[] ): Promise { const windowStart = new Date(Date.now() - windowHours * 60 * 60 * 1000) @@ -181,7 +183,8 @@ async function checkFailureRate( .where( and( eq(workflowExecutionLogs.workflowId, workflowId), - gte(workflowExecutionLogs.createdAt, windowStart) + gte(workflowExecutionLogs.createdAt, windowStart), + inArray(workflowExecutionLogs.trigger, triggerFilter) ) ) .orderBy(workflowExecutionLogs.createdAt) @@ -206,14 +209,12 @@ function checkLatencyThreshold(durationMs: number, thresholdMs: number): boolean return durationMs > thresholdMs } -/** - * Check if execution duration is significantly above average - */ async function checkLatencySpike( workflowId: string, currentDurationMs: number, spikePercent: number, - windowHours: number + windowHours: number, + triggerFilter: string[] ): Promise { const windowStart = new Date(Date.now() - windowHours * 60 * 60 * 1000) @@ -226,7 +227,8 @@ async function checkLatencySpike( .where( and( eq(workflowExecutionLogs.workflowId, workflowId), - gte(workflowExecutionLogs.createdAt, windowStart) + gte(workflowExecutionLogs.createdAt, windowStart), + inArray(workflowExecutionLogs.trigger, triggerFilter) ) ) @@ -248,13 +250,11 @@ function checkCostThreshold(cost: number, thresholdDollars: number): boolean { return cost > thresholdDollars } -/** - * Check if error count exceeds threshold within window - */ async function checkErrorCount( workflowId: string, threshold: number, - windowHours: number + windowHours: number, + triggerFilter: string[] ): Promise { const windowStart = new Date(Date.now() - windowHours * 60 * 60 * 1000) @@ -265,7 +265,8 @@ async function checkErrorCount( and( eq(workflowExecutionLogs.workflowId, workflowId), eq(workflowExecutionLogs.level, 'error'), - gte(workflowExecutionLogs.createdAt, windowStart) + gte(workflowExecutionLogs.createdAt, windowStart), + inArray(workflowExecutionLogs.trigger, triggerFilter) ) ) @@ -273,9 +274,6 @@ async function checkErrorCount( return errorCount >= threshold } -/** - * Evaluates if an alert should be triggered based on the configuration - */ export async function shouldTriggerAlert( config: AlertConfig, context: AlertCheckContext, @@ -287,16 +285,21 @@ export async function shouldTriggerAlert( } const { rule } = config - const { workflowId, status, durationMs, cost } = context + const { workflowId, status, durationMs, cost, triggerFilter } = context switch (rule) { case 'consecutive_failures': if (status !== 'error') return false - return checkConsecutiveFailures(workflowId, config.consecutiveFailures!) + return checkConsecutiveFailures(workflowId, config.consecutiveFailures!, triggerFilter) case 'failure_rate': if (status !== 'error') return false - return checkFailureRate(workflowId, config.failureRatePercent!, config.windowHours!) + return checkFailureRate( + workflowId, + config.failureRatePercent!, + config.windowHours!, + triggerFilter + ) case 'latency_threshold': return checkLatencyThreshold(durationMs, config.durationThresholdMs!) @@ -306,19 +309,24 @@ export async function shouldTriggerAlert( workflowId, durationMs, config.latencySpikePercent!, - config.windowHours! + config.windowHours!, + triggerFilter ) case 'cost_threshold': return checkCostThreshold(cost, config.costThresholdDollars!) case 'no_activity': - // no_activity alerts are handled by the hourly polling job, not execution events return false case 'error_count': if (status !== 'error') return false - return checkErrorCount(workflowId, config.errorCountThreshold!, config.windowHours!) + return checkErrorCount( + workflowId, + config.errorCountThreshold!, + config.windowHours!, + triggerFilter + ) default: logger.warn(`Unknown alert rule: ${rule}`) diff --git a/apps/sim/lib/notifications/inactivity-polling.ts b/apps/sim/lib/notifications/inactivity-polling.ts index f577b3666..c5d749897 100644 --- a/apps/sim/lib/notifications/inactivity-polling.ts +++ b/apps/sim/lib/notifications/inactivity-polling.ts @@ -1,6 +1,7 @@ import { db } from '@sim/db' import { workflow, + workflowDeploymentVersion, workflowExecutionLogs, workspaceNotificationDelivery, workspaceNotificationSubscription, @@ -9,15 +10,81 @@ import { and, eq, gte, inArray, sql } from 'drizzle-orm' import { v4 as uuidv4 } from 'uuid' import { isTriggerDevEnabled } from '@/lib/core/config/feature-flags' import { createLogger } from '@/lib/logs/console/logger' +import { TRIGGER_TYPES } from '@/lib/workflows/triggers/triggers' import { executeNotificationDelivery, workspaceNotificationDeliveryTask, } from '@/background/workspace-notification-delivery' +import type { WorkflowState } from '@/stores/workflows/workflow/types' import type { AlertConfig } from './alert-rules' import { isInCooldown } from './alert-rules' const logger = createLogger('InactivityPolling') +const SCHEDULE_BLOCK_TYPES: string[] = [TRIGGER_TYPES.SCHEDULE] +const WEBHOOK_BLOCK_TYPES: string[] = [TRIGGER_TYPES.WEBHOOK, TRIGGER_TYPES.GENERIC_WEBHOOK] + +function deploymentHasTriggerType( + deploymentState: Pick, + triggerFilter: string[] +): boolean { + const blocks = deploymentState.blocks + if (!blocks) return false + + const alwaysAvailable = ['api', 'manual', 'chat'] + if (triggerFilter.some((t) => alwaysAvailable.includes(t))) { + return true + } + + for (const block of Object.values(blocks)) { + if (triggerFilter.includes('schedule') && SCHEDULE_BLOCK_TYPES.includes(block.type)) { + return true + } + + if (triggerFilter.includes('webhook')) { + if (WEBHOOK_BLOCK_TYPES.includes(block.type)) { + return true + } + if (block.triggerMode === true) { + return true + } + } + } + + return false +} + +async function getWorkflowsWithTriggerTypes( + workspaceId: string, + triggerFilter: string[] +): Promise> { + const workflowIds = new Set() + + const deployedWorkflows = await db + .select({ + workflowId: workflow.id, + deploymentState: workflowDeploymentVersion.state, + }) + .from(workflow) + .innerJoin( + workflowDeploymentVersion, + and( + eq(workflowDeploymentVersion.workflowId, workflow.id), + eq(workflowDeploymentVersion.isActive, true) + ) + ) + .where(and(eq(workflow.workspaceId, workspaceId), eq(workflow.isDeployed, true))) + + for (const w of deployedWorkflows) { + const state = w.deploymentState as WorkflowState | null + if (state && deploymentHasTriggerType(state, triggerFilter)) { + workflowIds.add(w.workflowId) + } + } + + return workflowIds +} + interface InactivityCheckResult { subscriptionId: string workflowId: string @@ -25,9 +92,6 @@ interface InactivityCheckResult { reason?: string } -/** - * Checks a single workflow for inactivity and triggers notification if needed - */ async function checkWorkflowInactivity( subscription: typeof workspaceNotificationSubscription.$inferSelect, workflowId: string, @@ -141,9 +205,6 @@ async function checkWorkflowInactivity( return result } -/** - * Polls all active no_activity subscriptions and triggers alerts as needed - */ export async function pollInactivityAlerts(): Promise<{ total: number triggered: number @@ -179,19 +240,30 @@ export async function pollInactivityAlerts(): Promise<{ continue } + const triggerFilter = subscription.triggerFilter as string[] + if (!triggerFilter || triggerFilter.length === 0) { + logger.warn(`Subscription ${subscription.id} has no trigger filter, skipping`) + continue + } + + const eligibleWorkflowIds = await getWorkflowsWithTriggerTypes( + subscription.workspaceId, + triggerFilter + ) + let workflowIds: string[] = [] if (subscription.allWorkflows) { - const workflows = await db - .select({ id: workflow.id }) - .from(workflow) - .where(eq(workflow.workspaceId, subscription.workspaceId)) - - workflowIds = workflows.map((w) => w.id) + workflowIds = Array.from(eligibleWorkflowIds) } else { - workflowIds = subscription.workflowIds || [] + workflowIds = (subscription.workflowIds || []).filter((id) => eligibleWorkflowIds.has(id)) } + logger.debug(`Checking ${workflowIds.length} workflows for subscription ${subscription.id}`, { + triggerFilter, + eligibleCount: eligibleWorkflowIds.size, + }) + for (const workflowId of workflowIds) { const result = await checkWorkflowInactivity(subscription, workflowId, alertConfig) results.push(result)