fix(notifs): inactivity polling filters, consolidate trigger types, minor consistency issue with filter parsing (#2452)

* fix(notifs-slac): display name for account

* fix inactivity polling check

* consolidate trigger types

* remove redundant defaults

* fix
This commit is contained in:
Vikhyath Mondreti
2025-12-18 12:49:58 -08:00
committed by GitHub
parent c23130a26e
commit 04cd837e9c
11 changed files with 159 additions and 60 deletions

View File

@@ -32,7 +32,17 @@ export async function GET(request: NextRequest) {
.from(account)
.where(and(...whereConditions))
return NextResponse.json({ accounts })
// Use the user's email as the display name (consistent with credential selector)
const userEmail = session.user.email
const accountsWithDisplayName = accounts.map((acc) => ({
id: acc.id,
accountId: acc.accountId,
providerId: acc.providerId,
displayName: userEmail || acc.providerId,
}))
return NextResponse.json({ accounts: accountsWithDisplayName })
} catch (error) {
logger.error('Failed to fetch accounts', { error })
return NextResponse.json({ error: 'Internal server error' }, { status: 500 })

View File

@@ -11,6 +11,7 @@ import { processInputFileFields } from '@/lib/execution/files'
import { preprocessExecution } from '@/lib/execution/preprocessing'
import { createLogger } from '@/lib/logs/console/logger'
import { LoggingSession } from '@/lib/logs/execution/logging-session'
import { ALL_TRIGGER_TYPES } from '@/lib/logs/types'
import { executeWorkflowCore } from '@/lib/workflows/executor/execution-core'
import { type ExecutionEvent, encodeSSEEvent } from '@/lib/workflows/executor/execution-events'
import { PauseResumeManager } from '@/lib/workflows/executor/human-in-the-loop-manager'
@@ -30,7 +31,7 @@ const logger = createLogger('WorkflowExecuteAPI')
const ExecuteWorkflowSchema = z.object({
selectedOutputs: z.array(z.string()).optional().default([]),
triggerType: z.enum(['api', 'webhook', 'schedule', 'manual', 'chat']).optional(),
triggerType: z.enum(ALL_TRIGGER_TYPES).optional(),
stream: z.boolean().optional(),
useDraftState: z.boolean().optional(),
input: z.any().optional(),

View File

@@ -6,13 +6,14 @@ import { z } from 'zod'
import { getSession } from '@/lib/auth'
import { encryptSecret } from '@/lib/core/security/encryption'
import { createLogger } from '@/lib/logs/console/logger'
import { ALL_TRIGGER_TYPES } from '@/lib/logs/types'
import { getUserEntityPermissions } from '@/lib/workspaces/permissions/utils'
import { MAX_EMAIL_RECIPIENTS, MAX_WORKFLOW_IDS } from '../constants'
const logger = createLogger('WorkspaceNotificationAPI')
const levelFilterSchema = z.array(z.enum(['info', 'error']))
const triggerFilterSchema = z.array(z.enum(['api', 'webhook', 'schedule', 'manual', 'chat']))
const triggerFilterSchema = z.array(z.enum(ALL_TRIGGER_TYPES))
const alertRuleSchema = z.enum([
'consecutive_failures',

View File

@@ -7,6 +7,7 @@ import { z } from 'zod'
import { getSession } from '@/lib/auth'
import { encryptSecret } from '@/lib/core/security/encryption'
import { createLogger } from '@/lib/logs/console/logger'
import { ALL_TRIGGER_TYPES } from '@/lib/logs/types'
import { getUserEntityPermissions } from '@/lib/workspaces/permissions/utils'
import { MAX_EMAIL_RECIPIENTS, MAX_NOTIFICATIONS_PER_TYPE, MAX_WORKFLOW_IDS } from './constants'
@@ -14,7 +15,7 @@ const logger = createLogger('WorkspaceNotificationsAPI')
const notificationTypeSchema = z.enum(['webhook', 'email', 'slack'])
const levelFilterSchema = z.array(z.enum(['info', 'error']))
const triggerFilterSchema = z.array(z.enum(['api', 'webhook', 'schedule', 'manual', 'chat']))
const triggerFilterSchema = z.array(z.enum(ALL_TRIGGER_TYPES))
const alertRuleSchema = z.enum([
'consecutive_failures',
@@ -80,7 +81,7 @@ const createNotificationSchema = z
workflowIds: z.array(z.string()).max(MAX_WORKFLOW_IDS).default([]),
allWorkflows: z.boolean().default(false),
levelFilter: levelFilterSchema.default(['info', 'error']),
triggerFilter: triggerFilterSchema.default(['api', 'webhook', 'schedule', 'manual', 'chat']),
triggerFilter: triggerFilterSchema.default([...ALL_TRIGGER_TYPES]),
includeFinalOutput: z.boolean().default(false),
includeTraceSpans: z.boolean().default(false),
includeRateLimits: z.boolean().default(false),

View File

@@ -104,6 +104,8 @@ export function SlackChannelSelector({
disabled={disabled || channels.length === 0}
isLoading={isLoading}
error={fetchError}
searchable
searchPlaceholder='Search channels...'
/>
{selectedChannel && !fetchError && (
<p className='text-[12px] text-[var(--text-muted)]'>

View File

@@ -22,6 +22,7 @@ import { SlackIcon } from '@/components/icons'
import { Skeleton } from '@/components/ui'
import { cn } from '@/lib/core/utils/cn'
import { createLogger } from '@/lib/logs/console/logger'
import { ALL_TRIGGER_TYPES, type TriggerType } from '@/lib/logs/types'
import { quickValidateEmail } from '@/lib/messaging/email/validation'
import {
type NotificationSubscription,
@@ -43,7 +44,6 @@ const PRIMARY_BUTTON_STYLES =
type NotificationType = 'webhook' | 'email' | 'slack'
type LogLevel = 'info' | 'error'
type TriggerType = 'api' | 'webhook' | 'schedule' | 'manual' | 'chat'
type AlertRule =
| 'none'
| 'consecutive_failures'
@@ -84,7 +84,6 @@ interface NotificationSettingsProps {
}
const LOG_LEVELS: LogLevel[] = ['info', 'error']
const TRIGGER_TYPES: TriggerType[] = ['api', 'webhook', 'schedule', 'manual', 'chat']
function formatAlertConfigLabel(config: {
rule: AlertRule
@@ -137,7 +136,7 @@ export function NotificationSettings({
workflowIds: [] as string[],
allWorkflows: true,
levelFilter: ['info', 'error'] as LogLevel[],
triggerFilter: ['api', 'webhook', 'schedule', 'manual', 'chat'] as TriggerType[],
triggerFilter: [...ALL_TRIGGER_TYPES] as TriggerType[],
includeFinalOutput: false,
includeTraceSpans: false,
includeRateLimits: false,
@@ -207,7 +206,7 @@ export function NotificationSettings({
workflowIds: [],
allWorkflows: true,
levelFilter: ['info', 'error'],
triggerFilter: ['api', 'webhook', 'schedule', 'manual', 'chat'],
triggerFilter: [...ALL_TRIGGER_TYPES],
includeFinalOutput: false,
includeTraceSpans: false,
includeRateLimits: false,
@@ -768,7 +767,7 @@ export function NotificationSettings({
<Combobox
options={slackAccounts.map((acc) => ({
value: acc.id,
label: acc.accountId,
label: acc.displayName || 'Slack Workspace',
}))}
value={formData.slackAccountId}
onChange={(value) => {
@@ -859,7 +858,7 @@ export function NotificationSettings({
<div className='flex flex-col gap-[8px]'>
<Label className='text-[var(--text-secondary)]'>Trigger Type Filters</Label>
<Combobox
options={TRIGGER_TYPES.map((trigger) => ({
options={ALL_TRIGGER_TYPES.map((trigger) => ({
label: trigger.charAt(0).toUpperCase() + trigger.slice(1),
value: trigger,
}))}

View File

@@ -4,6 +4,7 @@ interface SlackAccount {
id: string
accountId: string
providerId: string
displayName?: string
}
interface UseSlackAccountsResult {

View File

@@ -81,8 +81,8 @@ export async function emitWorkflowExecutionCompleted(log: WorkflowExecutionLog):
)
for (const subscription of subscriptions) {
const levelMatches = subscription.levelFilter?.includes(log.level) ?? true
const triggerMatches = subscription.triggerFilter?.includes(log.trigger) ?? true
const levelMatches = subscription.levelFilter.includes(log.level)
const triggerMatches = subscription.triggerFilter.includes(log.trigger)
if (!levelMatches || !triggerMatches) {
logger.debug(`Skipping subscription ${subscription.id} due to filter mismatch`)
@@ -98,6 +98,7 @@ export async function emitWorkflowExecutionCompleted(log: WorkflowExecutionLog):
status: log.level === 'error' ? 'error' : 'success',
durationMs: log.totalDurationMs || 0,
cost: (log.cost as { total?: number })?.total || 0,
triggerFilter: subscription.triggerFilter,
}
const shouldAlert = await shouldTriggerAlert(alertConfig, context, subscription.lastAlertAt)

View File

@@ -51,8 +51,11 @@ export interface ExecutionEnvironment {
workspaceId: string
}
export const ALL_TRIGGER_TYPES = ['api', 'webhook', 'schedule', 'manual', 'chat'] as const
export type TriggerType = (typeof ALL_TRIGGER_TYPES)[number]
export interface ExecutionTrigger {
type: 'api' | 'webhook' | 'schedule' | 'manual' | 'chat' | string
type: TriggerType | string
source: string
data?: Record<string, unknown>
timestamp: string

View File

@@ -1,6 +1,6 @@
import { db } from '@sim/db'
import { workflowExecutionLogs } from '@sim/db/schema'
import { and, avg, count, desc, eq, gte } from 'drizzle-orm'
import { and, avg, count, desc, eq, gte, inArray } from 'drizzle-orm'
import { createLogger } from '@/lib/logs/console/logger'
const logger = createLogger('AlertRules')
@@ -135,25 +135,29 @@ export function isInCooldown(lastAlertAt: Date | null): boolean {
return new Date() < cooldownEnd
}
/**
* Context passed to alert check functions
*/
export interface AlertCheckContext {
workflowId: string
executionId: string
status: 'success' | 'error'
durationMs: number
cost: number
triggerFilter: string[]
}
/**
* Check if consecutive failures threshold is met
*/
async function checkConsecutiveFailures(workflowId: string, threshold: number): Promise<boolean> {
async function checkConsecutiveFailures(
workflowId: string,
threshold: number,
triggerFilter: string[]
): Promise<boolean> {
const recentLogs = await db
.select({ level: workflowExecutionLogs.level })
.from(workflowExecutionLogs)
.where(eq(workflowExecutionLogs.workflowId, workflowId))
.where(
and(
eq(workflowExecutionLogs.workflowId, workflowId),
inArray(workflowExecutionLogs.trigger, triggerFilter)
)
)
.orderBy(desc(workflowExecutionLogs.createdAt))
.limit(threshold)
@@ -162,13 +166,11 @@ async function checkConsecutiveFailures(workflowId: string, threshold: number):
return recentLogs.every((log) => log.level === 'error')
}
/**
* Check if failure rate exceeds threshold
*/
async function checkFailureRate(
workflowId: string,
ratePercent: number,
windowHours: number
windowHours: number,
triggerFilter: string[]
): Promise<boolean> {
const windowStart = new Date(Date.now() - windowHours * 60 * 60 * 1000)
@@ -181,7 +183,8 @@ async function checkFailureRate(
.where(
and(
eq(workflowExecutionLogs.workflowId, workflowId),
gte(workflowExecutionLogs.createdAt, windowStart)
gte(workflowExecutionLogs.createdAt, windowStart),
inArray(workflowExecutionLogs.trigger, triggerFilter)
)
)
.orderBy(workflowExecutionLogs.createdAt)
@@ -206,14 +209,12 @@ function checkLatencyThreshold(durationMs: number, thresholdMs: number): boolean
return durationMs > thresholdMs
}
/**
* Check if execution duration is significantly above average
*/
async function checkLatencySpike(
workflowId: string,
currentDurationMs: number,
spikePercent: number,
windowHours: number
windowHours: number,
triggerFilter: string[]
): Promise<boolean> {
const windowStart = new Date(Date.now() - windowHours * 60 * 60 * 1000)
@@ -226,7 +227,8 @@ async function checkLatencySpike(
.where(
and(
eq(workflowExecutionLogs.workflowId, workflowId),
gte(workflowExecutionLogs.createdAt, windowStart)
gte(workflowExecutionLogs.createdAt, windowStart),
inArray(workflowExecutionLogs.trigger, triggerFilter)
)
)
@@ -248,13 +250,11 @@ function checkCostThreshold(cost: number, thresholdDollars: number): boolean {
return cost > thresholdDollars
}
/**
* Check if error count exceeds threshold within window
*/
async function checkErrorCount(
workflowId: string,
threshold: number,
windowHours: number
windowHours: number,
triggerFilter: string[]
): Promise<boolean> {
const windowStart = new Date(Date.now() - windowHours * 60 * 60 * 1000)
@@ -265,7 +265,8 @@ async function checkErrorCount(
and(
eq(workflowExecutionLogs.workflowId, workflowId),
eq(workflowExecutionLogs.level, 'error'),
gte(workflowExecutionLogs.createdAt, windowStart)
gte(workflowExecutionLogs.createdAt, windowStart),
inArray(workflowExecutionLogs.trigger, triggerFilter)
)
)
@@ -273,9 +274,6 @@ async function checkErrorCount(
return errorCount >= threshold
}
/**
* Evaluates if an alert should be triggered based on the configuration
*/
export async function shouldTriggerAlert(
config: AlertConfig,
context: AlertCheckContext,
@@ -287,16 +285,21 @@ export async function shouldTriggerAlert(
}
const { rule } = config
const { workflowId, status, durationMs, cost } = context
const { workflowId, status, durationMs, cost, triggerFilter } = context
switch (rule) {
case 'consecutive_failures':
if (status !== 'error') return false
return checkConsecutiveFailures(workflowId, config.consecutiveFailures!)
return checkConsecutiveFailures(workflowId, config.consecutiveFailures!, triggerFilter)
case 'failure_rate':
if (status !== 'error') return false
return checkFailureRate(workflowId, config.failureRatePercent!, config.windowHours!)
return checkFailureRate(
workflowId,
config.failureRatePercent!,
config.windowHours!,
triggerFilter
)
case 'latency_threshold':
return checkLatencyThreshold(durationMs, config.durationThresholdMs!)
@@ -306,19 +309,24 @@ export async function shouldTriggerAlert(
workflowId,
durationMs,
config.latencySpikePercent!,
config.windowHours!
config.windowHours!,
triggerFilter
)
case 'cost_threshold':
return checkCostThreshold(cost, config.costThresholdDollars!)
case 'no_activity':
// no_activity alerts are handled by the hourly polling job, not execution events
return false
case 'error_count':
if (status !== 'error') return false
return checkErrorCount(workflowId, config.errorCountThreshold!, config.windowHours!)
return checkErrorCount(
workflowId,
config.errorCountThreshold!,
config.windowHours!,
triggerFilter
)
default:
logger.warn(`Unknown alert rule: ${rule}`)

View File

@@ -1,6 +1,7 @@
import { db } from '@sim/db'
import {
workflow,
workflowDeploymentVersion,
workflowExecutionLogs,
workspaceNotificationDelivery,
workspaceNotificationSubscription,
@@ -9,15 +10,81 @@ import { and, eq, gte, inArray, sql } from 'drizzle-orm'
import { v4 as uuidv4 } from 'uuid'
import { isTriggerDevEnabled } from '@/lib/core/config/feature-flags'
import { createLogger } from '@/lib/logs/console/logger'
import { TRIGGER_TYPES } from '@/lib/workflows/triggers/triggers'
import {
executeNotificationDelivery,
workspaceNotificationDeliveryTask,
} from '@/background/workspace-notification-delivery'
import type { WorkflowState } from '@/stores/workflows/workflow/types'
import type { AlertConfig } from './alert-rules'
import { isInCooldown } from './alert-rules'
const logger = createLogger('InactivityPolling')
const SCHEDULE_BLOCK_TYPES: string[] = [TRIGGER_TYPES.SCHEDULE]
const WEBHOOK_BLOCK_TYPES: string[] = [TRIGGER_TYPES.WEBHOOK, TRIGGER_TYPES.GENERIC_WEBHOOK]
function deploymentHasTriggerType(
deploymentState: Pick<WorkflowState, 'blocks'>,
triggerFilter: string[]
): boolean {
const blocks = deploymentState.blocks
if (!blocks) return false
const alwaysAvailable = ['api', 'manual', 'chat']
if (triggerFilter.some((t) => alwaysAvailable.includes(t))) {
return true
}
for (const block of Object.values(blocks)) {
if (triggerFilter.includes('schedule') && SCHEDULE_BLOCK_TYPES.includes(block.type)) {
return true
}
if (triggerFilter.includes('webhook')) {
if (WEBHOOK_BLOCK_TYPES.includes(block.type)) {
return true
}
if (block.triggerMode === true) {
return true
}
}
}
return false
}
async function getWorkflowsWithTriggerTypes(
workspaceId: string,
triggerFilter: string[]
): Promise<Set<string>> {
const workflowIds = new Set<string>()
const deployedWorkflows = await db
.select({
workflowId: workflow.id,
deploymentState: workflowDeploymentVersion.state,
})
.from(workflow)
.innerJoin(
workflowDeploymentVersion,
and(
eq(workflowDeploymentVersion.workflowId, workflow.id),
eq(workflowDeploymentVersion.isActive, true)
)
)
.where(and(eq(workflow.workspaceId, workspaceId), eq(workflow.isDeployed, true)))
for (const w of deployedWorkflows) {
const state = w.deploymentState as WorkflowState | null
if (state && deploymentHasTriggerType(state, triggerFilter)) {
workflowIds.add(w.workflowId)
}
}
return workflowIds
}
interface InactivityCheckResult {
subscriptionId: string
workflowId: string
@@ -25,9 +92,6 @@ interface InactivityCheckResult {
reason?: string
}
/**
* Checks a single workflow for inactivity and triggers notification if needed
*/
async function checkWorkflowInactivity(
subscription: typeof workspaceNotificationSubscription.$inferSelect,
workflowId: string,
@@ -141,9 +205,6 @@ async function checkWorkflowInactivity(
return result
}
/**
* Polls all active no_activity subscriptions and triggers alerts as needed
*/
export async function pollInactivityAlerts(): Promise<{
total: number
triggered: number
@@ -179,19 +240,30 @@ export async function pollInactivityAlerts(): Promise<{
continue
}
const triggerFilter = subscription.triggerFilter as string[]
if (!triggerFilter || triggerFilter.length === 0) {
logger.warn(`Subscription ${subscription.id} has no trigger filter, skipping`)
continue
}
const eligibleWorkflowIds = await getWorkflowsWithTriggerTypes(
subscription.workspaceId,
triggerFilter
)
let workflowIds: string[] = []
if (subscription.allWorkflows) {
const workflows = await db
.select({ id: workflow.id })
.from(workflow)
.where(eq(workflow.workspaceId, subscription.workspaceId))
workflowIds = workflows.map((w) => w.id)
workflowIds = Array.from(eligibleWorkflowIds)
} else {
workflowIds = subscription.workflowIds || []
workflowIds = (subscription.workflowIds || []).filter((id) => eligibleWorkflowIds.has(id))
}
logger.debug(`Checking ${workflowIds.length} workflows for subscription ${subscription.id}`, {
triggerFilter,
eligibleCount: eligibleWorkflowIds.size,
})
for (const workflowId of workflowIds) {
const result = await checkWorkflowInactivity(subscription, workflowId, alertConfig)
results.push(result)